ENH: Allow numba aggregations to return non-float64 results (#53444)
* ENH: non float64 result support in numba groupby

* refactor & simplify

* fix CI

* maybe green?

* skip unsupported ops in other bench as well

* updates from code review

* remove commented code

* update whatsnew

* debug benchmarks

* Skip min/max benchmarks
lithomas1 authored Jun 15, 2023
1 parent 905fe6b commit 870a504
Showing 13 changed files with 342 additions and 88 deletions.
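
Before the per-file diffs, a minimal sketch of the user-visible change (an illustration, assuming pandas 2.1+ with numba installed; before this commit the numba engine always returned float64 results):

    import pandas as pd

    df = pd.DataFrame({"key": [1, 1, 2], "a": [1, 2, 3]})  # column "a" is int64

    res = df.groupby("key").agg("sum", engine="numba")
    print(res["a"].dtype)  # int64 after this change; float64 before it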
97 changes: 88 additions & 9 deletions asv_bench/benchmarks/groupby.py
@@ -57,6 +57,38 @@
},
}

# These aggregations don't have a kernel implemented for them yet
_numba_unsupported_methods = [
"all",
"any",
"bfill",
"count",
"cumcount",
"cummax",
"cummin",
"cumprod",
"cumsum",
"describe",
"diff",
"ffill",
"first",
"head",
"last",
"median",
"nunique",
"pct_change",
"prod",
"quantile",
"rank",
"sem",
"shift",
"size",
"skew",
"tail",
"unique",
"value_counts",
]


class ApplyDictReturn:
def setup(self):
@@ -453,9 +485,10 @@ class GroupByMethods:
],
["direct", "transformation"],
[1, 5],
["cython", "numba"],
]

def setup(self, dtype, method, application, ncols):
def setup(self, dtype, method, application, ncols, engine):
if method in method_blocklist.get(dtype, {}):
raise NotImplementedError # skip benchmark

@@ -474,6 +507,19 @@ def setup(self, dtype, method, application, ncols):
# DataFrameGroupBy doesn't have these methods
raise NotImplementedError

# Numba currently doesn't support
# multiple transform functions or strs for transform,
# grouping on multiple columns, or the datetime dtype,
# and we lack kernels for a bunch of methods
if engine == "numba" and (
    method in _numba_unsupported_methods
    or ncols > 1
    or application == "transformation"
    or dtype == "datetime"
):
    raise NotImplementedError  # skip benchmark

if method == "describe":
ngroups = 20
elif method == "skew":
@@ -505,17 +551,30 @@ def setup(self, dtype, method, application, ncols):
if len(cols) == 1:
cols = cols[0]

# Not everything supports the engine keyword yet
kwargs = {}
if engine == "numba":
kwargs["engine"] = engine

if application == "transformation":
self.as_group_method = lambda: df.groupby("key")[cols].transform(method)
self.as_field_method = lambda: df.groupby(cols)["key"].transform(method)
self.as_group_method = lambda: df.groupby("key")[cols].transform(
method, **kwargs
)
self.as_field_method = lambda: df.groupby(cols)["key"].transform(
method, **kwargs
)
else:
self.as_group_method = getattr(df.groupby("key")[cols], method)
self.as_field_method = getattr(df.groupby(cols)["key"], method)
self.as_group_method = partial(
getattr(df.groupby("key")[cols], method), **kwargs
)
self.as_field_method = partial(
getattr(df.groupby(cols)["key"], method), **kwargs
)

def time_dtype_as_group(self, dtype, method, application, ncols):
def time_dtype_as_group(self, dtype, method, application, ncols, engine):
self.as_group_method()

def time_dtype_as_field(self, dtype, method, application, ncols):
def time_dtype_as_field(self, dtype, method, application, ncols, engine):
self.as_field_method()


@@ -532,8 +591,12 @@ class GroupByCythonAgg:
[
"sum",
"prod",
"min",
"max",
# TODO: uncomment min/max
# min/max are currently implemented very inefficiently,
# because they re-use the Window min/max kernels,
# so they would time out the ASVs
# "min",
# "max",

Inline review comments on this change:

rhshadrach (Member) commented on Jul 23, 2023:
@lithomas1 - not sure I understand what this means; perhaps it was meant for another benchmark? For the Cython min/max implementation in groupby, I don't believe we're reusing the Window kernels, and I'm seeing better performance for min/max than e.g. sum.

lithomas1 (Author, Member) replied on Jul 23, 2023:
Yeah, I messed up here by commenting out the benchmarks for the Cython aggregations too; I meant to only disable them for numba. (I think the correct approach is to override params in GroupByNumbaAgg.)
The numba performance issues are addressed in #53731, but that has been waiting on a review for a while. I'll try to put up a PR to fix this tomorrow, if you don't beat me to it.

rhshadrach (Member) replied on Jul 23, 2023:
@lithomas1 - great, thanks. Just wanted to make sure. I already have it fixed in #54234. I'm planning to get to #53731 tonight or tomorrow.

"mean",
"median",
"var",
@@ -554,6 +617,22 @@ def time_frame_agg(self, dtype, method):
self.df.groupby("key").agg(method)


class GroupByNumbaAgg(GroupByCythonAgg):
"""
Benchmarks specifically targeting our numba aggregation algorithms
(using a big enough dataframe with a simple key, so a large part of the
time is actually spent in the grouped aggregation).
"""

def setup(self, dtype, method):
if method in _numba_unsupported_methods:
raise NotImplementedError
super().setup(dtype, method)

def time_frame_agg(self, dtype, method):
self.df.groupby("key").agg(method, engine="numba")
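
For illustration, a hedged sketch of driving this benchmark class by hand, the way asv would (the parameter values "float64" and "sum" are assumed to be in the params grid inherited from GroupByCythonAgg, and numba must be installed):

    bench = GroupByNumbaAgg()
    bench.setup("float64", "sum")  # builds bench.df; unsupported methods raise NotImplementedError
    bench.time_frame_agg("float64", "sum")  # runs df.groupby("key").agg("sum", engine="numba")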


class GroupByCythonAggEaDtypes:
"""
Benchmarks specifically targeting our cython aggregation algorithms
1 change: 1 addition & 0 deletions doc/source/whatsnew/v2.1.0.rst
@@ -108,6 +108,7 @@ Other enhancements
- :meth:`SeriesGroupBy.transform` and :meth:`DataFrameGroupBy.transform` now support passing in a string as the function for ``engine="numba"`` (:issue:`53579`)
- Added ``engine_kwargs`` parameter to :meth:`DataFrame.to_excel` (:issue:`53220`)
- Added a new parameter ``by_row`` to :meth:`Series.apply`. When set to ``False`` the supplied callables will always operate on the whole Series (:issue:`53400`).
- Groupby aggregations (such as :meth:`DataFrameGroupBy.sum`) can now preserve the dtype of the input instead of casting to ``float64`` (:issue:`44952`)
- Many read/to_* functions, such as :meth:`DataFrame.to_pickle` and :func:`read_csv`, support forwarding compression arguments to lzma.LZMAFile (:issue:`52979`)
- Performance improvement in :func:`concat` with homogeneous ``np.float64`` or ``np.float32`` dtypes (:issue:`52685`)
- Performance improvement in :meth:`DataFrame.filter` when ``items`` is given (:issue:`52941`)
127 changes: 111 additions & 16 deletions pandas/core/_numba/executor.py
@@ -3,6 +3,7 @@
import functools
from typing import (
TYPE_CHECKING,
Any,
Callable,
)

@@ -15,8 +16,86 @@


@functools.cache
def make_looper(func, result_dtype, nopython, nogil, parallel):
if TYPE_CHECKING:
import numba
else:
numba = import_optional_dependency("numba")

@numba.jit(nopython=nopython, nogil=nogil, parallel=parallel)
def column_looper(
values: np.ndarray,
start: np.ndarray,
end: np.ndarray,
min_periods: int,
*args,
):
result = np.empty((values.shape[0], len(start)), dtype=result_dtype)
na_positions = {}
for i in numba.prange(values.shape[0]):
output, na_pos = func(
values[i], result_dtype, start, end, min_periods, *args
)
result[i] = output
if len(na_pos) > 0:
na_positions[i] = np.array(na_pos)
return result, na_positions

return column_looper
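
Because make_looper is wrapped in functools.cache, numba compiles one column looper per (kernel, result dtype, jit options) combination and reuses it afterwards. A hedged sketch of that property (the imports reference private pandas modules touched by this PR, so the paths may change; numba must be installed):

    import numpy as np
    from pandas.core._numba.executor import make_looper
    from pandas.core._numba.kernels import sliding_mean

    looper_a = make_looper(sliding_mean, np.float64, True, True, False)
    looper_b = make_looper(sliding_mean, np.float64, True, True, False)
    assert looper_a is looper_b  # cache hit: the jitted looper was compiled only once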


default_dtype_mapping: dict[np.dtype, Any] = {
np.dtype("int8"): np.int64,
np.dtype("int16"): np.int64,
np.dtype("int32"): np.int64,
np.dtype("int64"): np.int64,
np.dtype("uint8"): np.uint64,
np.dtype("uint16"): np.uint64,
np.dtype("uint32"): np.uint64,
np.dtype("uint64"): np.uint64,
np.dtype("float32"): np.float64,
np.dtype("float64"): np.float64,
np.dtype("complex64"): np.complex128,
np.dtype("complex128"): np.complex128,
}


# TODO: Preserve complex dtypes

float_dtype_mapping: dict[np.dtype, Any] = {
np.dtype("int8"): np.float64,
np.dtype("int16"): np.float64,
np.dtype("int32"): np.float64,
np.dtype("int64"): np.float64,
np.dtype("uint8"): np.float64,
np.dtype("uint16"): np.float64,
np.dtype("uint32"): np.float64,
np.dtype("uint64"): np.float64,
np.dtype("float32"): np.float64,
np.dtype("float64"): np.float64,
np.dtype("complex64"): np.float64,
np.dtype("complex128"): np.float64,
}

identity_dtype_mapping: dict[np.dtype, Any] = {
np.dtype("int8"): np.int8,
np.dtype("int16"): np.int16,
np.dtype("int32"): np.int32,
np.dtype("int64"): np.int64,
np.dtype("uint8"): np.uint8,
np.dtype("uint16"): np.uint16,
np.dtype("uint32"): np.uint32,
np.dtype("uint64"): np.uint64,
np.dtype("float32"): np.float32,
np.dtype("float64"): np.float64,
np.dtype("complex64"): np.complex64,
np.dtype("complex128"): np.complex128,
}
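
A small illustration of the policy these three tables encode (which kernel uses which mapping, e.g. sum -> default, mean -> float, min/max -> identity, is wired up elsewhere in this PR and is stated here as an assumption):

    import numpy as np

    int32 = np.dtype("int32")
    default_dtype_mapping[int32]   # np.int64: small ints accumulate in a wider int
    float_dtype_mapping[int32]     # np.float64: e.g. a mean of ints is fractional
    identity_dtype_mapping[int32]  # np.int32: e.g. min/max can preserve the input dtype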


def generate_shared_aggregator(
func: Callable[..., Scalar],
dtype_mapping: dict[np.dtype, np.dtype],
nopython: bool,
nogil: bool,
parallel: bool,
@@ -29,6 +108,9 @@ def generate_shared_aggregator(
----------
func : function
aggregation function to be applied to each column
dtype_mapping : dict
    Maps the dtype of the values being aggregated to the result dtype.
nopython : bool
nopython to be passed into numba.jit
nogil : bool
@@ -40,22 +122,35 @@
-------
Numba function
"""
if TYPE_CHECKING:
import numba
else:
numba = import_optional_dependency("numba")

@numba.jit(nopython=nopython, nogil=nogil, parallel=parallel)
def column_looper(
values: np.ndarray,
start: np.ndarray,
end: np.ndarray,
min_periods: int,
*args,
):
result = np.empty((len(start), values.shape[1]), dtype=np.float64)
for i in numba.prange(values.shape[1]):
result[:, i] = func(values[:, i], start, end, min_periods, *args)
# A wrapper around the looper function
# to dispatch based on dtype, since numba is unable to do that in nopython mode

# It also post-processes the result by inserting NaNs where the number of
# observations is less than min_periods.
# This cannot be done in numba nopython mode
# (you'll run into a type-unification error when casting int -> float)
def looper_wrapper(values, start, end, min_periods, **kwargs):
result_dtype = dtype_mapping[values.dtype]
column_looper = make_looper(func, result_dtype, nopython, nogil, parallel)
# Need to unpack kwargs since numba only supports *args
result, na_positions = column_looper(
values, start, end, min_periods, *kwargs.values()
)
if result.dtype.kind == "i":
    # Check whether any entry of na_positions is non-empty; if so, convert
    # the whole block to float64, since an int dtype cannot hold NaN.
    # This is OK since, if min_periods is not satisfied for one column,
    # it is not satisfied for all columns at that index.
    for na_pos in na_positions.values():
        if len(na_pos) > 0:
            result = result.astype("float64")
            break
# TODO: Optimize this
for i, na_pos in na_positions.items():
    if len(na_pos) > 0:
        result[i, na_pos] = np.nan
return result

return column_looper
return looper_wrapper
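
A minimal, numba-free sketch of the post-processing step in looper_wrapper above (the data is hypothetical; rows of result correspond to columns of the input, matching column_looper's layout):

    import numpy as np

    result = np.array([[1, 2, 3], [4, 5, 6]])  # integer aggregation output
    na_positions = {1: np.array([2])}  # column 1 missed min_periods at index 2

    if result.dtype.kind == "i" and any(len(p) > 0 for p in na_positions.values()):
        result = result.astype("float64")  # an int array cannot hold NaN
    for i, na_pos in na_positions.items():
        if len(na_pos) > 0:
            result[i, na_pos] = np.nan

    print(result)  # [[1. 2. 3.] [4. 5. nan]]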
11 changes: 8 additions & 3 deletions pandas/core/_numba/kernels/mean_.py
@@ -60,10 +60,11 @@ def remove_mean(
@numba.jit(nopython=True, nogil=True, parallel=False)
def sliding_mean(
values: np.ndarray,
result_dtype: np.dtype,
start: np.ndarray,
end: np.ndarray,
min_periods: int,
) -> np.ndarray:
) -> tuple[np.ndarray, list[int]]:
N = len(start)
nobs = 0
sum_x = 0.0
@@ -75,7 +76,7 @@ def sliding_mean(
start
) and is_monotonic_increasing(end)

output = np.empty(N, dtype=np.float64)
output = np.empty(N, dtype=result_dtype)

for i in range(N):
s = start[i]
@@ -147,4 +148,8 @@ def sliding_mean(
neg_ct = 0
compensation_remove = 0.0

return output
# na_pos is an empty list, since a float64 output can already hold NaN
# Use a list comprehension, since numba cannot figure out on its own that
# na_pos is an empty list of ints
na_pos = [0 for i in range(0)]
return output, na_pos
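
A hedged usage sketch of the kernel above (sliding_mean is private API, so the import path may change; start/end describe one half-open window per group, the way the grouped-aggregation caller builds them):

    import numpy as np
    from pandas.core._numba.kernels.mean_ import sliding_mean

    values = np.array([1.0, 2.0, 3.0, 4.0])
    start = np.array([0, 2])  # two windows: [0, 2) and [2, 4)
    end = np.array([2, 4])

    out, na_pos = sliding_mean(values, np.float64, start, end, 1)  # min_periods=1
    print(out, na_pos)  # [1.5 3.5] []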
13 changes: 9 additions & 4 deletions pandas/core/_numba/kernels/min_max_.py
@@ -15,14 +15,16 @@
@numba.jit(nopython=True, nogil=True, parallel=False)
def sliding_min_max(
values: np.ndarray,
result_dtype: np.dtype,
start: np.ndarray,
end: np.ndarray,
min_periods: int,
is_max: bool,
) -> np.ndarray:
) -> tuple[np.ndarray, list[int]]:
N = len(start)
nobs = 0
output = np.empty(N, dtype=np.float64)
output = np.empty(N, dtype=result_dtype)
na_pos = []
# Use deque once numba supports it
# https://github.com/numba/numba/issues/7417
Q: list = []
@@ -64,6 +66,9 @@ def sliding_min_max(
if Q and curr_win_size > 0 and nobs >= min_periods:
output[i] = values[Q[0]]
else:
output[i] = np.nan
if values.dtype.kind != "i":
output[i] = np.nan
else:
na_pos.append(i)

return output
return output, na_pos
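
And a hedged sketch of the integer path above: an integer output array cannot hold NaN, so indices that fail min_periods are recorded in na_pos and looper_wrapper converts the block afterwards (private API and assumed example values):

    import numpy as np
    from pandas.core._numba.kernels.min_max_ import sliding_min_max

    values = np.array([3, 1, 2], dtype=np.int64)
    start = np.array([0, 2])  # windows [0, 2) and [2, 3)
    end = np.array([2, 3])

    out, na_pos = sliding_min_max(values, np.int64, start, end, 2, True)  # min_periods=2, is_max=True
    # window 0 has 2 observations (>= min_periods), so out[0] == 3 (the max);
    # window 1 has only 1, so index 1 lands in na_pos and out[1] stays uninitialized
    print(out[0], na_pos)  # 3 [1]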
(The remaining 8 changed files are not shown here.)