Skip to content

Commit

Permalink
ENH: Add first and last aggregations to Rolling and Expanding
Browse files Browse the repository at this point in the history
  • Loading branch information
snitish committed Dec 16, 2024
1 parent 9501650 commit a23edf1
Show file tree
Hide file tree
Showing 11 changed files with 451 additions and 1 deletion.
1 change: 1 addition & 0 deletions doc/source/whatsnew/v3.0.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ Other enhancements
- :meth:`Series.cummin` and :meth:`Series.cummax` now support :class:`CategoricalDtype` (:issue:`52335`)
- :meth:`Series.plot` now correctly handles the ``ylabel`` parameter for pie charts, allowing for explicit control over the y-axis label (:issue:`58239`)
- :meth:`DataFrame.plot.scatter` argument ``c`` now accepts a column of strings, where rows with the same string are colored identically (:issue:`16827` and :issue:`16485`)
- :class:`Rolling` and :class:`Expanding` now support aggregations ``first`` and ``last`` (:issue:`33155`)
- :func:`read_parquet` accepts ``to_pandas_kwargs`` which are forwarded to :meth:`pyarrow.Table.to_pandas` which enables passing additional keywords to customize the conversion to pandas, such as ``maps_as_pydicts`` to read the Parquet map data type as python dictionaries (:issue:`56842`)
- :meth:`DataFrameGroupBy.transform`, :meth:`SeriesGroupBy.transform`, :meth:`DataFrameGroupBy.agg`, :meth:`SeriesGroupBy.agg`, :meth:`RollingGroupby.apply`, :meth:`ExpandingGroupby.apply`, :meth:`Rolling.apply`, :meth:`Expanding.apply`, :meth:`DataFrame.apply` with ``engine="numba"`` now supports positional arguments passed as kwargs (:issue:`58995`)
- :meth:`Rolling.agg`, :meth:`Expanding.agg` and :meth:`ExponentialMovingWindow.agg` now accept :class:`NamedAgg` aggregations through ``**kwargs`` (:issue:`28333`)
Expand Down
12 changes: 12 additions & 0 deletions pandas/_libs/window/aggregations.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ def roll_min(
end: np.ndarray, # np.ndarray[np.int64]
minp: int, # int64_t
) -> np.ndarray: ... # np.ndarray[float]
# First non-NaN value of each window; NaN when the window has fewer than
# `minp` non-NaN observations (implemented in aggregations.pyx).
def roll_first(
    values: np.ndarray,  # np.ndarray[np.float64]
    start: np.ndarray,  # np.ndarray[np.int64]  window start offsets
    end: np.ndarray,  # np.ndarray[np.int64]  window end offsets (exclusive)
    minp: int,  # int64_t  minimum observations per window
) -> np.ndarray: ...  # np.ndarray[float]
# Last non-NaN value of each window; NaN when the window has fewer than
# `minp` non-NaN observations (implemented in aggregations.pyx).
def roll_last(
    values: np.ndarray,  # np.ndarray[np.float64]
    start: np.ndarray,  # np.ndarray[np.int64]  window start offsets
    end: np.ndarray,  # np.ndarray[np.int64]  window end offsets (exclusive)
    minp: int,  # int64_t  minimum observations per window
) -> np.ndarray: ...  # np.ndarray[float]
def roll_quantile(
values: np.ndarray, # const float64_t[:]
start: np.ndarray, # np.ndarray[np.int64]
Expand Down
83 changes: 83 additions & 0 deletions pandas/_libs/window/aggregations.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,89 @@ cdef _roll_min_max(ndarray[float64_t] values,

return output

# ----------------------------------------------------------------------
# Rolling first, last


def roll_first(const float64_t[:] values, ndarray[int64_t] start,
               ndarray[int64_t] end, int64_t minp) -> np.ndarray:
    # Delegate to the shared first/last kernel, keeping the left-most
    # non-NaN value of each window.
    return _roll_first_last(values, start, end, minp, is_first=True)


def roll_last(const float64_t[:] values, ndarray[int64_t] start,
              ndarray[int64_t] end, int64_t minp) -> np.ndarray:
    # Delegate to the shared first/last kernel, keeping the right-most
    # non-NaN value of each window.
    return _roll_first_last(values, start, end, minp, is_first=False)


cdef _roll_first_last(const float64_t[:] values, ndarray[int64_t] start,
                      ndarray[int64_t] end, int64_t minp, bint is_first):
    """
    Shared kernel for roll_first / roll_last.

    For each window ``[start[i], end[i])`` returns the first (``is_first=1``)
    or last (``is_first=0``) non-NaN value, or NaN when the window contains
    fewer than ``minp`` non-NaN observations.

    ``fl_idx`` tracks the index of the current first/last valid value and
    ``nobs`` counts valid values; both are carried incrementally across
    overlapping windows when the bounds are monotonic.
    """
    cdef:
        Py_ssize_t i, j, fl_idx
        bint is_monotonic_increasing_bounds
        int64_t nobs = 0, N = len(start), s, e
        float64_t val, res
        ndarray[float64_t] output

    # Monotonic bounds allow incremental add/delete updates between
    # consecutive windows instead of a full rescan of every window.
    is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
        start, end
    )

    output = np.empty(N, dtype=np.float64)

    # Degenerate case: every window is empty -> all NaN output.
    if (end - start).max() == 0:
        output[:] = NaN
        return output

    with nogil:
        for i in range(0, N):
            s = start[i]
            e = end[i]

            if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
                # Fresh window (first iteration, non-monotonic bounds, or no
                # overlap with the previous window): scan it from scratch.
                fl_idx = -1
                nobs = 0
                for j in range(s, e):
                    val = values[j]
                    # val == val is the NaN check (NaN != NaN).
                    if val == val:
                        # "last": always take the newest valid index.
                        # "first": only set once; fl_idx < s means unset.
                        if not is_first or fl_idx < s:
                            fl_idx = j
                        nobs += 1
            else:
                # Overlapping window: update state incrementally.
                # handle deletes
                for j in range(start[i - 1], s):
                    val = values[j]
                    if val == val:
                        nobs -= 1

                # update fl_idx if out of range, if first
                if is_first and fl_idx < s:
                    fl_idx = -1
                    # Rescan only the retained overlap for the new first value.
                    for j in range(s, end[i - 1]):
                        val = values[j]
                        if val == val:
                            fl_idx = j
                            break

                # handle adds
                for j in range(end[i - 1], e):
                    val = values[j]
                    if val == val:
                        if not is_first or fl_idx < s:
                            fl_idx = j
                        nobs += 1

            # fl_idx >= s guards against a stale index left over from a
            # previous (now disjoint) window.
            if nobs >= minp and fl_idx >= s:
                res = values[fl_idx]
            else:
                res = NaN

            output[i] = res

        if not is_monotonic_increasing_bounds:
            nobs = 0

    return output


cdef enum InterpolationType:
LINEAR,
Expand Down
58 changes: 58 additions & 0 deletions pandas/core/window/expanding.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,64 @@ def skew(self, numeric_only: bool = False):
    def kurt(self, numeric_only: bool = False):
        # Thin override delegating to the shared rolling/expanding
        # implementation in the parent mixin.
        return super().kurt(numeric_only=numeric_only)

@doc(
template_header,
create_section_header("Parameters"),
kwargs_numeric_only,
create_section_header("Returns"),
template_returns,
create_section_header("Examples"),
dedent(
"""
The example below will show an expanding calculation with a window size of
three.
>>> s = pd.Series(range(5))
>>> s.expanding(3).first()
0 NaN
1 NaN
2 0.0
3 0.0
4 0.0
dtype: float64
"""
).replace("\n", "", 1),
window_method="expanding",
aggregation_description="First (left-most) element of the window",
agg_method="first",
)
def first(self, numeric_only: bool = False):
return super().first(numeric_only=numeric_only)

@doc(
template_header,
create_section_header("Parameters"),
kwargs_numeric_only,
create_section_header("Returns"),
template_returns,
create_section_header("Examples"),
dedent(
"""
The example below will show an expanding calculation with a window size of
three.
>>> s = pd.Series(range(5))
>>> s.expanding(3).last()
0 NaN
1 NaN
2 2.0
3 3.0
4 4.0
dtype: float64
"""
).replace("\n", "", 1),
window_method="expanding",
aggregation_description="Last (right-most) element of the window",
agg_method="last",
)
def last(self, numeric_only: bool = False):
return super().last(numeric_only=numeric_only)

@doc(
template_header,
create_section_header("Parameters"),
Expand Down
74 changes: 74 additions & 0 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -1705,6 +1705,22 @@ def kurt(self, numeric_only: bool = False):
numeric_only=numeric_only,
)

def first(self, numeric_only: bool = False):
window_func = window_aggregations.roll_first
return self._apply(
window_func,
name="first",
numeric_only=numeric_only,
)

def last(self, numeric_only: bool = False):
window_func = window_aggregations.roll_last
return self._apply(
window_func,
name="last",
numeric_only=numeric_only,
)

def quantile(
self,
q: float,
Expand Down Expand Up @@ -2539,6 +2555,64 @@ def sem(self, ddof: int = 1, numeric_only: bool = False):
def kurt(self, numeric_only: bool = False):
return super().kurt(numeric_only=numeric_only)

@doc(
template_header,
create_section_header("Parameters"),
kwargs_numeric_only,
create_section_header("Returns"),
template_returns,
create_section_header("Examples"),
dedent(
"""
The example below will show a rolling calculation with a window size of
three.
>>> s = pd.Series(range(5))
>>> s.rolling(3).first()
0 NaN
1 NaN
2 0.0
3 1.0
4 2.0
dtype: float64
"""
).replace("\n", "", 1),
window_method="rolling",
aggregation_description="First (left-most) element of the window",
agg_method="first",
)
def first(self, numeric_only: bool = False):
return super().first(numeric_only=numeric_only)

@doc(
template_header,
create_section_header("Parameters"),
kwargs_numeric_only,
create_section_header("Returns"),
template_returns,
create_section_header("Examples"),
dedent(
"""
The example below will show a rolling calculation with a window size of
three.
>>> s = pd.Series(range(5))
>>> s.rolling(3).last()
0 NaN
1 NaN
2 2.0
3 3.0
4 4.0
dtype: float64
"""
).replace("\n", "", 1),
window_method="rolling",
aggregation_description="Last (right-most) element of the window",
agg_method="last",
)
def last(self, numeric_only: bool = False):
return super().last(numeric_only=numeric_only)

@doc(
template_header,
create_section_header("Parameters"),
Expand Down
2 changes: 2 additions & 0 deletions pandas/tests/window/test_cython_aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ def _get_rolling_aggregations():
("roll_median_c", window_aggregations.roll_median_c),
("roll_max", window_aggregations.roll_max),
("roll_min", window_aggregations.roll_min),
("roll_first", window_aggregations.roll_first),
("roll_last", window_aggregations.roll_last),
]
+ [
(
Expand Down
100 changes: 100 additions & 0 deletions pandas/tests/window/test_expanding.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,8 @@ def test_moment_functions_zero_length_pairwise(f):
lambda x: x.expanding(min_periods=5).corr(x, pairwise=False),
lambda x: x.expanding(min_periods=5).max(),
lambda x: x.expanding(min_periods=5).min(),
lambda x: x.expanding(min_periods=5).first(),
lambda x: x.expanding(min_periods=5).last(),
lambda x: x.expanding(min_periods=5).sum(),
lambda x: x.expanding(min_periods=5).mean(),
lambda x: x.expanding(min_periods=5).std(),
Expand Down Expand Up @@ -596,6 +598,104 @@ def test_expanding_corr_pairwise_diff_length():
tm.assert_frame_equal(result4, expected)


@pytest.mark.parametrize(
    "values,method,expected",
    [
        (
            [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
            "first",
            [np.nan] * 2 + [1.0] * 8,
        ),
        (
            [1.0, np.nan, 3.0, np.nan, 5.0, np.nan, 7.0, np.nan, 9.0, np.nan],
            "first",
            [np.nan] * 4 + [1.0] * 6,
        ),
        (
            [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
            "last",
            [np.nan] * 2 + [3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
        ),
        (
            [1.0, np.nan, 3.0, np.nan, 5.0, np.nan, 7.0, np.nan, 9.0, np.nan],
            "last",
            [np.nan] * 4 + [5.0, 5.0, 7.0, 7.0, 9.0, 9.0],
        ),
    ],
)
def test_expanding_first_last(values, method, expected):
    # GH#33155: expanding().first()/last() with min_periods=3
    expected_ser = Series(expected)
    result_ser = getattr(Series(values).expanding(3), method)()
    tm.assert_almost_equal(result_ser, expected_ser)

    result_df = getattr(DataFrame({"A": values}).expanding(3), method)()
    tm.assert_almost_equal(result_df, DataFrame({"A": expected_ser}))


@pytest.mark.parametrize(
    "values,method,expected",
    [
        (
            [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
            "first",
            [1.0] * 10,
        ),
        (
            [1.0, np.nan, 3.0, np.nan, 5.0, np.nan, 7.0, np.nan, 9.0, np.nan],
            "first",
            [1.0] * 10,
        ),
        (
            [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
            "last",
            [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
        ),
        (
            [1.0, np.nan, 3.0, np.nan, 5.0, np.nan, 7.0, np.nan, 9.0, np.nan],
            "last",
            [1.0, 1.0, 3.0, 3.0, 5.0, 5.0, 7.0, 7.0, 9.0, 9.0],
        ),
    ],
)
def test_expanding_first_last_no_minp(values, method, expected):
    # GH#33155: with min_periods=0 even the first window yields a value
    expected_ser = Series(expected)
    result_ser = getattr(Series(values).expanding(min_periods=0), method)()
    tm.assert_almost_equal(result_ser, expected_ser)

    result_df = getattr(DataFrame({"A": values}).expanding(min_periods=0), method)()
    tm.assert_almost_equal(result_df, DataFrame({"A": expected_ser}))


def test_expanding_apply_args_kwargs(engine_and_raw):
def mean_w_arg(x, const):
return np.mean(x) + const
Expand Down
Loading

0 comments on commit a23edf1

Please sign in to comment.