Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Series|Expr.rolling_var and Series|Expr.rolling_std #1451

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions docs/api-reference/expr.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
- quantile
- replace_strict
- rolling_mean
- rolling_std
- rolling_sum
- rolling_var
- round
- sample
- shift
Expand Down
2 changes: 2 additions & 0 deletions docs/api-reference/series.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@
- rename
- replace_strict
- rolling_mean
- rolling_std
- rolling_sum
- rolling_var
- round
- sample
- scatter
Expand Down
34 changes: 34 additions & 0 deletions narwhals/_arrow/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,40 @@ def rolling_mean(
center=center,
)

def rolling_var(
self: Self,
window_size: int,
*,
min_periods: int | None,
center: bool,
ddof: int,
) -> Self:
return reuse_series_implementation(
self,
"rolling_var",
window_size=window_size,
min_periods=min_periods,
center=center,
ddof=ddof,
)

def rolling_std(
self: Self,
window_size: int,
*,
min_periods: int | None,
center: bool,
ddof: int,
) -> Self:
return reuse_series_implementation(
self,
"rolling_std",
window_size=window_size,
min_periods=min_periods,
center=center,
ddof=ddof,
)

@property
def dt(self: Self) -> ArrowExprDateTimeNamespace:
return ArrowExprDateTimeNamespace(self)
Expand Down
118 changes: 76 additions & 42 deletions narwhals/_arrow/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from narwhals._arrow.utils import floordiv_compat
from narwhals._arrow.utils import narwhals_to_native_dtype
from narwhals._arrow.utils import native_to_narwhals_dtype
from narwhals._arrow.utils import pad_series
from narwhals._arrow.utils import parse_datetime_format
from narwhals._arrow.utils import validate_column_comparand
from narwhals.translate import to_py_scalar
Expand Down Expand Up @@ -906,27 +907,12 @@ def rolling_sum(
min_periods: int | None,
center: bool,
) -> Self:
import pyarrow as pa # ignore-banned-import
import pyarrow.compute as pc # ignore-banned-import

min_periods = min_periods if min_periods is not None else window_size
if center:
offset_left = window_size // 2
offset_right = offset_left - (
window_size % 2 == 0
) # subtract one if window_size is even

native_series = self._native_series

pad_left = pa.array([None] * offset_left, type=native_series.type)
pad_right = pa.array([None] * offset_right, type=native_series.type)
padded_arr = self._from_native_series(
pa.concat_arrays([pad_left, native_series.combine_chunks(), pad_right])
)
else:
padded_arr = self
padded_series, offset = pad_series(self, window_size=window_size, center=center)

cum_sum = padded_arr.cum_sum(reverse=False).fill_null(
cum_sum = padded_series.cum_sum(reverse=False).fill_null(
value=None, strategy="forward", limit=None
)
rolling_sum = (
Expand All @@ -936,7 +922,7 @@ def rolling_sum(
else cum_sum
)

valid_count = padded_arr.cum_count(reverse=False)
valid_count = padded_series.cum_count(reverse=False)
count_in_window = valid_count - valid_count.shift(window_size).fill_null(
value=0, strategy=None, limit=None
)
Expand All @@ -948,9 +934,7 @@ def rolling_sum(
None,
)
)
if center:
result = result[offset_left + offset_right :]
return result
return result[offset:]

def rolling_mean(
self: Self,
Expand All @@ -959,27 +943,12 @@ def rolling_mean(
min_periods: int | None,
center: bool,
) -> Self:
import pyarrow as pa # ignore-banned-import
import pyarrow.compute as pc # ignore-banned-import

min_periods = min_periods if min_periods is not None else window_size
if center:
offset_left = window_size // 2
offset_right = offset_left - (
window_size % 2 == 0
) # subtract one if window_size is even

native_series = self._native_series

pad_left = pa.array([None] * offset_left, type=native_series.type)
pad_right = pa.array([None] * offset_right, type=native_series.type)
padded_arr = self._from_native_series(
pa.concat_arrays([pad_left, native_series.combine_chunks(), pad_right])
)
else:
padded_arr = self
padded_series, offset = pad_series(self, window_size=window_size, center=center)

cum_sum = padded_arr.cum_sum(reverse=False).fill_null(
cum_sum = padded_series.cum_sum(reverse=False).fill_null(
value=None, strategy="forward", limit=None
)
rolling_sum = (
Expand All @@ -989,7 +958,7 @@ def rolling_mean(
else cum_sum
)

valid_count = padded_arr.cum_count(reverse=False)
valid_count = padded_series.cum_count(reverse=False)
count_in_window = valid_count - valid_count.shift(window_size).fill_null(
value=0, strategy=None, limit=None
)
Expand All @@ -1004,9 +973,74 @@ def rolling_mean(
)
/ count_in_window
)
if center:
result = result[offset_left + offset_right :]
return result
return result[offset:]

def rolling_var(
self: Self,
window_size: int,
*,
min_periods: int | None,
center: bool,
ddof: int,
) -> Self:
import pyarrow.compute as pc # ignore-banned-import

min_periods = min_periods if min_periods is not None else window_size
padded_series, offset = pad_series(self, window_size=window_size, center=center)

cum_sum = padded_series.cum_sum(reverse=False).fill_null(
value=None, strategy="forward", limit=None
)
rolling_sum = (
cum_sum
- cum_sum.shift(window_size).fill_null(value=0, strategy=None, limit=None)
if window_size != 0
else cum_sum
)

cum_sum_sq = (
padded_series.__pow__(2)
.cum_sum(reverse=False)
.fill_null(value=None, strategy="forward", limit=None)
)
rolling_sum_sq = (
cum_sum_sq
- cum_sum_sq.shift(window_size).fill_null(value=0, strategy=None, limit=None)
if window_size != 0
else cum_sum_sq
)

valid_count = padded_series.cum_count(reverse=False)
count_in_window = valid_count - valid_count.shift(window_size).fill_null(
value=0, strategy=None, limit=None
)

result = self._from_native_series(
pc.if_else(
(count_in_window >= min_periods)._native_series,
(rolling_sum_sq - (rolling_sum**2 / count_in_window))._native_series,
None,
)
) / self._from_native_series(
pc.max_element_wise((count_in_window - ddof)._native_series, 0)
)

return result[offset:]

def rolling_std(
self: Self,
window_size: int,
*,
min_periods: int | None,
center: bool,
ddof: int,
) -> Self:
return (
self.rolling_var(
window_size=window_size, min_periods=min_periods, center=center, ddof=ddof
)
** 0.5
)

def __iter__(self: Self) -> Iterator[Any]:
yield from self._native_series.__iter__()
Expand Down
36 changes: 36 additions & 0 deletions narwhals/_arrow/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,3 +469,39 @@ def _parse_time_format(arr: pa.Array) -> str:
if pc.all(matches.is_valid()).as_py():
return time_fmt
return ""


def pad_series(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly a better name to be found

series: ArrowSeries, *, window_size: int, center: bool
) -> tuple[ArrowSeries, int]:
"""Pad series with None values on the left and/or right side, depending on the specified parameters.

Arguments:
series: The input ArrowSeries to be padded.
window_size: The desired size of the window.
center: Specifies whether to center the padding or not.

Returns:
A tuple containing the padded ArrowSeries and the offset value.
"""
import pyarrow as pa # ignore-banned-import

if center:
offset_left = window_size // 2
offset_right = offset_left - (
window_size % 2 == 0
) # subtract one if window_size is even

native_series = series._native_series

pad_left = pa.array([None] * offset_left, type=native_series.type)
pad_right = pa.array([None] * offset_right, type=native_series.type)
padded_arr = series._from_native_series(
pa.concat_arrays([pad_left, native_series.combine_chunks(), pad_right])
)
offset = offset_left + offset_right
else:
padded_arr = series
offset = 0

return padded_arr, offset
58 changes: 58 additions & 0 deletions narwhals/_dask/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,64 @@ def func(
returns_scalar=False,
)

def rolling_var(
self: Self,
window_size: int,
*,
min_periods: int | None,
center: bool,
ddof: int,
) -> Self:
def func(
_input: dask_expr.Series,
_window: int,
_min_periods: int | None,
_center: bool, # noqa: FBT001
_ddof: int,
) -> dask_expr.Series:
return _input.rolling(
window=_window, min_periods=_min_periods, center=_center
).var(ddof=ddof)

return self._from_call(
func,
"rolling_var",
window_size,
min_periods,
center,
ddof,
returns_scalar=False,
)

def rolling_std(
self: Self,
window_size: int,
*,
min_periods: int | None,
center: bool,
ddof: int,
) -> Self:
def func(
_input: dask_expr.Series,
_window: int,
_min_periods: int | None,
_center: bool, # noqa: FBT001
_ddof: int,
) -> dask_expr.Series:
return _input.rolling(
window=_window, min_periods=_min_periods, center=_center
).std(ddof=ddof)

return self._from_call(
func,
"rolling_std",
window_size,
min_periods,
center,
ddof,
returns_scalar=False,
)


class DaskExprStringNamespace:
def __init__(self, expr: DaskExpr) -> None:
Expand Down
34 changes: 34 additions & 0 deletions narwhals/_pandas_like/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,40 @@ def rolling_mean(
center=center,
)

def rolling_var(
self: Self,
window_size: int,
*,
min_periods: int | None,
center: bool,
ddof: int,
) -> Self:
return reuse_series_implementation(
self,
"rolling_var",
window_size=window_size,
min_periods=min_periods,
center=center,
ddof=ddof,
)

def rolling_std(
self: Self,
window_size: int,
*,
min_periods: int | None,
center: bool,
ddof: int,
) -> Self:
return reuse_series_implementation(
self,
"rolling_std",
window_size=window_size,
min_periods=min_periods,
center=center,
ddof=ddof,
)

@property
def str(self: Self) -> PandasLikeExprStringNamespace:
return PandasLikeExprStringNamespace(self)
Expand Down
Loading
Loading