diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py
index 4567b5b414301..3743ec4b935cc 100644
--- a/asv_bench/benchmarks/groupby.py
+++ b/asv_bench/benchmarks/groupby.py
@@ -3,9 +3,11 @@
 from string import ascii_letters
 
 import numpy as np
+import pyarrow as pa
 
 from pandas import (
     NA,
+    ArrowDtype,
     Categorical,
     DataFrame,
     Index,
@@ -1098,4 +1100,66 @@ def time_resample_multiindex(self):
         ).mean()
 
 
+class GroupByAggregateArrowDtypes:
+    param_names = ["dtype", "method"]
+    params = [
+        [
+            ArrowDtype(pa.bool_()),
+            ArrowDtype(pa.decimal128(25, 3)),
+            ArrowDtype(pa.float64()),
+            ArrowDtype(pa.int32()),
+            ArrowDtype(pa.string()),
+            ArrowDtype(pa.timestamp("s", "UTC")),
+            ArrowDtype(pa.duration("ms")),
+        ],
+        [
+            "any",
+            "all",
+            "count",
+            "sum",
+            "prod",
+            "min",
+            "max",
+            "mean",
+            "median",
+            "std",
+        ],
+    ]
+
+    def setup(self, dtype, method):
+        size = (200_000, 10)
+        pa_type = dtype.pyarrow_dtype
+        if pa.types.is_floating(pa_type):
+            values = np.random.randn(*size)
+        elif pa.types.is_integer(pa_type):
+            values = np.random.randint(0, 10_000, size)
+        elif pa.types.is_decimal(pa_type):
+            values = np.random.randn(*size)
+        elif pa.types.is_boolean(pa_type):
+            values = np.random.randint(0, 2, size, dtype=np.bool_)
+        elif pa.types.is_timestamp(pa_type):
+            if method in ["any", "all", "sum", "prod"]:
+                raise NotImplementedError
+            values = np.random.randint(0, 10_000, size)
+        elif pa.types.is_duration(pa_type):
+            if method == "prod":
+                raise NotImplementedError
+            values = np.random.randint(0, 10_000, size)
+        elif pa.types.is_string(pa_type):
+            if method in ["any", "all", "sum", "prod", "mean", "median", "std"]:
+                raise NotImplementedError
+            values = tm.rands_array(nchars=10, size=size)
+        else:
+            raise NotImplementedError
+
+        columns = list("abcdefghij")
+        df = DataFrame(values, columns=columns, dtype=dtype)
+        df.iloc[::10, ::2] = NA
+        df["key"] = np.random.randint(0, high=100, size=len(values))
+        self.df = df
+
+    def time_frame_agg(self, dtype, method):
+        self.df.groupby("key").agg(method)
+
+
 from .pandas_vb_common import setup  # noqa: F401 isort:skip
diff --git a/doc/source/whatsnew/v2.2.0.rst b/doc/source/whatsnew/v2.2.0.rst
index 0760840f9950a..9a00b18af3126 100644
--- a/doc/source/whatsnew/v2.2.0.rst
+++ b/doc/source/whatsnew/v2.2.0.rst
@@ -329,7 +329,7 @@ Plotting
 
 Groupby/resample/rolling
 ^^^^^^^^^^^^^^^^^^^^^^^^
--
+- Bug in :meth:`.SeriesGroupBy.var` and :meth:`.DataFrameGroupBy.var` where the result dtype would be ``np.float64`` for data with :class:`ArrowDtype` with ``pyarrow.decimal128`` type (:issue:`54627`)
 -
 
 Reshaping
diff --git a/pandas/core/arrays/arrow/array.py b/pandas/core/arrays/arrow/array.py
index 4b79d0dbb683e..785215ed95591 100644
--- a/pandas/core/arrays/arrow/array.py
+++ b/pandas/core/arrays/arrow/array.py
@@ -1466,44 +1466,16 @@ def _reduce_pyarrow(self, name: str, *, skipna: bool = True, **kwargs) -> pa.Sca
         ------
         TypeError : subclass does not define reductions
         """
-        pa_type = self._pa_array.type
-
-        data_to_reduce = self._pa_array
-
-        cast_kwargs = {} if pa_version_under13p0 else {"safe": False}
-
-        if name in ["any", "all"] and (
-            pa.types.is_integer(pa_type)
-            or pa.types.is_floating(pa_type)
-            or pa.types.is_duration(pa_type)
-            or pa.types.is_decimal(pa_type)
-        ):
-            # pyarrow only supports any/all for boolean dtype, we allow
-            # for other dtypes, matching our non-pyarrow behavior
-
-            if pa.types.is_duration(pa_type):
-                data_to_cmp = self._pa_array.cast(pa.int64())
-            else:
-                data_to_cmp = self._pa_array
-
-            not_eq = pc.not_equal(data_to_cmp, 0)
-            data_to_reduce = not_eq
-
-        elif name in ["min", "max", "sum"] and pa.types.is_duration(pa_type):
-            data_to_reduce = self._pa_array.cast(pa.int64())
-
-        elif name in ["median", "mean", "std", "sem"] and pa.types.is_temporal(pa_type):
-            nbits = pa_type.bit_width
-            if nbits == 32:
-                data_to_reduce = self._pa_array.cast(pa.int32())
-            else:
-                data_to_reduce = self._pa_array.cast(pa.int64())
+        data_to_reduce = self._values_for_reduction(name)
 
         if name == "sem":
 
             def pyarrow_meth(data, skip_nulls, **kwargs):
                 numerator = pc.stddev(data, skip_nulls=skip_nulls, **kwargs)
-                denominator = pc.sqrt_checked(pc.count(self._pa_array))
+                if pa_version_under8p0:
+                    denominator = pc.power_checked(pc.count(self._pa_array), 0.5)
+                else:
+                    denominator = pc.sqrt_checked(pc.count(self._pa_array))
                 return pc.divide_checked(numerator, denominator)
 
         else:
@@ -1541,21 +1513,94 @@ def pyarrow_meth(data, skip_nulls, **kwargs):
         if name == "median":
             # GH 52679: Use quantile instead of approximate_median; returns array
             result = result[0]
-        if pc.is_null(result).as_py():
-            return result
+        return self._maybe_cast_reduction_result(result, name)
+
+    def _values_for_reduction(self, name: str) -> pa.ChunkedArray:
+        """
+        Return the underlying ChunkedArray, possibly cast to a different type in
+        order to support reductions.
+
+        Parameters
+        ----------
+        name : str
+            Name of the function, supported values are:
+            { any, all, min, max, sum, mean, median, prod,
+              std, var, sem, kurt, skew }.
+
+        Returns
+        -------
+        pa.ChunkedArray
+        """
+        pa_type = self._pa_array.type
+
+        if name in ["any", "all"] and (
+            pa.types.is_integer(pa_type)
+            or pa.types.is_floating(pa_type)
+            or pa.types.is_duration(pa_type)
+            or pa.types.is_decimal(pa_type)
+        ):
+            # pyarrow only supports any/all for boolean dtype, we allow
+            # for other dtypes, matching our non-pyarrow behavior
+            if pa.types.is_duration(pa_type):
+                data_to_cmp = self._pa_array.cast(pa.int64())
+            else:
+                data_to_cmp = self._pa_array
+
+            not_eq = pc.not_equal(data_to_cmp, 0)
+            data_to_reduce = not_eq
+
+        elif name in ["min", "max", "sum"] and pa.types.is_duration(pa_type):
+            data_to_reduce = self._pa_array.cast(pa.int64())
+
+        elif name in ["median", "mean", "std", "sem"] and pa.types.is_temporal(pa_type):
+            nbits = pa_type.bit_width
+            if nbits == 32:
+                data_to_reduce = self._pa_array.cast(pa.int32())
+            else:
+                data_to_reduce = self._pa_array.cast(pa.int64())
+
+        elif name in ["sem", "std", "var"] and pa.types.is_boolean(pa_type):
+            data_to_reduce = self._pa_array.cast(pa.uint8())
+
+        else:
+            data_to_reduce = self._pa_array
+
+        return data_to_reduce
+
+    def _maybe_cast_reduction_result(
+        self, result: pa.Scalar | pa.ChunkedArray, name: str
+    ) -> pa.Scalar | pa.ChunkedArray:
+        """
+        Maybe cast a reduction result to a different pyarrow type.
+
+        See ArrowExtensionArray._values_for_reduction.
+
+        Parameters
+        ----------
+        result : pa.Scalar | pa.ChunkedArray
+        name : str
+            Name of the function, supported values are:
+            { any, all, min, max, sum, mean, median, prod,
+              std, var, sem, kurt, skew }.
+
+        Returns
+        -------
+        pa.Scalar or pa.ChunkedArray
+        """
+        pa_type = self._pa_array.type
 
         if name in ["min", "max", "sum"] and pa.types.is_duration(pa_type):
             result = result.cast(pa_type)
         if name in ["median", "mean"] and pa.types.is_temporal(pa_type):
             if not pa_version_under13p0:
                 nbits = pa_type.bit_width
                 if nbits == 32:
-                    result = result.cast(pa.int32(), **cast_kwargs)
+                    result = pc.cast(result, pa.int32(), safe=False)
                 else:
-                    result = result.cast(pa.int64(), **cast_kwargs)
+                    result = pc.cast(result, pa.int64(), safe=False)
             result = result.cast(pa_type)
         if name in ["std", "sem"] and pa.types.is_temporal(pa_type):
-            result = result.cast(pa.int64(), **cast_kwargs)
+            result = pc.cast(result, pa.int64(), safe=False)
             if pa.types.is_duration(pa_type):
                 result = result.cast(pa_type)
             elif pa.types.is_time(pa_type):
@@ -1999,27 +2044,110 @@ def _groupby_op(
                 **kwargs,
             )
 
-        # maybe convert to a compatible dtype optimized for groupby
-        values: ExtensionArray
         pa_type = self._pa_array.type
-        if pa.types.is_timestamp(pa_type):
-            values = self._to_datetimearray()
-        elif pa.types.is_duration(pa_type):
-            values = self._to_timedeltaarray()
+        pa_supported_groupby_aggregations = [
+            "any",
+            "all",
+            "sum",
+            "prod",
+            "min",
+            "max",
+            "mean",
+            "sem",
+            "std",
+            "var",
+        ]
+        # aggregations not yet supported natively in pyarrow:
+        # - first/last: https://github.com/apache/arrow/issues/36709
+        # - nth
+        # - median
+
+        if how not in pa_supported_groupby_aggregations:
+            # maybe convert to a compatible dtype for groupby
+            values: ExtensionArray
+            if pa.types.is_timestamp(pa_type):
+                values = self._to_datetimearray()
+            elif pa.types.is_duration(pa_type):
+                values = self._to_timedeltaarray()
+            else:
+                values = self._to_masked()
+
+            result = values._groupby_op(
+                how=how,
+                has_dropped_na=has_dropped_na,
+                min_count=min_count,
+                ngroups=ngroups,
+                ids=ids,
+                **kwargs,
+            )
+            if isinstance(result, np.ndarray):
+                return result
+            return type(self)._from_sequence(result, copy=False)
+
+        arr = self._values_for_reduction(how)
+
+        pa_name = {
+            "prod": "product",
+            "std": "stddev",
+            "var": "variance",
+        }.get(how, how)
+
+        # pyarrow does not accept a min_count of -1
+        if min_count == -1:
+            if how in ["any", "all"]:
+                min_count = 0
+            else:
+                min_count = 1
+
+        if how in ["std", "var"]:
+            aggregations = [
+                (
+                    "val",
+                    pa_name,
+                    pc.VarianceOptions(ddof=kwargs["ddof"], min_count=min_count),
+                )
+            ]
+        elif how == "sem":
+            aggregations = [
+                (
+                    "val",
+                    "stddev",
+                    pc.VarianceOptions(ddof=kwargs["ddof"], min_count=min_count),
+                ),
+                (
+                    "val",
+                    "count",
+                    pc.CountOptions(mode="only_valid"),
+                ),
+            ]
         else:
-            values = self._to_masked()
-
-        result = values._groupby_op(
-            how=how,
-            has_dropped_na=has_dropped_na,
-            min_count=min_count,
-            ngroups=ngroups,
-            ids=ids,
-            **kwargs,
+            aggregations = [
+                (
+                    "val",
+                    pa_name,
+                    pc.ScalarAggregateOptions(min_count=min_count),
+                )
+            ]
+
+        table = pa.Table.from_pydict({"id": ids, "val": arr})
+        if has_dropped_na:
+            # ids of -1 mark rows whose group key is NA and was dropped;
+            # exclude them so they do not form a spurious group
+            table = table.filter(pc.greater_equal(table["id"], 0))
+        res = (
+            table.group_by("id")
+            .aggregate(aggregations)
+            # hash aggregation does not guarantee output order; restore the
+            # group-code order the groupby machinery expects
+            .sort_by("id")
         )
-        if isinstance(result, np.ndarray):
-            return result
-        return type(self)._from_sequence(result, copy=False)
+
+        if how == "sem":
+            numerator = res["val_stddev"]
+            if pa_version_under8p0:
+                denominator = pc.power_checked(res["val_count"], 0.5)
+            else:
+                denominator = pc.sqrt_checked(res["val_count"])
+            result = pc.divide_checked(numerator, denominator)
+        else:
+            result = res[f"val_{pa_name}"]
+
+        result = self._maybe_cast_reduction_result(result, how)
+        return type(self)(result)
 
     def _apply_elementwise(self, func: Callable) -> list[list[Any]]:
         """Apply a callable to each element while maintaining the chunking structure."""
diff --git a/pandas/core/arrays/base.py b/pandas/core/arrays/base.py
index 933944dbd4632..a01b0ed420664 100644
--- a/pandas/core/arrays/base.py
+++ b/pandas/core/arrays/base.py
@@ -2248,7 +2248,7 @@ def _groupby_op(
         Parameters
         ----------
         how : {'any', 'all', 'sum', 'prod', 'min', 'max', 'mean', 'median',
-               'median', 'var', 'std', 'sem', 'nth', 'last', 'ohlc',
+               'var', 'std', 'sem', 'nth', 'first', 'last', 'ohlc',
                'cumprod', 'cumsum', 'cummin', 'cummax', 'rank'}
         has_dropped_na : bool
         min_count : int
diff --git a/pandas/tests/extension/test_arrow.py b/pandas/tests/extension/test_arrow.py
index 339e97e735f85..0e18728c19df7 100644
--- a/pandas/tests/extension/test_arrow.py
+++ b/pandas/tests/extension/test_arrow.py
@@ -503,19 +503,7 @@ def test_reduce_series_numeric(self, data, all_numeric_reductions, skipna, reque
             and pa.types.is_decimal(pa_dtype)
         ):
             request.node.add_marker(xfail_mark)
-        elif (
-            all_numeric_reductions == "sem"
-            and pa_version_under8p0
-            and (dtype._is_numeric or pa.types.is_temporal(pa_dtype))
-        ):
-            request.node.add_marker(xfail_mark)
-
-        elif pa.types.is_boolean(pa_dtype) and all_numeric_reductions in {
-            "sem",
-            "std",
-            "var",
-            "median",
-        }:
+        elif all_numeric_reductions == "median" and pa.types.is_boolean(pa_dtype):
             request.node.add_marker(xfail_mark)
         super().test_reduce_series_numeric(data, all_numeric_reductions, skipna)
 
@@ -3046,3 +3034,66 @@ def test_factorize_chunked_dictionary():
     exp_uniques = pd.Index(ArrowExtensionArray(pa_array.combine_chunks()))
     tm.assert_numpy_array_equal(res_indices, exp_indicies)
     tm.assert_index_equal(res_uniques, exp_uniques)
+
+
+@pytest.mark.parametrize(
+    "how", ["any", "all", "sum", "prod", "min", "max", "mean", "sem", "std", "var"]
+)
+def test_groupby_reductions(data, how):
+    # GH 55234
+    pa_type = data._pa_array.type
+
+    pyarrow_err_msg = (pa.ArrowNotImplementedError, "no kernel matching input types")
+    fallback_err_msg = (TypeError, "agg function failed")
+    err, msg = None, None
+
+    if pa.types.is_string(pa_type) or pa.types.is_binary(pa_type):
+        if how in ["any", "all", "sem", "std"]:
+            err, msg = pyarrow_err_msg
+        elif how in ["sum", "prod", "mean", "var"]:
+            err, msg = fallback_err_msg
+    elif pa.types.is_duration(pa_type):
+        if how in ["prod", "var"]:
+            err, msg = fallback_err_msg
+    elif pa.types.is_temporal(pa_type):
+        if how in ["any", "all"]:
+            err, msg = pyarrow_err_msg
+        elif how in ["sum", "prod", "var"]:
+            err, msg = fallback_err_msg
+
+    null_index = 8
+    assert pd.notnull(data[0])
+    assert pd.notnull(data[1])
+    assert pd.isnull(data[null_index])
+
+    group_locs = [
+        [0, 1, 1],
+        [0, 1, null_index],
+        [null_index, 0],
+        [null_index],
+    ]
+    group_lengths = [len(group) for group in group_locs]
+    groups = np.array(["B", "C", "A", "D"]).repeat(group_lengths)
+    values = data.take([loc for arr in group_locs for loc in arr])
+    df = pd.DataFrame({"key": groups, "val": values})
+    gby = df.groupby("key", sort=False)["val"]
+
+    if err is not None:
+        with pytest.raises(err, match=msg):
+            gby.aggregate(how)
+        return
+
+    result = gby.aggregate(how)
+
+    expected_type = data.take(group_locs[0])._reduce_pyarrow(how).type
+    expected_arr = pa.array(
+        [pd.Series(data.take(locs)).agg(how) for locs in group_locs],
+        type=expected_type,
+        from_pandas=True,
+    )
+    expected = pd.Series(
+        ArrowExtensionArray(expected_arr),
+        index=pd.Index(["B", "C", "A", "D"], name="key"),
+        name="val",
+    )
+    tm.assert_series_equal(result, expected)
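
Editor's note: the sketch below is not part of the patch. It illustrates the cast/reduce/cast-back round trip that the patch factors out of _reduce_pyarrow into _values_for_reduction and _maybe_cast_reduction_result, using made-up data and assuming a pyarrow version whose pc.sum has no duration kernel (the reason the patch reduces durations via their int64 representation).

# Sketch only: mirrors the cast/reduce/cast-back strategy of
# _values_for_reduction / _maybe_cast_reduction_result.
import pyarrow as pa
import pyarrow.compute as pc

arr = pa.chunked_array([pa.array([1000, 2500, None], type=pa.duration("ms"))])

as_int = arr.cast(pa.int64())  # durations are reduced via int64
raw = pc.sum(as_int)           # Int64Scalar: 3500 (nulls skipped by default)
result = raw.cast(arr.type)    # cast the scalar back to duration[ms]
print(result)                  # 3500 milliseconds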
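Editor's note: likewise a sketch, not the pandas implementation. It shows the pyarrow-native pattern the new _groupby_op fast path relies on: group codes and values go into a two-column table, Table.group_by/aggregate does the work, and "sem" is derived as stddev / sqrt(count) exactly as the patch does. Data is hypothetical; it assumes pyarrow >= 8 for pc.sqrt_checked (the patch falls back to pc.power_checked on older versions).

# Sketch only: the Table.group_by aggregation pattern behind the fast path.
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc

ids = np.array([0, 0, 1, 1, 1])  # group codes, as produced by the grouper
vals = pa.chunked_array([[1.5, None, 2.0, 4.0, 6.0]])

res = (
    pa.Table.from_pydict({"id": ids, "val": vals})
    .group_by("id")
    .aggregate(
        [
            ("val", "stddev", pc.VarianceOptions(ddof=1, min_count=1)),
            ("val", "count", pc.CountOptions(mode="only_valid")),
        ]
    )
    .sort_by("id")  # restore deterministic group-code order
)
sem = pc.divide_checked(res["val_stddev"], pc.sqrt_checked(res["val_count"]))
print(sem)  # group 0 has one valid value, so stddev with ddof=1 (and hence sem) is null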