Skip to content

Commit

Permalink
Merge pull request #32 from ajcr/alternative-rolling-indexed
Browse files Browse the repository at this point in the history
Add indexed support to rolling base class
  • Loading branch information
ajcr authored May 30, 2023
2 parents c042949 + 367d365 commit d1a41ce
Show file tree
Hide file tree
Showing 21 changed files with 323 additions and 29 deletions.
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

A collection of computationally efficient rolling window iterators for Python.

Useful arithmetical, logical and statistical operations on rolling windows (including `Sum`, `Min`, `Max`, `Mean`, `Median` and more). Both fixed-length and variable-length windows are supported for most operations.
Useful arithmetical, logical and statistical operations on rolling windows (including `Sum`, `Min`, `Max`, `Mean`, `Median` and more). Both fixed-length and variable-length windows are supported for most operations. Many operations also support "indexed" windows.

To get started, see the [Overview](https://github.com/ajcr/rolling#overview) section below, or have a look at the some [recipes](https://github.com/ajcr/rolling/blob/master/doc/recipes.md).

Expand Down Expand Up @@ -53,6 +53,8 @@ This library implements efficient ways to perform useful operations on rolling w
[5, 9, 9]
```

Note that these time complexity values apply to "fixed" and "variable" window types (not the "indexed" window type which depends on the index values encountered).

## Operations

The algorithms implemented so far in this module are summarised below.
Expand Down Expand Up @@ -130,6 +132,21 @@ This allows windows smaller than the specified size to be evaluated at the begin
(2,)]
```

If values are indexed by a monotoncally-increasing index (e.g. with an integer key, timestamp or datetime) then the indexed window type can be used. The size of the window is the maximum distance between the oldest and newest values (e.g. an integer, or timedelta):
```python
>>> idx = [0, 1, 2, 6, 7, 11, 15]
>>> seq = [3, 1, 4, 1, 5, 9, 2]
>>> roll_list_idx = rolling.Apply(zip(idx, seq), window_size=3, operation=tuple, window_type='indexed')
>>> list(roll_list_idx)
[(3,),
(3, 1),
(3, 1, 4),
(1,),
(1, 5),
(9,),
(2,)]
```

## References and resources

Some rolling algorithms are widely known (e.g. `Sum`), so I am not sure which source to cite. Some algorithms I made up as I was putting the module together (e.g. `Any`, `All`), but these are relatively simple and probably exist elsewhere.
Expand Down
5 changes: 3 additions & 2 deletions doc/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [0.5.0]
### Added
- New `rolling.ApplyPairwise` object
- New `rolling.ApplyPairwise` object.
- Add `"indexed"` window type to enable operations on windows with an index.

## [0.4.0]
## [0.4.0] - 2023-03-11
### Added
- `__version__` attribute added to package
- New `rolling.PolynomialHash()` object
Expand Down
9 changes: 5 additions & 4 deletions rolling/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ class Apply(RollingObject):
"""

def _init_fixed(self, iterable, window_size, operation=sum, **kwargs):
head = islice(self._iterator, window_size - 1)
self._buffer = deque(head, maxlen=window_size)
self._buffer = deque([None])
self._buffer.extend(islice(self._iterator, window_size - 1))
self._operation = operation

def _init_variable(self, iterable, window_size, operation=sum, **kwargs):
self._buffer = deque(maxlen=window_size)
self._buffer = deque()
self._operation = operation

@property
Expand All @@ -70,7 +70,8 @@ def _remove_old(self):
self._buffer.popleft()

def _update_window(self, new):
self._buffer.append(new)
self._add_new(new)
self._remove_old()

@property
def _obs(self):
Expand Down
4 changes: 2 additions & 2 deletions rolling/arithmetic/nunique.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ class Nunique(RollingObject):

def _init_fixed(self, iterable, window_size, **kwargs):
head = islice(self._iterator, window_size - 1)
self._buffer = deque(head, maxlen=window_size)
self._buffer = deque(head)
# append a dummy value that is removed when next() is called
self._buffer.appendleft("dummy_value")
self._counter = Counter(self._buffer)

def _init_variable(self, iterable, window_size, **kwargs):
self._buffer = deque(maxlen=window_size)
self._buffer = deque()
self._counter = Counter()

def _update_window(self, new):
Expand Down
4 changes: 2 additions & 2 deletions rolling/arithmetic/product.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Product(RollingObject):

def _init_fixed(self, iterable, window_size, **kwargs):
head = islice(self._iterator, window_size - 1)
self._buffer = deque(head, maxlen=window_size)
self._buffer = deque(head)
self._zero_count = 0

prod = 1
Expand All @@ -57,7 +57,7 @@ def _init_fixed(self, iterable, window_size, **kwargs):
self._product = prod

def _init_variable(self, iterable, window_size, **kwargs):
self._buffer = deque(maxlen=window_size)
self._buffer = deque()
self._zero_count = 0
self._product = 1

Expand Down
2 changes: 1 addition & 1 deletion rolling/arithmetic/sum.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def _init_fixed(self, iterable, window_size, **kwargs):
self._sum = sum(self._buffer)

def _init_variable(self, iterable, window_size, **kwargs):
self._buffer = deque(maxlen=window_size)
self._buffer = deque()
self._sum = 0

def _update_window(self, new):
Expand Down
58 changes: 49 additions & 9 deletions rolling/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import abc
from collections import deque
from collections.abc import Iterator
from itertools import chain

Expand All @@ -7,8 +8,8 @@ class RollingObject(Iterator):
"""
Baseclass for rolling iterator objects.
All of the iteration logic specific to 'fixed' and
'variable' window types is handled by this class.
All iteration logic for 'fixed', 'variable' and
'indexed' window types is handled by this class.
Subclasses of RollingObject must implement methods
to initialize and manipulate any attributes needed
Expand All @@ -32,7 +33,7 @@ class RollingObject(Iterator):

def __init__(self, iterable, window_size, window_type="fixed", **kwargs):
self.window_type = window_type
self.window_size = _validate_window_size(window_size)
self.window_size = _validate_window_size(window_size, window_type)
self._iterator = iter(iterable)
self._filled = self.window_type == "fixed"

Expand All @@ -42,6 +43,13 @@ def __init__(self, iterable, window_size, window_type="fixed", **kwargs):
elif window_type == "variable":
self._init_variable(iterable, window_size, **kwargs)

elif window_type == "indexed":
# keep track of all indexes that we encounter. Assumes that all
# values we encounter will be stored in the same order. If not,
# the subtype will need to implement its own _next_indexed() method.
self._index_buffer = deque()
self._init_indexed(iterable, window_size, **kwargs)

else:
raise ValueError(f"Unknown window_type '{window_type}'")

Expand Down Expand Up @@ -75,6 +83,26 @@ def _next_variable(self):
self._remove_old()
return self.current_value

def _next_indexed(self):
new_index, new_value = next(self._iterator)

if self._index_buffer and new_index < self._index_buffer[-1]:
raise ValueError(
"Next index must be greater than or equal to last added index: "
f"{new_index} < {self._index_buffer[0]}"
)

self._index_buffer.append(new_index)
self._add_new(new_value)

min_index = new_index - self.window_size

while self._index_buffer and self._index_buffer[0] <= min_index:
self._remove_old()
self._index_buffer.popleft()

return self.current_value

def __next__(self):

if self.window_type == "fixed":
Expand All @@ -83,6 +111,9 @@ def __next__(self):
if self.window_type == "variable":
return self._next_variable()

if self.window_type == "indexed":
return self._next_indexed()

raise NotImplementedError(f"next() not implemented for {self.window_type}")

def extend(self, iterable):
Expand Down Expand Up @@ -134,6 +165,14 @@ def _init_variable(self):
"""
pass

def _init_indexed(self, *args, **kwargs):
"""
Intialise as an indexed window.
In most cases this is the same as initialising a variable-size window.
"""
return self._init_variable(*args, **kwargs)

@abc.abstractmethod
def _remove_old(self):
"""
Expand All @@ -156,12 +195,13 @@ def _update_window(self, new):
pass


def _validate_window_size(k):
def _validate_window_size(window_size, window_type):
"""
Check if k is a positive integer
"""
if not isinstance(k, int):
raise TypeError(f"window_size must be integer type, got {type(k).__name__}")
if k <= 0:
raise ValueError("window_size must be positive")
return k
if window_type in {"fixed", "variable"}:
if not isinstance(window_size, int):
raise TypeError(f"window_size must be integer type, got {type(window_size).__name__}")
if window_size <= 0:
raise ValueError("window_size must be positive")
return window_size
3 changes: 3 additions & 0 deletions rolling/entropy.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,6 @@ def _add_new(self, new):

def _remove_old(self):
pass

def _init_indexed(self, *args, **kwargs):
raise NotImplementedError("window_type='indexed'")
3 changes: 3 additions & 0 deletions rolling/hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,6 @@ def current_value(self):
@property
def _obs(self):
return len(self._buffer)

def _init_indexed(self, *args, **kwargs):
raise NotImplementedError("window_type='indexed'")
3 changes: 3 additions & 0 deletions rolling/logical/all.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,6 @@ def _obs(self):
def current_value(self):
return self._i - self._window_obs >= self._last_false

def _init_indexed(self, *args, **kwargs):
raise NotImplementedError("window_type='indexed'")

3 changes: 3 additions & 0 deletions rolling/logical/any.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,6 @@ def _obs(self):
@property
def current_value(self):
return self._i - self._window_obs < self._last_true

def _init_indexed(self, *args, **kwargs):
raise NotImplementedError("window_type='indexed'")
8 changes: 8 additions & 0 deletions rolling/minmax.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ def _obs(self):
def current_value(self):
return _value(self._buffer[0])

def _init_indexed(self, *args, **kwargs):
raise NotImplementedError("window_type='indexed'")


class Max(RollingObject):
"""
Expand Down Expand Up @@ -173,6 +176,8 @@ def _obs(self):
def current_value(self):
return _value(self._buffer[0])

def _init_indexed(self, *args, **kwargs):
raise NotImplementedError("window_type='indexed'")

class MinHeap(RollingObject):
"""
Expand Down Expand Up @@ -247,3 +252,6 @@ def _obs(self):
@property
def current_value(self):
return _value(self._heap[0])

def _init_indexed(self, *args, **kwargs):
raise NotImplementedError("window_type='indexed'")
7 changes: 7 additions & 0 deletions rolling/stats/kurtosis.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ def _init_variable(self, iterable, window_size, **kwargs):
self._x3 = 0.0
self._x4 = 0.0

def _init_indexed(self, iterable, window_size, **kwargs):
self._buffer = deque()
self._x1 = 0.0
self._x2 = 0.0
self._x3 = 0.0
self._x4 = 0.0

def _add_new(self, new):
self._buffer.append(new)

Expand Down
9 changes: 5 additions & 4 deletions rolling/stats/median.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ def __init__(
tracker="sortedlist",
):

self._buffer = deque(maxlen=window_size)
self._buffer = deque()

if tracker == "skiplist":
self._tracker = IndexableSkiplist(window_size)
elif tracker == "sortedlist":

if tracker == "sortedlist":
self._tracker = SortedList()
elif tracker == "skiplist":
self._tracker = IndexableSkiplist(window_size)
else:
raise ValueError(f"tracker must be one of 'skiplist' or 'sortedlist'")

Expand Down
2 changes: 1 addition & 1 deletion rolling/stats/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _init_fixed(self, iterable, window_size, return_count=False, **kwargs):
self._bicounter.increment("DUMMY_VALUE")

def _init_variable(self, iterable, window_size, return_count=False, **kwargs):
self._buffer = deque(maxlen=window_size)
self._buffer = deque()
self.return_count = return_count
self._bicounter = BiCounter()

Expand Down
6 changes: 6 additions & 0 deletions rolling/stats/skew.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ def _init_variable(self, iterable, window_size, **kwargs):
self._x2 = 0.0
self._x3 = 0.0

def _init_indexed(self, iterable, window_size, **kwargs):
self._buffer = deque()
self._x1 = 0.0
self._x2 = 0.0
self._x3 = 0.0

def _add_new(self, new):
self._buffer.append(new)

Expand Down
6 changes: 6 additions & 0 deletions rolling/stats/variance.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ def _init_variable(self, iterable, window_size, ddof=1, **kwargs):
self._mean = 0.0 # mean of values
self._sslm = 0.0 # sum of squared values less the mean

def _init_indexed(self, iterable, window_size, ddof=1, **kwargs):
self.ddof = ddof
self._buffer = deque()
self._mean = 0.0 # mean of values
self._sslm = 0.0 # sum of squared values less the mean

def _add_new(self, new):
self._buffer.append(new)

Expand Down
Loading

0 comments on commit d1a41ce

Please sign in to comment.