From 4fcb3c6e3753b1a96e162eb2b865e096edf4764e Mon Sep 17 00:00:00 2001 From: Luke Manley Date: Thu, 21 Sep 2023 20:23:44 -0400 Subject: [PATCH 1/4] use pyarrow grouped aggregation functions --- asv_bench/benchmarks/groupby.py | 64 +++++++ doc/source/whatsnew/v2.2.0.rst | 2 +- pandas/core/arrays/arrow/array.py | 241 ++++++++++++++++++++------- pandas/core/arrays/base.py | 2 +- pandas/tests/extension/test_arrow.py | 74 ++++++-- 5 files changed, 312 insertions(+), 71 deletions(-) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index b206523dfe851..1aed6e1dbf850 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -3,9 +3,11 @@ from string import ascii_letters import numpy as np +import pyarrow as pa from pandas import ( NA, + ArrowDtype, Categorical, DataFrame, Index, @@ -1081,4 +1083,66 @@ def time_resample_multiindex(self): ).mean() +class GroupByAggregateArrowDtypes: + param_names = ["dtype", "method"] + params = [ + [ + ArrowDtype(pa.bool_()), + ArrowDtype(pa.decimal128(25, 3)), + ArrowDtype(pa.float64()), + ArrowDtype(pa.int32()), + ArrowDtype(pa.string()), + ArrowDtype(pa.timestamp("s", "UTC")), + ArrowDtype(pa.duration("ms")), + ], + [ + "any", + "all", + "count", + "sum", + "prod", + "min", + "max", + "mean", + "median", + "std", + ], + ] + + def setup(self, dtype, method): + size = (200_000, 10) + pa_type = dtype.pyarrow_dtype + if pa.types.is_floating(pa_type): + values = np.random.randn(*size) + elif pa.types.is_integer(pa_type): + values = np.random.randint(0, 10_000, size) + elif pa.types.is_decimal(pa_type): + values = np.random.randn(*size) + elif pa.types.is_boolean(pa_type): + values = np.random.randint(0, 2, size, dtype=np.bool_) + elif pa.types.is_timestamp(pa_type): + if method in ["any", "all", "sum", "prod"]: + raise NotImplementedError + values = np.random.randint(0, 10_000, size) + elif pa.types.is_duration(pa_type): + if method == "prod": + raise NotImplementedError + values = np.random.randint(0, 10_000, size) + elif pa.types.is_string(pa_type): + if method in ["any", "all", "sum", "prod", "mean", "median", "std"]: + raise NotImplementedError + values = tm.rands_array(nchars=10, size=size) + else: + raise NotImplementedError + + columns = list("abcdefghij") + df = DataFrame(values, columns=columns, dtype=dtype) + df.iloc[::10, ::2] = NA + df["key"] = np.random.randint(0, high=100, size=(len(values))) + self.df = df + + def time_frame_agg(self, dtype, method): + self.df.groupby("key").agg(method) + + from .pandas_vb_common import setup # noqa: F401 isort:skip diff --git a/doc/source/whatsnew/v2.2.0.rst b/doc/source/whatsnew/v2.2.0.rst index 8eab623a2b5f7..acff2059f1327 100644 --- a/doc/source/whatsnew/v2.2.0.rst +++ b/doc/source/whatsnew/v2.2.0.rst @@ -326,7 +326,7 @@ Plotting Groupby/resample/rolling ^^^^^^^^^^^^^^^^^^^^^^^^ -- +- Bug in :meth:`.SeriesGroupBy.var` and :meth:`.DataFrameGroupBy.var` where the dtype would be ``np.float64`` for data with :class:`ArrowDtype` with ``pyarrow.decimal128`` type (:issue:`54627`) - Reshaping diff --git a/pandas/core/arrays/arrow/array.py b/pandas/core/arrays/arrow/array.py index 4b79d0dbb683e..8566beefbd1ed 100644 --- a/pandas/core/arrays/arrow/array.py +++ b/pandas/core/arrays/arrow/array.py @@ -1466,44 +1466,16 @@ def _reduce_pyarrow(self, name: str, *, skipna: bool = True, **kwargs) -> pa.Sca ------ TypeError : subclass does not define reductions """ - pa_type = self._pa_array.type - - data_to_reduce = self._pa_array - - cast_kwargs = {} if pa_version_under13p0 
else {"safe": False} - - if name in ["any", "all"] and ( - pa.types.is_integer(pa_type) - or pa.types.is_floating(pa_type) - or pa.types.is_duration(pa_type) - or pa.types.is_decimal(pa_type) - ): - # pyarrow only supports any/all for boolean dtype, we allow - # for other dtypes, matching our non-pyarrow behavior - - if pa.types.is_duration(pa_type): - data_to_cmp = self._pa_array.cast(pa.int64()) - else: - data_to_cmp = self._pa_array - - not_eq = pc.not_equal(data_to_cmp, 0) - data_to_reduce = not_eq - - elif name in ["min", "max", "sum"] and pa.types.is_duration(pa_type): - data_to_reduce = self._pa_array.cast(pa.int64()) - - elif name in ["median", "mean", "std", "sem"] and pa.types.is_temporal(pa_type): - nbits = pa_type.bit_width - if nbits == 32: - data_to_reduce = self._pa_array.cast(pa.int32()) - else: - data_to_reduce = self._pa_array.cast(pa.int64()) + data_to_reduce = self._get_values_for_reduction(name) if name == "sem": def pyarrow_meth(data, skip_nulls, **kwargs): numerator = pc.stddev(data, skip_nulls=skip_nulls, **kwargs) - denominator = pc.sqrt_checked(pc.count(self._pa_array)) + if pa_version_under8p0: + denominator = pc.power_checked(pc.count(self._pa_array), 0.5) + else: + denominator = pc.sqrt_checked(pc.count(self._pa_array)) return pc.divide_checked(numerator, denominator) else: @@ -1541,21 +1513,95 @@ def pyarrow_meth(data, skip_nulls, **kwargs): if name == "median": # GH 52679: Use quantile instead of approximate_median; returns array result = result[0] - if pc.is_null(result).as_py(): - return result + return self._maybe_cast_reduction_result(result, name) + + def _get_values_for_reduction(self, name: str) -> pa.ChunkedArray: + """ + Return the underlying ChunkedArray, possibly cast to a different type in + order to support reductions. + + Parameters + ---------- + name : str + Name of the function, supported values are: + { any, all, min, max, sum, mean, median, prod, + std, var, sem, kurt, skew }. + + Returns + ------- + pa.ChunkedArray + """ + pa_type = self._pa_array.type + + if name in ["any", "all"] and ( + pa.types.is_integer(pa_type) + or pa.types.is_floating(pa_type) + or pa.types.is_duration(pa_type) + or pa.types.is_decimal(pa_type) + ): + # pyarrow only supports any/all for boolean dtype, we allow + # for other dtypes, matching our non-pyarrow behavior + if pa.types.is_duration(pa_type): + data_to_cmp = self._pa_array.cast(pa.int64()) + else: + data_to_cmp = self._pa_array + + not_eq = pc.not_equal(data_to_cmp, 0) + data_to_reduce = not_eq + + elif name in ["min", "max", "sum"] and pa.types.is_duration(pa_type): + data_to_reduce = self._pa_array.cast(pa.int64()) + + elif name in ["median", "mean", "std", "sem"] and pa.types.is_temporal(pa_type): + nbits = pa_type.bit_width + if nbits == 32: + data_to_reduce = self._pa_array.cast(pa.int32()) + else: + data_to_reduce = self._pa_array.cast(pa.int64()) + + elif name in ["sem", "std", "var"] and pa.types.is_boolean(pa_type): + data_to_reduce = self._pa_array.cast(pa.uint8()) + + else: + data_to_reduce = self._pa_array + + return data_to_reduce + + def _maybe_cast_reduction_result( + self, result: pa.Scalar | pa.ChunkedArray, name: str + ): + """ + Maybe cast a reduction result to a different pyarrow type. + + See ArrowExtensionArray._get_values_for_reduction. + + Parameters + ---------- + result : pa.Scalar | pa.ChunkedArray + name : str + Name of the function, supported values are: + { any, all, min, max, sum, mean, median, prod, + std, var, sem, kurt, skew }. 
+ + Returns + ------- + pa.Scalar or pa.ChunkedArray + """ + pa_type = self._pa_array.type + cast_kwargs = {"safe": False} if name in ["min", "max", "sum"] and pa.types.is_duration(pa_type): result = result.cast(pa_type) if name in ["median", "mean"] and pa.types.is_temporal(pa_type): if not pa_version_under13p0: nbits = pa_type.bit_width if nbits == 32: - result = result.cast(pa.int32(), **cast_kwargs) + result = pc.cast(result, pa.int32(), **cast_kwargs) else: - result = result.cast(pa.int64(), **cast_kwargs) + result = pc.cast(result, pa.int64(), **cast_kwargs) result = result.cast(pa_type) if name in ["std", "sem"] and pa.types.is_temporal(pa_type): - result = result.cast(pa.int64(), **cast_kwargs) + result = pc.cast(result, pa.int64(), **cast_kwargs) if pa.types.is_duration(pa_type): result = result.cast(pa_type) elif pa.types.is_time(pa_type): @@ -1999,27 +2045,110 @@ def _groupby_op( **kwargs, ) - # maybe convert to a compatible dtype optimized for groupby - values: ExtensionArray pa_type = self._pa_array.type - if pa.types.is_timestamp(pa_type): - values = self._to_datetimearray() - elif pa.types.is_duration(pa_type): - values = self._to_timedeltaarray() + pa_supported_groupby_aggregations = [ + "any", + "all", + "sum", + "prod", + "min", + "max", + "mean", + "sem", + "std", + "var", + ] + # aggregations not yet supported natively in pyarrow: + # - first/last: https://github.com/apache/arrow/issues/36709 + # - nth + # - median + + if how not in pa_supported_groupby_aggregations: + # maybe convert to a compatible dtype for groupby + values: ExtensionArray + pa_type = self._pa_array.type + if pa.types.is_timestamp(pa_type): + values = self._to_datetimearray() + elif pa.types.is_duration(pa_type): + values = self._to_timedeltaarray() + else: + values = self._to_masked() + + result = values._groupby_op( + how=how, + has_dropped_na=has_dropped_na, + min_count=min_count, + ngroups=ngroups, + ids=ids, + **kwargs, + ) + if isinstance(result, np.ndarray): + return result + return type(self)._from_sequence(result, copy=False) + + arr = self._get_values_for_reduction(how) + + pa_name = { + "prod": "product", + "std": "stddev", + "var": "variance", + }.get(how, how) + + # pyarrow does not accept a min_count of -1 + if min_count == -1: + if how in ["any", "all"]: + min_count = 0 + else: + min_count = 1 + + if how in ["std", "var"]: + aggregations = [ + ( + "val", + pa_name, + pc.VarianceOptions(ddof=kwargs["ddof"], min_count=min_count), + ) + ] + elif how == "sem": + aggregations = [ + ( + "val", + "stddev", + pc.VarianceOptions(ddof=kwargs["ddof"], min_count=min_count), + ), + ( + "val", + "count", + pc.CountOptions(mode="only_valid"), + ), + ] else: - values = self._to_masked() - - result = values._groupby_op( - how=how, - has_dropped_na=has_dropped_na, - min_count=min_count, - ngroups=ngroups, - ids=ids, - **kwargs, + aggregations = [ + ( + "val", + pa_name, + pc.ScalarAggregateOptions(min_count=min_count), + ) + ] + + res = ( + pa.Table.from_pydict({"id": ids, "val": arr}) + .group_by("id") + .aggregate(aggregations) ) - if isinstance(result, np.ndarray): - return result - return type(self)._from_sequence(result, copy=False) + + if how == "sem": + numerator = res["val_stddev"] + if pa_version_under8p0: + denominator = pc.power_checked(res["val_count"], 0.5) + else: + denominator = pc.sqrt_checked(res["val_count"]) + result = pc.divide_checked(numerator, denominator) + else: + result = res[f"val_{pa_name}"] + + result = self._maybe_cast_reduction_result(result, how) + return 
type(self)(result) def _apply_elementwise(self, func: Callable) -> list[list[Any]]: """Apply a callable to each element while maintaining the chunking structure.""" diff --git a/pandas/core/arrays/base.py b/pandas/core/arrays/base.py index 933944dbd4632..a01b0ed420664 100644 --- a/pandas/core/arrays/base.py +++ b/pandas/core/arrays/base.py @@ -2248,7 +2248,7 @@ def _groupby_op( Parameters ---------- how : {'any', 'all', 'sum', 'prod', 'min', 'max', 'mean', 'median', - 'median', 'var', 'std', 'sem', 'nth', 'last', 'ohlc', + 'var', 'std', 'sem', 'nth', 'first', 'last', 'ohlc', 'cumprod', 'cumsum', 'cummin', 'cummax', 'rank'} has_dropped_na : bool min_count : int diff --git a/pandas/tests/extension/test_arrow.py b/pandas/tests/extension/test_arrow.py index 339e97e735f85..ec4c2a1fc399d 100644 --- a/pandas/tests/extension/test_arrow.py +++ b/pandas/tests/extension/test_arrow.py @@ -503,19 +503,7 @@ def test_reduce_series_numeric(self, data, all_numeric_reductions, skipna, reque and pa.types.is_decimal(pa_dtype) ): request.node.add_marker(xfail_mark) - elif ( - all_numeric_reductions == "sem" - and pa_version_under8p0 - and (dtype._is_numeric or pa.types.is_temporal(pa_dtype)) - ): - request.node.add_marker(xfail_mark) - - elif pa.types.is_boolean(pa_dtype) and all_numeric_reductions in { - "sem", - "std", - "var", - "median", - }: + elif all_numeric_reductions == "median" and pa.types.is_boolean(pa_dtype): request.node.add_marker(xfail_mark) super().test_reduce_series_numeric(data, all_numeric_reductions, skipna) @@ -3046,3 +3034,63 @@ def test_factorize_chunked_dictionary(): exp_uniques = pd.Index(ArrowExtensionArray(pa_array.combine_chunks())) tm.assert_numpy_array_equal(res_indices, exp_indicies) tm.assert_index_equal(res_uniques, exp_uniques) + + +@pytest.mark.parametrize( + "how", ["any", "all", "sum", "prod", "min", "max", "mean", "sem", "std", "var"] +) +def test_groupby_reductions(data, how, request): + # GH ##### + mark_pyarrow = pytest.mark.xfail( + raises=pa.ArrowNotImplementedError, + reason="no kernel matching input types", + ) + mark_fallback = pytest.mark.xfail( + raises=TypeError, + reason="agg function failed", + ) + pa_type = data._pa_array.type + if pa.types.is_string(pa_type) or pa.types.is_binary(pa_type): + if how in ["any", "all", "sem", "std"]: + request.node.add_marker(mark_pyarrow) + elif how in ["sum", "prod", "mean", "var"]: + request.node.add_marker(mark_fallback) + elif pa.types.is_duration(pa_type): + if how in ["prod", "var"]: + request.node.add_marker(mark_fallback) + elif pa.types.is_temporal(pa_type): + if how in ["any", "all"]: + request.node.add_marker(mark_pyarrow) + elif how in ["sum", "prod", "var"]: + request.node.add_marker(mark_fallback) + + null_index = 8 + assert pd.notnull(data[0]) + assert pd.notnull(data[1]) + assert pd.isnull(data[null_index]) + + group_locs = [ + [0, 1, 1], + [0, 1, null_index], + [null_index, 0], + [null_index], + ] + group_lengths = [len(group) for group in group_locs] + groups = np.array(["B", "C", "A", "D"]).repeat(group_lengths) + values = data.take([loc for arr in group_locs for loc in arr]) + df = pd.DataFrame({"key": groups, "val": values}) + + result = df.groupby("key", sort=False)["val"].aggregate(how) + + expected_type = data.take(group_locs[0])._reduce_pyarrow(how).type + expected_arr = pa.array( + [pd.Series(data.take(locs)).agg(how) for locs in group_locs], + type=expected_type, + from_pandas=True, + ) + expected = pd.Series( + ArrowExtensionArray(expected_arr), + index=pd.Index(["B", "C", "A", "D"], name="key"), + 
name="val", + ) + tm.assert_series_equal(result, expected) From e40526b5fd9667ed9a16651814e81af14574d495 Mon Sep 17 00:00:00 2001 From: Luke Manley Date: Thu, 21 Sep 2023 21:04:54 -0400 Subject: [PATCH 2/4] gh ref --- pandas/tests/extension/test_arrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/tests/extension/test_arrow.py b/pandas/tests/extension/test_arrow.py index ec4c2a1fc399d..e84f7f961512b 100644 --- a/pandas/tests/extension/test_arrow.py +++ b/pandas/tests/extension/test_arrow.py @@ -3040,7 +3040,7 @@ def test_factorize_chunked_dictionary(): "how", ["any", "all", "sum", "prod", "min", "max", "mean", "sem", "std", "var"] ) def test_groupby_reductions(data, how, request): - # GH ##### + # GH 55234 mark_pyarrow = pytest.mark.xfail( raises=pa.ArrowNotImplementedError, reason="no kernel matching input types", From 13cbd100dc397d1997052092ab5fbe825f840ee5 Mon Sep 17 00:00:00 2001 From: Luke Manley Date: Thu, 21 Sep 2023 23:16:03 -0400 Subject: [PATCH 3/4] feedback --- pandas/core/arrays/arrow/array.py | 7 +++---- pandas/tests/extension/test_arrow.py | 31 +++++++++++++++------------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/pandas/core/arrays/arrow/array.py b/pandas/core/arrays/arrow/array.py index 8566beefbd1ed..e5c487b892ae2 100644 --- a/pandas/core/arrays/arrow/array.py +++ b/pandas/core/arrays/arrow/array.py @@ -1589,19 +1589,18 @@ def _maybe_cast_reduction_result( pa.Scalar or pa.ChunkedArray """ pa_type = self._pa_array.type - cast_kwargs = {"safe": False} if name in ["min", "max", "sum"] and pa.types.is_duration(pa_type): result = result.cast(pa_type) if name in ["median", "mean"] and pa.types.is_temporal(pa_type): if not pa_version_under13p0: nbits = pa_type.bit_width if nbits == 32: - result = pc.cast(result, pa.int32(), **cast_kwargs) + result = pc.cast(result, pa.int32(), safe=False) else: - result = pc.cast(result, pa.int64(), **cast_kwargs) + result = pc.cast(result, pa.int64(), safe=False) result = result.cast(pa_type) if name in ["std", "sem"] and pa.types.is_temporal(pa_type): - result = pc.cast(result, pa.int64(), **cast_kwargs) + result = pc.cast(result, pa.int64(), safe=False) if pa.types.is_duration(pa_type): result = result.cast(pa_type) elif pa.types.is_time(pa_type): diff --git a/pandas/tests/extension/test_arrow.py b/pandas/tests/extension/test_arrow.py index e84f7f961512b..0e18728c19df7 100644 --- a/pandas/tests/extension/test_arrow.py +++ b/pandas/tests/extension/test_arrow.py @@ -3041,28 +3041,25 @@ def test_factorize_chunked_dictionary(): ) def test_groupby_reductions(data, how, request): # GH 55234 - mark_pyarrow = pytest.mark.xfail( - raises=pa.ArrowNotImplementedError, - reason="no kernel matching input types", - ) - mark_fallback = pytest.mark.xfail( - raises=TypeError, - reason="agg function failed", - ) pa_type = data._pa_array.type + + pyarrow_err_msg = (pa.ArrowNotImplementedError, "no kernel matching input types") + fallback_err_msg = (TypeError, "agg function failed") + err, msg = None, None + if pa.types.is_string(pa_type) or pa.types.is_binary(pa_type): if how in ["any", "all", "sem", "std"]: - request.node.add_marker(mark_pyarrow) + err, msg = pyarrow_err_msg elif how in ["sum", "prod", "mean", "var"]: - request.node.add_marker(mark_fallback) + err, msg = fallback_err_msg elif pa.types.is_duration(pa_type): if how in ["prod", "var"]: - request.node.add_marker(mark_fallback) + err, msg = fallback_err_msg elif pa.types.is_temporal(pa_type): if how in ["any", "all"]: - 
request.node.add_marker(mark_pyarrow) + err, msg = pyarrow_err_msg elif how in ["sum", "prod", "var"]: - request.node.add_marker(mark_fallback) + err, msg = fallback_err_msg null_index = 8 assert pd.notnull(data[0]) @@ -3079,8 +3076,14 @@ def test_groupby_reductions(data, how, request): groups = np.array(["B", "C", "A", "D"]).repeat(group_lengths) values = data.take([loc for arr in group_locs for loc in arr]) df = pd.DataFrame({"key": groups, "val": values}) + gby = df.groupby("key", sort=False)["val"] - result = df.groupby("key", sort=False)["val"].aggregate(how) + if err is not None: + with pytest.raises(err, match=msg): + result = gby.aggregate(how) + return + else: + result = gby.aggregate(how) expected_type = data.take(group_locs[0])._reduce_pyarrow(how).type expected_arr = pa.array( From 87581d4f78fee6101aed0dd5c16dad3febc7ec97 Mon Sep 17 00:00:00 2001 From: Luke Manley Date: Fri, 22 Sep 2023 22:16:29 -0400 Subject: [PATCH 4/4] update name --- pandas/core/arrays/arrow/array.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pandas/core/arrays/arrow/array.py b/pandas/core/arrays/arrow/array.py index e5c487b892ae2..785215ed95591 100644 --- a/pandas/core/arrays/arrow/array.py +++ b/pandas/core/arrays/arrow/array.py @@ -1466,7 +1466,7 @@ def _reduce_pyarrow(self, name: str, *, skipna: bool = True, **kwargs) -> pa.Sca ------ TypeError : subclass does not define reductions """ - data_to_reduce = self._get_values_for_reduction(name) + data_to_reduce = self._values_for_reduction(name) if name == "sem": @@ -1516,7 +1516,7 @@ def pyarrow_meth(data, skip_nulls, **kwargs): return self._maybe_cast_reduction_result(result, name) - def _get_values_for_reduction(self, name: str) -> pa.ChunkedArray: + def _values_for_reduction(self, name: str) -> pa.ChunkedArray: """ Return the underlying ChunkedArray, possibly cast to a different type in order to support reductions. @@ -1574,7 +1574,7 @@ def _maybe_cast_reduction_result( """ Maybe cast a reduction result to a different pyarrow type. - See ArrowExtensionArray._get_values_for_reduction. + See ArrowExtensionArray._values_for_reduction. Parameters ---------- @@ -2085,7 +2085,7 @@ def _groupby_op( return result return type(self)._from_sequence(result, copy=False) - arr = self._get_values_for_reduction(how) + arr = self._values_for_reduction(how) pa_name = { "prod": "product",
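
The core pattern this series introduces in ArrowExtensionArray._groupby_op is to
hand grouped reductions to pyarrow's Table.group_by()/aggregate() instead of
round-tripping through masked arrays. A minimal standalone sketch of that pattern
(not pandas internals; the "id"/"val" column names mirror the patch, while the
sample data and the pyarrow >= 8 assumption are illustrative):

import numpy as np
import pyarrow as pa
import pyarrow.compute as pc

# Integer group codes, as pandas passes them to _groupby_op, plus sample values.
ids = np.array([0, 0, 1, 1, 2, 2])
arr = pa.chunked_array([pa.array([1.0, 2.0, None, 4.0, 5.0, 6.0])])

# Aggregations are (column, kernel, options) triples; "stddev"/"variance" take
# VarianceOptions, most other kernels take ScalarAggregateOptions.
res = (
    pa.Table.from_pydict({"id": ids, "val": arr})
    .group_by("id")
    .aggregate(
        [
            ("val", "mean", pc.ScalarAggregateOptions(min_count=1)),
            ("val", "stddev", pc.VarianceOptions(ddof=1, min_count=1)),
            ("val", "count", pc.CountOptions(mode="only_valid")),
        ]
    )
)

# Result columns are named "<column>_<kernel>". pyarrow has no "sem" kernel,
# which is why the patch derives it as stddev / sqrt(count).
sem = pc.divide_checked(res["val_stddev"], pc.sqrt_checked(res["val_count"]))

Note the min_count values above: as the patch's own comment says, pyarrow rejects
pandas' min_count == -1 sentinel, hence the translation to 0 or 1 before the
options objects are built.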
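
_values_for_reduction and _maybe_cast_reduction_result work as a pair: types that
pyarrow's kernels cannot reduce directly are cast to an integer view, reduced, and
the result is cast back to the original type. A small sketch of that round trip
for a duration "sum" (variable names here are illustrative, not from the patch):

import pyarrow as pa
import pyarrow.compute as pc

durations = pa.chunked_array(
    [pa.array([1_000, 2_500, None], type=pa.duration("ms"))]
)

as_int64 = durations.cast(pa.int64())      # no pyarrow "sum" kernel for duration
total = pc.sum(as_int64, skip_nulls=True)  # Int64Scalar: 3500
print(total.cast(pa.duration("ms")))       # cast the scalar result back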
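
The whatsnew entry above can be exercised directly: with this series applied,
grouped var() on decimal-backed data is computed by pyarrow and should keep an
Arrow-backed result dtype rather than falling back to np.float64. A hypothetical
check, not a test from the patch:

import pandas as pd
import pyarrow as pa

ser = pd.Series([1, 2, 3, 4], dtype=pd.ArrowDtype(pa.decimal128(25, 3)))
result = ser.groupby([0, 0, 1, 1]).var()
print(result.dtype)  # expected: an ArrowDtype (e.g. double[pyarrow])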