ENH: Add first and last aggregations to Rolling and Expanding #60579

Open · wants to merge 1 commit into base: main
1 change: 1 addition & 0 deletions doc/source/whatsnew/v3.0.0.rst
@@ -54,6 +54,7 @@ Other enhancements
- :meth:`Series.cummin` and :meth:`Series.cummax` now support :class:`CategoricalDtype` (:issue:`52335`)
- :meth:`Series.plot` now correctly handles the ``ylabel`` parameter for pie charts, allowing for explicit control over the y-axis label (:issue:`58239`)
- :meth:`DataFrame.plot.scatter` argument ``c`` now accepts a column of strings, where rows with the same string are colored identically (:issue:`16827` and :issue:`16485`)
- :class:`Rolling` and :class:`Expanding` now support aggregations ``first`` and ``last`` (:issue:`33155`)
- :func:`read_parquet` accepts ``to_pandas_kwargs`` which are forwarded to :meth:`pyarrow.Table.to_pandas` which enables passing additional keywords to customize the conversion to pandas, such as ``maps_as_pydicts`` to read the Parquet map data type as python dictionaries (:issue:`56842`)
- :meth:`DataFrameGroupBy.transform`, :meth:`SeriesGroupBy.transform`, :meth:`DataFrameGroupBy.agg`, :meth:`SeriesGroupBy.agg`, :meth:`RollingGroupby.apply`, :meth:`ExpandingGroupby.apply`, :meth:`Rolling.apply`, :meth:`Expanding.apply`, :meth:`DataFrame.apply` with ``engine="numba"`` now support positional arguments passed as kwargs (:issue:`58995`)
- :meth:`Rolling.agg`, :meth:`Expanding.agg` and :meth:`ExponentialMovingWindow.agg` now accept :class:`NamedAgg` aggregations through ``**kwargs`` (:issue:`28333`)
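
As a quick usage illustration of the entry added above (editor's sketch; the outputs simply restate the docstring examples that appear later in this diff):

```python
import pandas as pd

s = pd.Series(range(5))

# Rolling window of size 3: first/last non-NaN value of each window.
s.rolling(3).first()    # NaN, NaN, 0.0, 1.0, 2.0
s.rolling(3).last()     # NaN, NaN, 2.0, 3.0, 4.0

# Expanding window with min_periods=3: ``first`` stays at the initial
# observation while ``last`` tracks the most recent one.
s.expanding(3).first()  # NaN, NaN, 0.0, 0.0, 0.0
s.expanding(3).last()   # NaN, NaN, 2.0, 3.0, 4.0
```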
12 changes: 12 additions & 0 deletions pandas/_libs/window/aggregations.pyi
@@ -60,6 +60,18 @@ def roll_min(
    end: np.ndarray,  # np.ndarray[np.int64]
    minp: int,  # int64_t
) -> np.ndarray: ...  # np.ndarray[float]
def roll_first(
    values: np.ndarray,  # np.ndarray[np.float64]
    start: np.ndarray,  # np.ndarray[np.int64]
    end: np.ndarray,  # np.ndarray[np.int64]
    minp: int,  # int64_t
) -> np.ndarray: ...  # np.ndarray[float]
def roll_last(
    values: np.ndarray,  # np.ndarray[np.float64]
    start: np.ndarray,  # np.ndarray[np.int64]
    end: np.ndarray,  # np.ndarray[np.int64]
    minp: int,  # int64_t
) -> np.ndarray: ...  # np.ndarray[float]
def roll_quantile(
    values: np.ndarray,  # const float64_t[:]
    start: np.ndarray,  # np.ndarray[np.int64]
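
For illustration only: the kernels described by these stubs take precomputed window bounds and are normally invoked internally via ``Rolling._apply`` rather than called directly. A hedged sketch follows (the ``start``/``end`` arrays mimic what a trailing window of size 3 would produce, and the expected outputs are derived from the kernel's semantics by the editor, not taken from the PR):

```python
import numpy as np
import pandas._libs.window.aggregations as window_aggregations

values = np.array([1.0, np.nan, 3.0, 4.0, 5.0], dtype=np.float64)
# Bounds for a trailing window of size 3: window i covers values[start[i]:end[i]].
start = np.array([0, 0, 0, 1, 2], dtype=np.int64)
end = np.array([1, 2, 3, 4, 5], dtype=np.int64)

# With minp=2, a window needs two non-NaN observations to produce a value.
window_aggregations.roll_first(values, start, end, 2)
# expected: array([nan, nan, 1., 3., 3.])
window_aggregations.roll_last(values, start, end, 2)
# expected: array([nan, nan, 3., 4., 5.])
```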
83 changes: 83 additions & 0 deletions pandas/_libs/window/aggregations.pyx
@@ -1133,6 +1133,89 @@ cdef _roll_min_max(ndarray[float64_t] values,

    return output

# ----------------------------------------------------------------------
# Rolling first, last


def roll_first(const float64_t[:] values, ndarray[int64_t] start,
               ndarray[int64_t] end, int64_t minp) -> np.ndarray:
    return _roll_first_last(values, start, end, minp, is_first=1)


def roll_last(const float64_t[:] values, ndarray[int64_t] start,
              ndarray[int64_t] end, int64_t minp) -> np.ndarray:
    return _roll_first_last(values, start, end, minp, is_first=0)


cdef _roll_first_last(const float64_t[:] values, ndarray[int64_t] start,
                      ndarray[int64_t] end, int64_t minp, bint is_first):
    cdef:
        Py_ssize_t i, j, fl_idx
        bint is_monotonic_increasing_bounds
        int64_t nobs = 0, N = len(start), s, e
        float64_t val, res
        ndarray[float64_t] output

    is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
        start, end
    )

    output = np.empty(N, dtype=np.float64)

    if (end - start).max() == 0:
        output[:] = NaN
        return output

    with nogil:
        for i in range(0, N):
            s = start[i]
            e = end[i]

            if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
                fl_idx = -1
                nobs = 0
                for j in range(s, e):
                    val = values[j]
                    if val == val:
                        if not is_first or fl_idx < s:
                            fl_idx = j
                        nobs += 1
            else:
                # handle deletes
                for j in range(start[i - 1], s):
                    val = values[j]
                    if val == val:
                        nobs -= 1

                # update fl_idx if out of range, if first
                if is_first and fl_idx < s:
                    fl_idx = -1
                    for j in range(s, end[i - 1]):
                        val = values[j]
                        if val == val:
                            fl_idx = j
                            break

                # handle adds
                for j in range(end[i - 1], e):
                    val = values[j]
                    if val == val:
                        if not is_first or fl_idx < s:
                            fl_idx = j
                        nobs += 1

            if nobs >= minp and fl_idx >= s:
                res = values[fl_idx]
            else:
                res = NaN

            output[i] = res

            if not is_monotonic_increasing_bounds:
                nobs = 0

    return output
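
To make the kernel's behaviour easier to follow, here is a deliberately naive pure-Python reference of the same per-window semantics (editor's sketch; ``naive_roll_first_last`` is illustrative and not part of the PR). The Cython version above reaches the same result in a single pass by subtracting observations that left the window ("handle deletes") and adding those that entered it ("handle adds"), tracking ``fl_idx`` as the index of the first/last non-NaN value still inside the current window:

```python
import numpy as np

def naive_roll_first_last(values, start, end, minp, is_first=True):
    # For each window [start[i], end[i]): return the first (or last)
    # non-NaN value if the window holds at least ``minp`` non-NaN
    # observations, otherwise NaN.
    out = np.empty(len(start), dtype=np.float64)
    for i, (s, e) in enumerate(zip(start, end)):
        window = np.asarray(values[s:e], dtype=np.float64)
        valid = window[~np.isnan(window)]
        if len(valid) >= minp and len(valid) > 0:
            out[i] = valid[0] if is_first else valid[-1]
        else:
            out[i] = np.nan
    return out
```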


cdef enum InterpolationType:
    LINEAR,
58 changes: 58 additions & 0 deletions pandas/core/window/expanding.py
@@ -664,6 +664,64 @@ def skew(self, numeric_only: bool = False):
    def kurt(self, numeric_only: bool = False):
        return super().kurt(numeric_only=numeric_only)

    @doc(
        template_header,
        create_section_header("Parameters"),
        kwargs_numeric_only,
        create_section_header("Returns"),
        template_returns,
        create_section_header("Examples"),
        dedent(
            """
        The example below will show an expanding calculation with a minimum of
        three periods.

        >>> s = pd.Series(range(5))
        >>> s.expanding(3).first()
        0    NaN
        1    NaN
        2    0.0
        3    0.0
        4    0.0
        dtype: float64
        """
        ).replace("\n", "", 1),
        window_method="expanding",
        aggregation_description="First (left-most) element of the window",
        agg_method="first",
    )
    def first(self, numeric_only: bool = False):
        return super().first(numeric_only=numeric_only)

    @doc(
        template_header,
        create_section_header("Parameters"),
        kwargs_numeric_only,
        create_section_header("Returns"),
        template_returns,
        create_section_header("Examples"),
        dedent(
            """
        The example below will show an expanding calculation with a minimum of
        three periods.

        >>> s = pd.Series(range(5))
        >>> s.expanding(3).last()
        0    NaN
        1    NaN
        2    2.0
        3    3.0
        4    4.0
        dtype: float64
        """
        ).replace("\n", "", 1),
        window_method="expanding",
        aggregation_description="Last (right-most) element of the window",
        agg_method="last",
    )
    def last(self, numeric_only: bool = False):
        return super().last(numeric_only=numeric_only)

    @doc(
        template_header,
        create_section_header("Parameters"),
74 changes: 74 additions & 0 deletions pandas/core/window/rolling.py
@@ -1705,6 +1705,22 @@ def kurt(self, numeric_only: bool = False):
            numeric_only=numeric_only,
        )

    def first(self, numeric_only: bool = False):
        window_func = window_aggregations.roll_first
        return self._apply(
            window_func,
            name="first",
            numeric_only=numeric_only,
        )

    def last(self, numeric_only: bool = False):
        window_func = window_aggregations.roll_last
        return self._apply(
            window_func,
            name="last",
            numeric_only=numeric_only,
        )

    def quantile(
        self,
        q: float,
@@ -2539,6 +2555,64 @@ def sem(self, ddof: int = 1, numeric_only: bool = False):
    def kurt(self, numeric_only: bool = False):
        return super().kurt(numeric_only=numeric_only)

    @doc(
        template_header,
        create_section_header("Parameters"),
        kwargs_numeric_only,
        create_section_header("Returns"),
        template_returns,
        create_section_header("Examples"),
        dedent(
            """
        The example below will show a rolling calculation with a window size of
        three.

        >>> s = pd.Series(range(5))
        >>> s.rolling(3).first()
        0    NaN
        1    NaN
        2    0.0
        3    1.0
        4    2.0
        dtype: float64
        """
        ).replace("\n", "", 1),
        window_method="rolling",
        aggregation_description="First (left-most) element of the window",
        agg_method="first",
    )
    def first(self, numeric_only: bool = False):
        return super().first(numeric_only=numeric_only)

    @doc(
        template_header,
        create_section_header("Parameters"),
        kwargs_numeric_only,
        create_section_header("Returns"),
        template_returns,
        create_section_header("Examples"),
        dedent(
            """
        The example below will show a rolling calculation with a window size of
        three.

        >>> s = pd.Series(range(5))
        >>> s.rolling(3).last()
        0    NaN
        1    NaN
        2    2.0
        3    3.0
        4    4.0
        dtype: float64
        """
        ).replace("\n", "", 1),
        window_method="rolling",
        aggregation_description="Last (right-most) element of the window",
        agg_method="last",
    )
    def last(self, numeric_only: bool = False):
        return super().last(numeric_only=numeric_only)

    @doc(
        template_header,
        create_section_header("Parameters"),
2 changes: 2 additions & 0 deletions pandas/tests/window/test_cython_aggregations.py
@@ -30,6 +30,8 @@ def _get_rolling_aggregations():
            ("roll_median_c", window_aggregations.roll_median_c),
            ("roll_max", window_aggregations.roll_max),
            ("roll_min", window_aggregations.roll_min),
            ("roll_first", window_aggregations.roll_first),
            ("roll_last", window_aggregations.roll_last),
        ]
        + [
            (
100 changes: 100 additions & 0 deletions pandas/tests/window/test_expanding.py
@@ -451,6 +451,8 @@ def test_moment_functions_zero_length_pairwise(f):
        lambda x: x.expanding(min_periods=5).corr(x, pairwise=False),
        lambda x: x.expanding(min_periods=5).max(),
        lambda x: x.expanding(min_periods=5).min(),
        lambda x: x.expanding(min_periods=5).first(),
        lambda x: x.expanding(min_periods=5).last(),
        lambda x: x.expanding(min_periods=5).sum(),
        lambda x: x.expanding(min_periods=5).mean(),
        lambda x: x.expanding(min_periods=5).std(),
@@ -596,6 +598,104 @@ def test_expanding_corr_pairwise_diff_length():
    tm.assert_frame_equal(result4, expected)


@pytest.mark.parametrize(
    "values,method,expected",
    [
        (
            [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
            "first",
            [float("nan"), float("nan"), 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
        ),
        (
            [1.0, np.nan, 3.0, np.nan, 5.0, np.nan, 7.0, np.nan, 9.0, np.nan],
            "first",
            [
                float("nan"),
                float("nan"),
                float("nan"),
                float("nan"),
                1.0,
                1.0,
                1.0,
                1.0,
                1.0,
                1.0,
            ],
        ),
        (
            [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
            "last",
            [float("nan"), float("nan"), 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
        ),
        (
            [1.0, np.nan, 3.0, np.nan, 5.0, np.nan, 7.0, np.nan, 9.0, np.nan],
            "last",
            [
                float("nan"),
                float("nan"),
                float("nan"),
                float("nan"),
                5.0,
                5.0,
                7.0,
                7.0,
                9.0,
                9.0,
            ],
        ),
    ],
)
def test_expanding_first_last(values, method, expected):
    # GH#33155
    x = Series(values)
    result = getattr(x.expanding(3), method)()
    expected = Series(expected)
    tm.assert_almost_equal(result, expected)

    x = DataFrame({"A": values})
    result = getattr(x.expanding(3), method)()
    expected = DataFrame({"A": expected})
    tm.assert_almost_equal(result, expected)


@pytest.mark.parametrize(
    "values,method,expected",
    [
        (
            [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
            "first",
            [1.0] * 10,
        ),
        (
            [1.0, np.nan, 3.0, np.nan, 5.0, np.nan, 7.0, np.nan, 9.0, np.nan],
            "first",
            [1.0] * 10,
        ),
        (
            [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
            "last",
            [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
        ),
        (
            [1.0, np.nan, 3.0, np.nan, 5.0, np.nan, 7.0, np.nan, 9.0, np.nan],
            "last",
            [1.0, 1.0, 3.0, 3.0, 5.0, 5.0, 7.0, 7.0, 9.0, 9.0],
        ),
    ],
)
def test_expanding_first_last_no_minp(values, method, expected):
    # GH#33155
    x = Series(values)
    result = getattr(x.expanding(min_periods=0), method)()
    expected = Series(expected)
    tm.assert_almost_equal(result, expected)

    x = DataFrame({"A": values})
    result = getattr(x.expanding(min_periods=0), method)()
    expected = DataFrame({"A": expected})
    tm.assert_almost_equal(result, expected)
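
For an interactive sanity check of the ``min_periods`` behaviour exercised by these tests (the input and expected values below are taken directly from the parametrizations above):

```python
import numpy as np
import pandas as pd

s = pd.Series([1.0, np.nan, 3.0, np.nan, 5.0, np.nan, 7.0, np.nan, 9.0, np.nan])

# min_periods=3: NaN until three non-NaN observations have accumulated
# (index 4), after which ``first`` keeps returning 1.0.
s.expanding(3).first()   # NaN, NaN, NaN, NaN, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0

# min_periods=0: ``last`` carries forward the most recent non-NaN value.
s.expanding(min_periods=0).last()   # 1.0, 1.0, 3.0, 3.0, 5.0, 5.0, 7.0, 7.0, 9.0, 9.0
```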


def test_expanding_apply_args_kwargs(engine_and_raw):
    def mean_w_arg(x, const):
        return np.mean(x) + const