From 2f4af918ce6c520772b01e6f7633848dbcbe7f9b Mon Sep 17 00:00:00 2001
From: Harendra Kumar <harendra@composewell.com>
Date: Thu, 12 Sep 2024 15:08:29 +0530
Subject: [PATCH] Add incrEwma for incremental computation of EWMA

---
 src/Streamly/Statistics/Scanl.hs | 82 ++++++++++++++++++++++++++++----
 1 file changed, 72 insertions(+), 10 deletions(-)

diff --git a/src/Streamly/Statistics/Scanl.hs b/src/Streamly/Statistics/Scanl.hs
index d2fdefb..acec45b 100644
--- a/src/Streamly/Statistics/Scanl.hs
+++ b/src/Streamly/Statistics/Scanl.hs
@@ -54,6 +54,7 @@ module Streamly.Statistics.Scanl
     -- | Exponential Smoothing.
     , ewma
     , ewmaRampUpSmoothing
+    , incrEwma
 
     -- ** Spread
     -- | Second order central moment is a statistical measure of dispersion.
@@ -90,6 +91,8 @@ module Streamly.Statistics.Scanl
     )
 where
 
+import Control.Exception (assert)
+import Control.Monad (when)
 import Control.Monad.IO.Class (MonadIO(..))
 import Data.Function ((&))
 import Data.Map.Strict (Map)
@@ -416,24 +419,38 @@ incrQuadraticMean = incrPowerMean 2
 -------------------------------------------------------------------------------
 
 -- XXX Is this numerically stable? We can use the kbn summation here.
+-- XXX change the signature to use the newer value first.
 
 -- | ewmaStep smoothing-factor old-value new-value
+--
+-- >>> ewmaStep k x0 x1 = k * x1 + (1 - k) * x0
+--
 {-# INLINE ewmaStep #-}
 ewmaStep :: Double -> Double -> Double -> Double
 ewmaStep k x0 x1 = (1 - k) * x0 + k * x1
 
 -- XXX Compute this in a sliding window?
 
--- | @ewma smoothingFactor@.
+-- | Exponentially weighted moving average is given by @ewma w@ where @w@ is
+-- the smoothing factor varying between 0 and 1. To compute the moving average
+-- the newest input is given a weight of @w@ and the running ewma is given a
+-- weight of @1 - w@.
+--
+-- For an empty stream this API returns 0. For a non-empty stream the first
+-- value in the stream is used as the initial ewma.
 --
--- @ewma@ of an empty stream is 0.
+-- The higher the smoothing factor the more weight is given to the new value.
+-- Consider some interesting special cases, when @w@ is 1 the ewma is always
+-- the newest value, when @w@ is 0 then the ewma is always the oldest value. If
+-- @w@ is @0.5@ then the new inputs get exponentially weighted by weights
+-- @1\/2, 1\/4, 1\/8, ...@, see below for details.
 --
--- Exponential weighted moving average, \(s_n\), of \(n\) values,
--- \(x_1,\ldots,x_n\), is defined recursively as:
+-- Mathematically, we define ewma \(s_n\), of \(n\) values, \(x_1,\ldots,x_n\),
+-- recursively as follows:
 --
--- \(\begin{align} s_0& = x_0\\ s_n & = \alpha x_{n} + (1-\alpha)s_{n-1},\quad n>0 \end{align}\)
+-- \(\begin{align} s_0& = x_0, \quad \text{initial value}\\ s_n & = \alpha x_{n} + (1-\alpha)s_{n-1},\quad n>0 \end{align}\)
 --
--- If we expand the recursive term it becomes an exponential series:
+-- If we expand the recursive term it reveals an exponential series:
 --
 -- \(s_n = \alpha \left[x_n + (1-\alpha)x_{n-1} + (1-\alpha)^2 x_{n-2} + \cdots + (1-\alpha)^{n-1} x_1 \right] + (1-\alpha)^n x_0\)
 --
@@ -457,10 +474,11 @@ ewma k = extract <$> Scanl.mkScanl step (Tuple' 0 1)
 
     extract (Tuple' x _) = x
 
--- | @ewma n k@ is like 'ewma' but uses 1 as the initial smoothing factor and
--- then exponentially smooths it to @k@ using @n@ as the smoothing factor.
---
--- This is significantly faster than 'ewmaAfterMean'.
+-- | @ewma n k@ is like 'ewma' but the smoothing factor used is itself
+-- exponentially smoothened starting from @1@ to the final value @k@ using @n@
+-- as its smoothing factor. In other words, the smoothing factor is derived by
+-- smoothing the series @1,k,k,k,...@ using @n@ as the smoothing factor. As
+-- time goes on, the smoothing factor gets closer to k.
 --
 {-# INLINE ewmaRampUpSmoothing #-}
 ewmaRampUpSmoothing :: Monad m => Double -> Double -> Scanl m Double Double
@@ -477,6 +495,50 @@ ewmaRampUpSmoothing n k1 = extract <$> Scanl.mkScanl step initial
 
     extract (Tuple' x _) = x
 
+data Ewma = EwmaInit | EwmaGo !Double !Double
+
+-- | @incrEwma w@ computes the ewma of the elements incrementally in a window
+-- using @w@ as the smoothing factor.
+--
+-- ewma can be calculated incrementally as follows. If \(x_i\) is the value
+-- entering the window, n is the size of window, \(x_1\) is the second last value
+-- in the old window and \(x_0\) is the value exiting the window:
+--
+-- \(s_{new} = \alpha x_i + (1-\alpha)s_{old} + (1-\alpha)^{n} (x_1 - x_0\)\)
+--
+-- /Unimplemented/
+--
+{-# INLINE incrEwma #-}
+incrEwma :: MonadIO m =>
+    Double -> Scanl m (Incr Double, Ring.RingArray Double) Double
+incrEwma w = Scanl step initial extract extract
+
+    where
+
+    initial = do
+        when (w < 0 || w > 1)
+            $ error "incrEwma: weight must be >= 0 and <= 1"
+        return $ Partial EwmaInit
+
+    step EwmaInit (Insert x, rng) = do
+        let len = Ring.length rng
+        assert (len == 0) (return ())
+        return $ Partial (EwmaGo x 1)
+
+    step EwmaInit _ = error "incrEwma: the first window operation must be Insert"
+
+    step (EwmaGo s k) (Insert x, _) = do
+        let s1 = w * x + (1 - w) * s
+        return $ Partial (EwmaGo s1 (k * (1 - w)))
+
+    step (EwmaGo s k) (Replace x x0, rng) = do
+        x1 <- Ring.unsafeGetIndex 0 rng
+        let s1 = w * x + (1 - w) * s + k * (x1 - x0)
+        return $ Partial (EwmaGo s1 k)
+
+    extract EwmaInit = return 0
+    extract (EwmaGo x _) = return x
+
 -------------------------------------------------------------------------------
 -- Spread/Dispersion
 -------------------------------------------------------------------------------