BUG/ENH: Use pyarrow grouped aggregation functions for pyarrow-backed groupby ops #55234

Closed · wants to merge 6 commits
64 changes: 64 additions & 0 deletions asv_bench/benchmarks/groupby.py
@@ -3,9 +3,11 @@
from string import ascii_letters

import numpy as np
import pyarrow as pa

from pandas import (
NA,
ArrowDtype,
Categorical,
DataFrame,
Index,
@@ -1098,4 +1100,66 @@ def time_resample_multiindex(self):
).mean()


class GroupByAggregateArrowDtypes:
param_names = ["dtype", "method"]
params = [
[
ArrowDtype(pa.bool_()),
ArrowDtype(pa.decimal128(25, 3)),
ArrowDtype(pa.float64()),
ArrowDtype(pa.int32()),
ArrowDtype(pa.string()),
ArrowDtype(pa.timestamp("s", "UTC")),
ArrowDtype(pa.duration("ms")),
],
[
"any",
"all",
"count",
"sum",
"prod",
"min",
"max",
"mean",
"median",
"std",
],
]

def setup(self, dtype, method):
size = (200_000, 10)
pa_type = dtype.pyarrow_dtype
if pa.types.is_floating(pa_type):
values = np.random.randn(*size)
elif pa.types.is_integer(pa_type):
values = np.random.randint(0, 10_000, size)
elif pa.types.is_decimal(pa_type):
values = np.random.randn(*size)
elif pa.types.is_boolean(pa_type):
values = np.random.randint(0, 2, size, dtype=np.bool_)
elif pa.types.is_timestamp(pa_type):
if method in ["any", "all", "sum", "prod"]:
raise NotImplementedError
values = np.random.randint(0, 10_000, size)
elif pa.types.is_duration(pa_type):
if method == "prod":
raise NotImplementedError
values = np.random.randint(0, 10_000, size)
elif pa.types.is_string(pa_type):
if method in ["any", "all", "sum", "prod", "mean", "median", "std"]:
raise NotImplementedError
values = tm.rands_array(nchars=10, size=size)
else:
raise NotImplementedError

columns = list("abcdefghij")
df = DataFrame(values, columns=columns, dtype=dtype)
df.iloc[::10, ::2] = NA
df["key"] = np.random.randint(0, high=100, size=(len(values)))
self.df = df

def time_frame_agg(self, dtype, method):
self.df.groupby("key").agg(method)


from .pandas_vb_common import setup # noqa: F401 isort:skip
2 changes: 1 addition & 1 deletion doc/source/whatsnew/v2.2.0.rst
@@ -329,7 +329,7 @@ Plotting

Groupby/resample/rolling
^^^^^^^^^^^^^^^^^^^^^^^^
-
- Bug in :meth:`.SeriesGroupBy.var` and :meth:`.DataFrameGroupBy.var` where the dtype would be ``np.float64`` for data with :class:`ArrowDtype` with ``pyarrow.decimal128`` type (:issue:`54627`)
-
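A minimal sketch of the behavior described by the groupby entry above, assuming pandas with pyarrow installed; the frame, keys, and decimal values are invented for illustration and are not part of the whatsnew file:

```python
from decimal import Decimal

import pyarrow as pa

import pandas as pd

df = pd.DataFrame(
    {
        "key": [1, 1, 2, 2],
        "val": pd.array(
            [Decimal("1.5"), Decimal("2.5"), Decimal("3.5"), Decimal("4.5")],
            dtype=pd.ArrowDtype(pa.decimal128(25, 3)),
        ),
    }
)
# Per the entry above, the grouped variance previously came back as plain
# np.float64; with the fix it is expected to keep an Arrow-backed dtype.
print(df.groupby("key")["val"].var().dtype)
```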

Reshaping
240 changes: 184 additions & 56 deletions pandas/core/arrays/arrow/array.py
@@ -1466,44 +1466,16 @@ def _reduce_pyarrow(self, name: str, *, skipna: bool = True, **kwargs) -> pa.Scalar:
------
TypeError : subclass does not define reductions
"""
pa_type = self._pa_array.type

data_to_reduce = self._pa_array

cast_kwargs = {} if pa_version_under13p0 else {"safe": False}

if name in ["any", "all"] and (
pa.types.is_integer(pa_type)
or pa.types.is_floating(pa_type)
or pa.types.is_duration(pa_type)
or pa.types.is_decimal(pa_type)
):
# pyarrow only supports any/all for boolean dtype, we allow
# for other dtypes, matching our non-pyarrow behavior

if pa.types.is_duration(pa_type):
data_to_cmp = self._pa_array.cast(pa.int64())
else:
data_to_cmp = self._pa_array

not_eq = pc.not_equal(data_to_cmp, 0)
data_to_reduce = not_eq

elif name in ["min", "max", "sum"] and pa.types.is_duration(pa_type):
data_to_reduce = self._pa_array.cast(pa.int64())

elif name in ["median", "mean", "std", "sem"] and pa.types.is_temporal(pa_type):
nbits = pa_type.bit_width
if nbits == 32:
data_to_reduce = self._pa_array.cast(pa.int32())
else:
data_to_reduce = self._pa_array.cast(pa.int64())
data_to_reduce = self._values_for_reduction(name)

if name == "sem":

def pyarrow_meth(data, skip_nulls, **kwargs):
numerator = pc.stddev(data, skip_nulls=skip_nulls, **kwargs)
denominator = pc.sqrt_checked(pc.count(self._pa_array))
if pa_version_under8p0:
denominator = pc.power_checked(pc.count(self._pa_array), 0.5)
else:
denominator = pc.sqrt_checked(pc.count(self._pa_array))
return pc.divide_checked(numerator, denominator)

else:
@@ -1541,21 +1513,94 @@ def pyarrow_meth(data, skip_nulls, **kwargs):
if name == "median":
# GH 52679: Use quantile instead of approximate_median; returns array
result = result[0]
if pc.is_null(result).as_py():
return result

return self._maybe_cast_reduction_result(result, name)

def _values_for_reduction(self, name: str) -> pa.ChunkedArray:
"""
Return the underlying ChunkedArray, possibly cast to a different type in
order to support reductions.

Parameters
----------
name : str
Name of the function, supported values are:
{ any, all, min, max, sum, mean, median, prod,
std, var, sem, kurt, skew }.

Returns
-------
pa.ChunkedArray
"""
pa_type = self._pa_array.type

if name in ["any", "all"] and (
pa.types.is_integer(pa_type)
or pa.types.is_floating(pa_type)
or pa.types.is_duration(pa_type)
or pa.types.is_decimal(pa_type)
):
# pyarrow only supports any/all for boolean dtype, we allow
# for other dtypes, matching our non-pyarrow behavior
if pa.types.is_duration(pa_type):
data_to_cmp = self._pa_array.cast(pa.int64())
else:
data_to_cmp = self._pa_array

not_eq = pc.not_equal(data_to_cmp, 0)
data_to_reduce = not_eq

elif name in ["min", "max", "sum"] and pa.types.is_duration(pa_type):
data_to_reduce = self._pa_array.cast(pa.int64())

elif name in ["median", "mean", "std", "sem"] and pa.types.is_temporal(pa_type):
nbits = pa_type.bit_width
if nbits == 32:
data_to_reduce = self._pa_array.cast(pa.int32())
else:
data_to_reduce = self._pa_array.cast(pa.int64())

elif name in ["sem", "std", "var"] and pa.types.is_boolean(pa_type):
data_to_reduce = self._pa_array.cast(pa.uint8())

else:
data_to_reduce = self._pa_array

return data_to_reduce
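
For context on the refactor, here is a minimal standalone sketch in raw pyarrow (the values are invented for illustration; this is not pandas internals) of the cast-reduce-cast round trip that _values_for_reduction and _maybe_cast_reduction_result factor out, using the duration sum case handled above:

```python
import pyarrow as pa
import pyarrow.compute as pc

dur = pa.chunked_array([pa.array([1_000, 2_500, None], type=pa.duration("ms"))])

# _values_for_reduction step: view the durations as plain int64 so the
# reduction kernel can run on them.
as_int64 = dur.cast(pa.int64())
total = pc.sum(as_int64)  # int64 scalar, nulls skipped: 3500

# _maybe_cast_reduction_result step: cast the scalar result back to the
# original duration type.
print(total.cast(pa.duration("ms")))
```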

Review comment (Member): IIUC at least some of the bugfix is coming from adding this casting to _groupby_op. Could we split off from this PR one that implements _values_for_reduction and _maybe_cast_reduction_result?

def _maybe_cast_reduction_result(
self, result: pa.Scalar | pa.ChunkedArray, name: str
):
"""
Maybe cast a reduction result to a different pyarrow type.

See ArrowExtensionArray._values_for_reduction.

Parameters
----------
result : pa.Scalar | pa.ChunkedArray
name : str
Name of the function, supported values are:
{ any, all, min, max, sum, mean, median, prod,
std, var, sem, kurt, skew }.

Returns
-------
pa.Scalar or pa.ChunkedArray
"""
pa_type = self._pa_array.type
if name in ["min", "max", "sum"] and pa.types.is_duration(pa_type):
result = result.cast(pa_type)
if name in ["median", "mean"] and pa.types.is_temporal(pa_type):
if not pa_version_under13p0:
nbits = pa_type.bit_width
if nbits == 32:
result = result.cast(pa.int32(), **cast_kwargs)
result = pc.cast(result, pa.int32(), safe=False)
else:
result = result.cast(pa.int64(), **cast_kwargs)
result = pc.cast(result, pa.int64(), safe=False)
result = result.cast(pa_type)
if name in ["std", "sem"] and pa.types.is_temporal(pa_type):
result = result.cast(pa.int64(), **cast_kwargs)
result = pc.cast(result, pa.int64(), safe=False)
if pa.types.is_duration(pa_type):
result = result.cast(pa_type)
elif pa.types.is_time(pa_type):
@@ -1999,27 +2044,110 @@ def _groupby_op(
**kwargs,
)

# maybe convert to a compatible dtype optimized for groupby
values: ExtensionArray
pa_type = self._pa_array.type
if pa.types.is_timestamp(pa_type):
values = self._to_datetimearray()
elif pa.types.is_duration(pa_type):
values = self._to_timedeltaarray()
pa_supported_groupby_aggregations = [
"any",
"all",
"sum",
"prod",
"min",
"max",
"mean",
"sem",
"std",
"var",
]
# aggregations not yet supported natively in pyarrow:
# - first/last: https://github.com/apache/arrow/issues/36709
# - nth
# - median

if how not in pa_supported_groupby_aggregations:
# maybe convert to a compatible dtype for groupby
values: ExtensionArray
pa_type = self._pa_array.type
if pa.types.is_timestamp(pa_type):
values = self._to_datetimearray()
elif pa.types.is_duration(pa_type):
values = self._to_timedeltaarray()
else:
values = self._to_masked()

result = values._groupby_op(
how=how,
has_dropped_na=has_dropped_na,
min_count=min_count,
ngroups=ngroups,
ids=ids,
**kwargs,
)
if isinstance(result, np.ndarray):
return result
return type(self)._from_sequence(result, copy=False)

arr = self._values_for_reduction(how)

pa_name = {
"prod": "product",
"std": "stddev",
"var": "variance",
}.get(how, how)

# pyarrow does not accept a min_count of -1
if min_count == -1:
if how in ["any", "all"]:
min_count = 0
else:
min_count = 1

if how in ["std", "var"]:
aggregations = [
(
"val",
pa_name,
pc.VarianceOptions(ddof=kwargs["ddof"], min_count=min_count),
)
]
elif how == "sem":
aggregations = [
(
"val",
"stddev",
pc.VarianceOptions(ddof=kwargs["ddof"], min_count=min_count),
),
(
"val",
"count",
pc.CountOptions(mode="only_valid"),
),
]
else:
values = self._to_masked()

result = values._groupby_op(
how=how,
has_dropped_na=has_dropped_na,
min_count=min_count,
ngroups=ngroups,
ids=ids,
**kwargs,
aggregations = [
(
"val",
pa_name,
pc.ScalarAggregateOptions(min_count=min_count),
)
]

res = (
pa.Table.from_pydict({"id": ids, "val": arr})
.group_by("id")
Review comment (Member): IIUC this is re-calculating the codes we already have in our GroupBy object. I see the appeal, but am wary of this pattern. I think the long-term solution is to get pyarrow to expose something to which we can pass the already-computed codes?

Reply (Member Author): Yep, that's fair. Also, it would be nice to pass all aggregations at once to pyarrow - see the last comment in the OP.

Review comment (Member), quoting the OP ("Longer term, there are potentially big performance gains if all arrow-backed aggregations can be passed to pyarrow at once. At the moment, we're iterating column by column. Here is a small example showing the impact: ..."): Any idea what is driving the perf impact? Does the pyarrow version do something in parallel?

Reply (Member Author): I can't tell from reading the docs whether anything is done in parallel. It could be simply (as you've pointed out) that pyarrow is only computing the group codes once when done in a single batch and recomputing them when done iteratively.

.aggregate(aggregations)
)
if isinstance(result, np.ndarray):
return result
return type(self)._from_sequence(result, copy=False)

if how == "sem":
numerator = res["val_stddev"]
if pa_version_under8p0:
denominator = pc.power_checked(res["val_count"], 0.5)
else:
denominator = pc.sqrt_checked(res["val_count"])
result = pc.divide_checked(numerator, denominator)
else:
result = res[f"val_{pa_name}"]

result = self._maybe_cast_reduction_result(result, how)
return type(self)(result)
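
To make the performance discussion in the review thread above concrete, here is a small hypothetical sketch (the table, ids, and column names are invented; this is not the pandas code path) of handing several columns' aggregations to pyarrow in one group_by call rather than rebuilding the grouping once per column:

```python
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc

ids = np.array([0, 0, 1, 1, 2])
tbl = pa.Table.from_pydict(
    {
        "id": ids,
        "a": pa.array([1.0, 2.0, 3.0, None, 5.0]),
        "b": pa.array([10, 20, 30, 40, 50]),
    }
)

# One pass over the keys computes every requested aggregation; result
# columns are named "<column>_<function>", e.g. "a_mean" and "b_sum".
res = tbl.group_by("id").aggregate(
    [
        ("a", "mean", pc.ScalarAggregateOptions(min_count=1)),
        ("b", "sum", pc.ScalarAggregateOptions(min_count=1)),
    ]
)
print(res.column("a_mean"), res.column("b_sum"))
```

The "<column>_<function>" naming is the same convention the lookup of res[f"val_{pa_name}"] above relies on.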

def _apply_elementwise(self, func: Callable) -> list[list[Any]]:
"""Apply a callable to each element while maintaining the chunking structure."""
2 changes: 1 addition & 1 deletion pandas/core/arrays/base.py
@@ -2248,7 +2248,7 @@ def _groupby_op(
Parameters
----------
how : {'any', 'all', 'sum', 'prod', 'min', 'max', 'mean', 'median',
'median', 'var', 'std', 'sem', 'nth', 'last', 'ohlc',
'var', 'std', 'sem', 'nth', 'first', 'last', 'ohlc',
'cumprod', 'cumsum', 'cummin', 'cummax', 'rank'}
has_dropped_na : bool
min_count : int