BUG/ENH: Use pyarrow grouped aggregation functions for pyarrow-backed groupby ops #55234

Changes from 5 commits:
4fcb3c6, e40526b, 13cbd10, 1b10241, 87581d4, adb0d32
@@ -1466,44 +1466,16 @@ def _reduce_pyarrow(self, name: str, *, skipna: bool = True, **kwargs) -> pa.Scalar:
         ------
         TypeError : subclass does not define reductions
         """
-        pa_type = self._pa_array.type
-
-        data_to_reduce = self._pa_array
-
-        cast_kwargs = {} if pa_version_under13p0 else {"safe": False}
-
-        if name in ["any", "all"] and (
-            pa.types.is_integer(pa_type)
-            or pa.types.is_floating(pa_type)
-            or pa.types.is_duration(pa_type)
-            or pa.types.is_decimal(pa_type)
-        ):
-            # pyarrow only supports any/all for boolean dtype, we allow
-            # for other dtypes, matching our non-pyarrow behavior
-
-            if pa.types.is_duration(pa_type):
-                data_to_cmp = self._pa_array.cast(pa.int64())
-            else:
-                data_to_cmp = self._pa_array
-
-            not_eq = pc.not_equal(data_to_cmp, 0)
-            data_to_reduce = not_eq
-
-        elif name in ["min", "max", "sum"] and pa.types.is_duration(pa_type):
-            data_to_reduce = self._pa_array.cast(pa.int64())
-
-        elif name in ["median", "mean", "std", "sem"] and pa.types.is_temporal(pa_type):
-            nbits = pa_type.bit_width
-            if nbits == 32:
-                data_to_reduce = self._pa_array.cast(pa.int32())
-            else:
-                data_to_reduce = self._pa_array.cast(pa.int64())
+        data_to_reduce = self._values_for_reduction(name)
 
         if name == "sem":
 
             def pyarrow_meth(data, skip_nulls, **kwargs):
                 numerator = pc.stddev(data, skip_nulls=skip_nulls, **kwargs)
-                denominator = pc.sqrt_checked(pc.count(self._pa_array))
+                if pa_version_under8p0:
+                    denominator = pc.power_checked(pc.count(self._pa_array), 0.5)
+                else:
+                    denominator = pc.sqrt_checked(pc.count(self._pa_array))
                 return pc.divide_checked(numerator, denominator)
 
         else:
@@ -1541,21 +1513,94 @@ def pyarrow_meth(data, skip_nulls, **kwargs):
         if name == "median":
             # GH 52679: Use quantile instead of approximate_median; returns array
             result = result[0]
         if pc.is_null(result).as_py():
             return result
 
+        return self._maybe_cast_reduction_result(result, name)
+
+    def _values_for_reduction(self, name: str) -> pa.ChunkedArray:
+        """
+        Return the underlying ChunkedArray, possibly cast to a different type in
+        order to support reductions.
+
+        Parameters
+        ----------
+        name : str
+            Name of the function, supported values are:
+            { any, all, min, max, sum, mean, median, prod,
+            std, var, sem, kurt, skew }.
+
+        Returns
+        -------
+        pa.ChunkedArray
+        """
+        pa_type = self._pa_array.type
+
+        if name in ["any", "all"] and (
+            pa.types.is_integer(pa_type)
+            or pa.types.is_floating(pa_type)
+            or pa.types.is_duration(pa_type)
+            or pa.types.is_decimal(pa_type)
+        ):
+            # pyarrow only supports any/all for boolean dtype, we allow
+            # for other dtypes, matching our non-pyarrow behavior
+            if pa.types.is_duration(pa_type):
+                data_to_cmp = self._pa_array.cast(pa.int64())
+            else:
+                data_to_cmp = self._pa_array
+
+            not_eq = pc.not_equal(data_to_cmp, 0)
+            data_to_reduce = not_eq
+
+        elif name in ["min", "max", "sum"] and pa.types.is_duration(pa_type):
+            data_to_reduce = self._pa_array.cast(pa.int64())
+
+        elif name in ["median", "mean", "std", "sem"] and pa.types.is_temporal(pa_type):
+            nbits = pa_type.bit_width
+            if nbits == 32:
+                data_to_reduce = self._pa_array.cast(pa.int32())
+            else:
+                data_to_reduce = self._pa_array.cast(pa.int64())
+
+        elif name in ["sem", "std", "var"] and pa.types.is_boolean(pa_type):
+            data_to_reduce = self._pa_array.cast(pa.uint8())
+
+        else:
+            data_to_reduce = self._pa_array
+
+        return data_to_reduce
+
+    def _maybe_cast_reduction_result(
+        self, result: pa.Scalar | pa.ChunkedArray, name: str
+    ):
+        """
+        Maybe cast a reduction result to a different pyarrow type.
+
+        See ArrowExtensionArray._values_for_reduction.
+
+        Parameters
+        ----------
+        result : pa.Scalar | pa.ChunkedArray
+        name : str
+            Name of the function, supported values are:
+            { any, all, min, max, sum, mean, median, prod,
+            std, var, sem, kurt, skew }.
+
+        Returns
+        -------
+        pa.Scalar or pa.ChunkedArray
+        """
+        pa_type = self._pa_array.type
         if name in ["min", "max", "sum"] and pa.types.is_duration(pa_type):
             result = result.cast(pa_type)
         if name in ["median", "mean"] and pa.types.is_temporal(pa_type):
             if not pa_version_under13p0:
                 nbits = pa_type.bit_width
                 if nbits == 32:
-                    result = result.cast(pa.int32(), **cast_kwargs)
+                    result = pc.cast(result, pa.int32(), safe=False)
                 else:
-                    result = result.cast(pa.int64(), **cast_kwargs)
+                    result = pc.cast(result, pa.int64(), safe=False)
             result = result.cast(pa_type)
         if name in ["std", "sem"] and pa.types.is_temporal(pa_type):
-            result = result.cast(pa.int64(), **cast_kwargs)
+            result = pc.cast(result, pa.int64(), safe=False)
 
             if pa.types.is_duration(pa_type):
                 result = result.cast(pa_type)
             elif pa.types.is_time(pa_type):
@@ -1999,27 +2044,110 @@ def _groupby_op(
                 **kwargs,
             )
 
-        # maybe convert to a compatible dtype optimized for groupby
-        values: ExtensionArray
-        pa_type = self._pa_array.type
-        if pa.types.is_timestamp(pa_type):
-            values = self._to_datetimearray()
-        elif pa.types.is_duration(pa_type):
-            values = self._to_timedeltaarray()
-        else:
-            values = self._to_masked()
-
-        result = values._groupby_op(
-            how=how,
-            has_dropped_na=has_dropped_na,
-            min_count=min_count,
-            ngroups=ngroups,
-            ids=ids,
-            **kwargs,
-        )
-        if isinstance(result, np.ndarray):
-            return result
-        return type(self)._from_sequence(result, copy=False)
+        pa_supported_groupby_aggregations = [
+            "any",
+            "all",
+            "sum",
+            "prod",
+            "min",
+            "max",
+            "mean",
+            "sem",
+            "std",
+            "var",
+        ]
+        # aggregations not yet supported natively in pyarrow:
+        # - first/last: https://github.com/apache/arrow/issues/36709
+        # - nth
+        # - median
+
+        if how not in pa_supported_groupby_aggregations:
+            # maybe convert to a compatible dtype for groupby
+            values: ExtensionArray
+            pa_type = self._pa_array.type
+            if pa.types.is_timestamp(pa_type):
+                values = self._to_datetimearray()
+            elif pa.types.is_duration(pa_type):
+                values = self._to_timedeltaarray()
+            else:
+                values = self._to_masked()
+
+            result = values._groupby_op(
+                how=how,
+                has_dropped_na=has_dropped_na,
+                min_count=min_count,
+                ngroups=ngroups,
+                ids=ids,
+                **kwargs,
+            )
+            if isinstance(result, np.ndarray):
+                return result
+            return type(self)._from_sequence(result, copy=False)
+
+        arr = self._values_for_reduction(how)
+
+        pa_name = {
+            "prod": "product",
+            "std": "stddev",
+            "var": "variance",
+        }.get(how, how)
+
+        # pyarrow does not accept a min_count of -1
+        if min_count == -1:
+            if how in ["any", "all"]:
+                min_count = 0
+            else:
+                min_count = 1
+
+        if how in ["std", "var"]:
+            aggregations = [
+                (
+                    "val",
+                    pa_name,
+                    pc.VarianceOptions(ddof=kwargs["ddof"], min_count=min_count),
+                )
+            ]
+        elif how == "sem":
+            aggregations = [
+                (
+                    "val",
+                    "stddev",
+                    pc.VarianceOptions(ddof=kwargs["ddof"], min_count=min_count),
+                ),
+                (
+                    "val",
+                    "count",
+                    pc.CountOptions(mode="only_valid"),
+                ),
+            ]
+        else:
+            aggregations = [
+                (
+                    "val",
+                    pa_name,
+                    pc.ScalarAggregateOptions(min_count=min_count),
+                )
+            ]
+
+        res = (
+            pa.Table.from_pydict({"id": ids, "val": arr})
+            .group_by("id")
+            .aggregate(aggregations)
+        )
+
+        if how == "sem":
+            numerator = res["val_stddev"]
+            if pa_version_under8p0:
+                denominator = pc.power_checked(res["val_count"], 0.5)
+            else:
+                denominator = pc.sqrt_checked(res["val_count"])
+            result = pc.divide_checked(numerator, denominator)
+        else:
+            result = res[f"val_{pa_name}"]
+
+        result = self._maybe_cast_reduction_result(result, how)
+        return type(self)(result)
 
     def _apply_elementwise(self, func: Callable) -> list[list[Any]]:
         """Apply a callable to each element while maintaining the chunking structure."""

Review thread on the pa.Table.from_pydict(...).group_by("id") call:

[reviewer] IIUC this is re-calculating the codes we already have in our GroupBy object. I see the appeal, but am wary of this pattern. I think the long-term solution is to get pyarrow to expose something to which we can pass the already-computed codes?

[author] Yep, that's fair. Also, it would be nice to pass all aggregations at once to pyarrow - see the last comment in the OP.

[reviewer] Any idea what is driving the perf impact? Does the pyarrow version do something in parallel?

[author] I can't tell from reading the docs whether anything is done in parallel. It could be simply (as you've pointed out) that pyarrow is only computing the group codes once when done in a single batch and recomputing them when done iteratively.
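Picking up the batching point from the thread: TableGroupBy.aggregate accepts a list of (column, function, options) tuples, so several aggregations can share one pass over the group keys rather than regrouping per aggregation. A minimal sketch under that assumption (toy data; not the PR's code):

```python
import pyarrow as pa
import pyarrow.compute as pc

ids = pa.array([0, 0, 1, 1, 1])
vals = pa.array([1.0, 2.0, 3.0, None, 5.0])

res = (
    pa.Table.from_pydict({"id": ids, "val": vals})
    .group_by("id")
    .aggregate(
        [
            # one grouping pass, several aggregations
            ("val", "sum", pc.ScalarAggregateOptions(min_count=1)),
            ("val", "stddev", pc.VarianceOptions(ddof=1, min_count=1)),
            ("val", "count", pc.CountOptions(mode="only_valid")),
        ]
    )
)
# output columns are named "<column>_<function>" (val_sum, val_stddev,
# val_count) alongside the "id" key column
print(res)
```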
Review comment:

[reviewer] IIUC at least some of the bugfix is coming from adding this casting to _groupby_op. Could split off from this PR one that implements _values_for_reduction and _maybe_cast_reduction_result?
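The casting that the comment refers to is easy to see in isolation. A hypothetical round trip for a duration sum (not the PR's code), mirroring what _values_for_reduction and _maybe_cast_reduction_result do for pyarrow versions whose sum kernel rejects durations:

```python
import pyarrow as pa
import pyarrow.compute as pc

dur = pa.chunked_array([pa.array([1, 2, 3], type=pa.duration("s"))])
as_int = dur.cast(pa.int64())        # the _values_for_reduction step
total = pc.sum(as_int)               # reduce in integer space
print(total.cast(pa.duration("s")))  # the _maybe_cast_reduction_result step
```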