From ac5587cf092315d5cd30612e25c10c4c0325b107 Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Sun, 22 Oct 2023 15:52:18 -0400 Subject: [PATCH] ENH: Add numba engine to df.apply (#55104) * ENH: Add numba engine to df.apply * complete? * wip: pass tests * fix existing tests * go for green * fix checks? * fix pyright * update docs * eliminate a blank line * update from code review + more tests * fix failing tests * Simplify w/ context manager * skip if no numba * simplify more * specify dtypes * address code review * add errors for invalid columns * adjust message --- pandas/core/_numba/extensions.py | 575 +++++++++++++++++++++++++ pandas/core/apply.py | 184 +++++++- pandas/core/frame.py | 5 +- pandas/tests/apply/conftest.py | 12 + pandas/tests/apply/test_frame_apply.py | 139 +++--- pandas/tests/apply/test_numba.py | 95 ++++ pyproject.toml | 2 +- pyright_reportGeneralTypeIssues.json | 1 + scripts/validate_unwanted_patterns.py | 2 + 9 files changed, 948 insertions(+), 67 deletions(-) create mode 100644 pandas/core/_numba/extensions.py create mode 100644 pandas/tests/apply/test_numba.py diff --git a/pandas/core/_numba/extensions.py b/pandas/core/_numba/extensions.py new file mode 100644 index 0000000000000..ebe2a752a12f7 --- /dev/null +++ b/pandas/core/_numba/extensions.py @@ -0,0 +1,575 @@ +# Disable type checking for this module since numba's internals +# are not typed, and we use numba's internals via its extension API +# mypy: ignore-errors +""" +Utility classes/functions to let numba recognize +pandas Index/Series/DataFrame + +Mostly vendored from https://github.com/numba/numba/blob/main/numba/tests/pdlike_usecase.py +""" + +from __future__ import annotations + +from contextlib import contextmanager +import operator + +import numba +from numba import types +from numba.core import cgutils +from numba.core.datamodel import models +from numba.core.extending import ( + NativeValue, + box, + lower_builtin, + make_attribute_wrapper, + overload, + overload_attribute, + overload_method, + register_model, + type_callable, + typeof_impl, + unbox, +) +from numba.core.imputils import impl_ret_borrowed +import numpy as np + +from pandas._libs import lib + +from pandas.core.indexes.base import Index +from pandas.core.indexing import _iLocIndexer +from pandas.core.series import Series + + +# Helper function to hack around fact that Index casts numpy string dtype to object +# +# Idea is to set an attribute on a Index called _numba_data +# that is the original data, or the object data casted to numpy string dtype, +# with a context manager that is unset afterwards +@contextmanager +def set_numba_data(index: Index): + numba_data = index._data + if numba_data.dtype == object: + if not lib.is_string_array(numba_data): + raise ValueError( + "The numba engine only supports using string or numeric column names" + ) + numba_data = numba_data.astype("U") + try: + index._numba_data = numba_data + yield index + finally: + del index._numba_data + + +# TODO: Range index support +# (this currently lowers OK, but does not round-trip) +class IndexType(types.Type): + """ + The type class for Index objects. 
+ """ + + def __init__(self, dtype, layout, pyclass: any) -> None: + self.pyclass = pyclass + name = f"index({dtype}, {layout})" + self.dtype = dtype + self.layout = layout + super().__init__(name) + + @property + def key(self): + return self.pyclass, self.dtype, self.layout + + @property + def as_array(self): + return types.Array(self.dtype, 1, self.layout) + + def copy(self, dtype=None, ndim: int = 1, layout=None): + assert ndim == 1 + if dtype is None: + dtype = self.dtype + layout = layout or self.layout + return type(self)(dtype, layout, self.pyclass) + + +class SeriesType(types.Type): + """ + The type class for Series objects. + """ + + def __init__(self, dtype, index, namety) -> None: + assert isinstance(index, IndexType) + self.dtype = dtype + self.index = index + self.values = types.Array(self.dtype, 1, "C") + self.namety = namety + name = f"series({dtype}, {index}, {namety})" + super().__init__(name) + + @property + def key(self): + return self.dtype, self.index, self.namety + + @property + def as_array(self): + return self.values + + def copy(self, dtype=None, ndim: int = 1, layout: str = "C"): + assert ndim == 1 + assert layout == "C" + if dtype is None: + dtype = self.dtype + return type(self)(dtype, self.index, self.namety) + + +@typeof_impl.register(Index) +def typeof_index(val, c): + """ + This will assume that only strings are in object dtype + index. + (you should check this before this gets lowered down to numba) + """ + # arrty = typeof_impl(val._data, c) + arrty = typeof_impl(val._numba_data, c) + assert arrty.ndim == 1 + return IndexType(arrty.dtype, arrty.layout, type(val)) + + +@typeof_impl.register(Series) +def typeof_series(val, c): + index = typeof_impl(val.index, c) + arrty = typeof_impl(val.values, c) + namety = typeof_impl(val.name, c) + assert arrty.ndim == 1 + assert arrty.layout == "C" + return SeriesType(arrty.dtype, index, namety) + + +@type_callable(Series) +def type_series_constructor(context): + def typer(data, index, name=None): + if isinstance(index, IndexType) and isinstance(data, types.Array): + assert data.ndim == 1 + if name is None: + name = types.intp + return SeriesType(data.dtype, index, name) + + return typer + + +@type_callable(Index) +def type_index_constructor(context): + def typer(data, hashmap=None): + if isinstance(data, types.Array): + assert data.layout == "C" + assert data.ndim == 1 + assert hashmap is None or isinstance(hashmap, types.DictType) + return IndexType(data.dtype, layout=data.layout, pyclass=Index) + + return typer + + +# Backend extensions for Index and Series and Frame +@register_model(IndexType) +class IndexModel(models.StructModel): + def __init__(self, dmm, fe_type) -> None: + # We don't want the numpy string scalar type in our hashmap + members = [ + ("data", fe_type.as_array), + # This is an attempt to emulate our hashtable code with a numba + # typed dict + # It maps from values in the index to their integer positions in the array + ("hashmap", types.DictType(fe_type.dtype, types.intp)), + # Pointer to the Index object this was created from, or that it + # boxes to + # https://numba.discourse.group/t/qst-how-to-cache-the-boxing-of-an-object/2128/2?u=lithomas1 + ("parent", types.pyobject), + ] + models.StructModel.__init__(self, dmm, fe_type, members) + + +@register_model(SeriesType) +class SeriesModel(models.StructModel): + def __init__(self, dmm, fe_type) -> None: + members = [ + ("index", fe_type.index), + ("values", fe_type.as_array), + ("name", fe_type.namety), + ] + models.StructModel.__init__(self, dmm, 
fe_type, members) + + +make_attribute_wrapper(IndexType, "data", "_data") +make_attribute_wrapper(IndexType, "hashmap", "hashmap") + +make_attribute_wrapper(SeriesType, "index", "index") +make_attribute_wrapper(SeriesType, "values", "values") +make_attribute_wrapper(SeriesType, "name", "name") + + +@lower_builtin(Series, types.Array, IndexType) +def pdseries_constructor(context, builder, sig, args): + data, index = args + series = cgutils.create_struct_proxy(sig.return_type)(context, builder) + series.index = index + series.values = data + series.name = context.get_constant(types.intp, 0) + return impl_ret_borrowed(context, builder, sig.return_type, series._getvalue()) + + +@lower_builtin(Series, types.Array, IndexType, types.intp) +@lower_builtin(Series, types.Array, IndexType, types.float64) +@lower_builtin(Series, types.Array, IndexType, types.unicode_type) +def pdseries_constructor_with_name(context, builder, sig, args): + data, index, name = args + series = cgutils.create_struct_proxy(sig.return_type)(context, builder) + series.index = index + series.values = data + series.name = name + return impl_ret_borrowed(context, builder, sig.return_type, series._getvalue()) + + +@lower_builtin(Index, types.Array, types.DictType, types.pyobject) +def index_constructor_2arg(context, builder, sig, args): + (data, hashmap, parent) = args + index = cgutils.create_struct_proxy(sig.return_type)(context, builder) + + index.data = data + index.hashmap = hashmap + index.parent = parent + return impl_ret_borrowed(context, builder, sig.return_type, index._getvalue()) + + +@lower_builtin(Index, types.Array, types.DictType) +def index_constructor_2arg_parent(context, builder, sig, args): + # Basically same as index_constructor_1arg, but also lets you specify the + # parent object + (data, hashmap) = args + index = cgutils.create_struct_proxy(sig.return_type)(context, builder) + + index.data = data + index.hashmap = hashmap + return impl_ret_borrowed(context, builder, sig.return_type, index._getvalue()) + + +@lower_builtin(Index, types.Array) +def index_constructor_1arg(context, builder, sig, args): + from numba.typed import Dict + + key_type = sig.return_type.dtype + value_type = types.intp + + def index_impl(data): + return Index(data, Dict.empty(key_type, value_type)) + + return context.compile_internal(builder, index_impl, sig, args) + + +# Helper to convert the unicodecharseq (numpy string scalar) into a unicode_type +# (regular string) +def maybe_cast_str(x): + # Dummy function that numba can overload + pass + + +@overload(maybe_cast_str) +def maybe_cast_str_impl(x): + """Converts numba UnicodeCharSeq (numpy string scalar) -> unicode type (string). + Is a no-op for other types.""" + if isinstance(x, types.UnicodeCharSeq): + return lambda x: str(x) + else: + return lambda x: x + + +@unbox(IndexType) +def unbox_index(typ, obj, c): + """ + Convert a Index object to a native structure. 
+ + Note: Object dtype is not allowed here + """ + data_obj = c.pyapi.object_getattr_string(obj, "_numba_data") + index = cgutils.create_struct_proxy(typ)(c.context, c.builder) + # If we see an object array, assume its been validated as only containing strings + # We still need to do the conversion though + index.data = c.unbox(typ.as_array, data_obj).value + typed_dict_obj = c.pyapi.unserialize(c.pyapi.serialize_object(numba.typed.Dict)) + # Create an empty typed dict in numba for the hashmap for indexing + # equiv of numba.typed.Dict.empty(typ.dtype, types.intp) + arr_type_obj = c.pyapi.unserialize(c.pyapi.serialize_object(typ.dtype)) + intp_type_obj = c.pyapi.unserialize(c.pyapi.serialize_object(types.intp)) + hashmap_obj = c.pyapi.call_method( + typed_dict_obj, "empty", (arr_type_obj, intp_type_obj) + ) + index.hashmap = c.unbox(types.DictType(typ.dtype, types.intp), hashmap_obj).value + # Set the parent for speedy boxing. + index.parent = obj + + # Decrefs + c.pyapi.decref(data_obj) + c.pyapi.decref(arr_type_obj) + c.pyapi.decref(intp_type_obj) + c.pyapi.decref(typed_dict_obj) + + return NativeValue(index._getvalue()) + + +@unbox(SeriesType) +def unbox_series(typ, obj, c): + """ + Convert a Series object to a native structure. + """ + index_obj = c.pyapi.object_getattr_string(obj, "index") + values_obj = c.pyapi.object_getattr_string(obj, "values") + name_obj = c.pyapi.object_getattr_string(obj, "name") + + series = cgutils.create_struct_proxy(typ)(c.context, c.builder) + series.index = c.unbox(typ.index, index_obj).value + series.values = c.unbox(typ.values, values_obj).value + series.name = c.unbox(typ.namety, name_obj).value + + # Decrefs + c.pyapi.decref(index_obj) + c.pyapi.decref(values_obj) + c.pyapi.decref(name_obj) + + return NativeValue(series._getvalue()) + + +@box(IndexType) +def box_index(typ, val, c): + """ + Convert a native index structure to a Index object. + + If our native index is of a numpy string dtype, we'll cast it to + object. + """ + # First build a Numpy array object, then wrap it in a Index + index = cgutils.create_struct_proxy(typ)(c.context, c.builder, value=val) + + res = cgutils.alloca_once_value(c.builder, index.parent) + + # Does parent exist? 
+ # (it means already boxed once, or Index same as original df.index or df.columns) + # xref https://github.com/numba/numba/blob/596e8a55334cc46854e3192766e643767bd7c934/numba/core/boxing.py#L593C17-L593C17 + with c.builder.if_else(cgutils.is_not_null(c.builder, index.parent)) as ( + has_parent, + otherwise, + ): + with has_parent: + c.pyapi.incref(index.parent) + with otherwise: + # TODO: preserve the original class for the index + # Also need preserve the name of the Index + # class_obj = c.pyapi.unserialize(c.pyapi.serialize_object(typ.pyclass)) + class_obj = c.pyapi.unserialize(c.pyapi.serialize_object(Index)) + array_obj = c.box(typ.as_array, index.data) + if isinstance(typ.dtype, types.UnicodeCharSeq): + # We converted to numpy string dtype, convert back + # to object since _simple_new won't do that for uss + object_str_obj = c.pyapi.unserialize(c.pyapi.serialize_object("object")) + array_obj = c.pyapi.call_method(array_obj, "astype", (object_str_obj,)) + c.pyapi.decref(object_str_obj) + # this is basically Index._simple_new(array_obj, name_obj) in python + index_obj = c.pyapi.call_method(class_obj, "_simple_new", (array_obj,)) + index.parent = index_obj + c.builder.store(index_obj, res) + + # Decrefs + c.pyapi.decref(class_obj) + c.pyapi.decref(array_obj) + return c.builder.load(res) + + +@box(SeriesType) +def box_series(typ, val, c): + """ + Convert a native series structure to a Series object. + """ + series = cgutils.create_struct_proxy(typ)(c.context, c.builder, value=val) + class_obj = c.pyapi.unserialize(c.pyapi.serialize_object(Series)) + index_obj = c.box(typ.index, series.index) + array_obj = c.box(typ.as_array, series.values) + name_obj = c.box(typ.namety, series.name) + true_obj = c.pyapi.unserialize(c.pyapi.serialize_object(True)) + # This is equivalent of + # pd.Series(data=array_obj, index=index_obj, dtype=None, + # name=name_obj, copy=None, fastpath=True) + series_obj = c.pyapi.call_function_objargs( + class_obj, + ( + array_obj, + index_obj, + c.pyapi.borrow_none(), + name_obj, + c.pyapi.borrow_none(), + true_obj, + ), + ) + + # Decrefs + c.pyapi.decref(class_obj) + c.pyapi.decref(index_obj) + c.pyapi.decref(array_obj) + c.pyapi.decref(name_obj) + c.pyapi.decref(true_obj) + + return series_obj + + +# Add common series reductions (e.g. mean, sum), +# and also add common binops (e.g. add, sub, mul, div) +def generate_series_reduction(ser_reduction, ser_method): + @overload_method(SeriesType, ser_reduction) + def series_reduction(series): + def series_reduction_impl(series): + return ser_method(series.values) + + return series_reduction_impl + + return series_reduction + + +def generate_series_binop(binop): + @overload(binop) + def series_binop(series1, value): + if isinstance(series1, SeriesType): + if isinstance(value, SeriesType): + + def series_binop_impl(series1, series2): + # TODO: Check index matching? + return Series( + binop(series1.values, series2.values), + series1.index, + series1.name, + ) + + return series_binop_impl + else: + + def series_binop_impl(series1, value): + return Series( + binop(series1.values, value), series1.index, series1.name + ) + + return series_binop_impl + + return series_binop + + +series_reductions = [ + ("sum", np.sum), + ("mean", np.mean), + # Disabled due to discrepancies between numba std. dev + # and pandas std. 
dev (no way to specify dof) + # ("std", np.std), + # ("var", np.var), + ("min", np.min), + ("max", np.max), +] +for reduction, reduction_method in series_reductions: + generate_series_reduction(reduction, reduction_method) + +series_binops = [operator.add, operator.sub, operator.mul, operator.truediv] + +for ser_binop in series_binops: + generate_series_binop(ser_binop) + + +# get_loc on Index +@overload_method(IndexType, "get_loc") +def index_get_loc(index, item): + def index_get_loc_impl(index, item): + # Initialize the hash table if not initialized + if len(index.hashmap) == 0: + for i, val in enumerate(index._data): + index.hashmap[val] = i + return index.hashmap[item] + + return index_get_loc_impl + + +# Indexing for Series/Index +@overload(operator.getitem) +def series_indexing(series, item): + if isinstance(series, SeriesType): + + def series_getitem(series, item): + loc = series.index.get_loc(item) + return series.iloc[loc] + + return series_getitem + + +@overload(operator.getitem) +def index_indexing(index, idx): + if isinstance(index, IndexType): + + def index_getitem(index, idx): + return index._data[idx] + + return index_getitem + + +class IlocType(types.Type): + def __init__(self, obj_type) -> None: + self.obj_type = obj_type + name = f"iLocIndexer({obj_type})" + super().__init__(name=name) + + @property + def key(self): + return self.obj_type + + +@typeof_impl.register(_iLocIndexer) +def typeof_iloc(val, c): + objtype = typeof_impl(val.obj, c) + return IlocType(objtype) + + +@type_callable(_iLocIndexer) +def type_iloc_constructor(context): + def typer(obj): + if isinstance(obj, SeriesType): + return IlocType(obj) + + return typer + + +@lower_builtin(_iLocIndexer, SeriesType) +def iloc_constructor(context, builder, sig, args): + (obj,) = args + iloc_indexer = cgutils.create_struct_proxy(sig.return_type)(context, builder) + iloc_indexer.obj = obj + return impl_ret_borrowed( + context, builder, sig.return_type, iloc_indexer._getvalue() + ) + + +@register_model(IlocType) +class ILocModel(models.StructModel): + def __init__(self, dmm, fe_type) -> None: + members = [("obj", fe_type.obj_type)] + models.StructModel.__init__(self, dmm, fe_type, members) + + +make_attribute_wrapper(IlocType, "obj", "obj") + + +@overload_attribute(SeriesType, "iloc") +def series_iloc(series): + def get(series): + return _iLocIndexer(series) + + return get + + +@overload(operator.getitem) +def iloc_getitem(iloc_indexer, i): + if isinstance(iloc_indexer, IlocType): + + def getitem_impl(iloc_indexer, i): + return iloc_indexer.obj.values[i] + + return getitem_impl diff --git a/pandas/core/apply.py b/pandas/core/apply.py index 1525e316f345f..3b79882d3c762 100644 --- a/pandas/core/apply.py +++ b/pandas/core/apply.py @@ -2,6 +2,7 @@ import abc from collections import defaultdict +import functools from functools import partial import inspect from typing import ( @@ -29,6 +30,7 @@ NDFrameT, npt, ) +from pandas.compat._optional import import_optional_dependency from pandas.errors import SpecificationError from pandas.util._decorators import cache_readonly from pandas.util._exceptions import find_stack_level @@ -36,7 +38,9 @@ from pandas.core.dtypes.cast import is_nested_object from pandas.core.dtypes.common import ( is_dict_like, + is_extension_array_dtype, is_list_like, + is_numeric_dtype, is_sequence, ) from pandas.core.dtypes.dtypes import ( @@ -121,6 +125,8 @@ def __init__( result_type: str | None, *, by_row: Literal[False, "compat", "_compat"] = "compat", + engine: str = "python", + engine_kwargs: dict[str, 
bool] | None = None,
         args,
         kwargs,
     ) -> None:
@@ -133,6 +139,9 @@ def __init__(
         self.args = args or ()
         self.kwargs = kwargs or {}
 
+        self.engine = engine
+        self.engine_kwargs = {} if engine_kwargs is None else engine_kwargs
+
         if result_type not in [None, "reduce", "broadcast", "expand"]:
             raise ValueError(
                 "invalid value for result_type, must be one "
@@ -601,6 +610,13 @@ def apply_list_or_dict_like(self) -> DataFrame | Series:
             result: Series, DataFrame, or None
                 Result when self.func is a list-like or dict-like, None otherwise.
         """
+
+        if self.engine == "numba":
+            raise NotImplementedError(
+                "The 'numba' engine doesn't support list-like/"
+                "dict-likes of callables yet."
+            )
+
         if self.axis == 1 and isinstance(self.obj, ABCDataFrame):
             return self.obj.T.apply(self.func, 0, args=self.args, **self.kwargs).T
 
@@ -768,10 +784,16 @@ def __init__(
     ) -> None:
         if by_row is not False and by_row != "compat":
             raise ValueError(f"by_row={by_row} not allowed")
-        self.engine = engine
-        self.engine_kwargs = engine_kwargs
         super().__init__(
-            obj, func, raw, result_type, by_row=by_row, args=args, kwargs=kwargs
+            obj,
+            func,
+            raw,
+            result_type,
+            by_row=by_row,
+            engine=engine,
+            engine_kwargs=engine_kwargs,
+            args=args,
+            kwargs=kwargs,
         )
 
     # ---------------------------------------------------------------
@@ -792,6 +814,32 @@ def result_columns(self) -> Index:
     def series_generator(self) -> Generator[Series, None, None]:
         pass
 
+    @staticmethod
+    @functools.cache
+    @abc.abstractmethod
+    def generate_numba_apply_func(
+        func, nogil=True, nopython=True, parallel=False
+    ) -> Callable[[npt.NDArray, Index, Index], dict[int, Any]]:
+        pass
+
+    @abc.abstractmethod
+    def apply_with_numba(self):
+        pass
+
+    def validate_values_for_numba(self):
+        # Validate that all column dtypes are OK
+        for colname, dtype in self.obj.dtypes.items():
+            if not is_numeric_dtype(dtype):
+                raise ValueError(
+                    f"Column {colname} must have a numeric dtype. "
+                    f"Found '{dtype}' instead"
+                )
+            if is_extension_array_dtype(dtype):
+                raise ValueError(
+                    f"Column {colname} is backed by an extension array, "
+                    f"which is not supported by the numba engine."
+ ) + @abc.abstractmethod def wrap_results_for_axis( self, results: ResType, res_index: Index @@ -815,13 +863,12 @@ def values(self): def apply(self) -> DataFrame | Series: """compute the results""" - if self.engine == "numba" and not self.raw: - raise ValueError( - "The numba engine in DataFrame.apply can only be used when raw=True" - ) - # dispatch to handle list-like or dict-like if is_list_like(self.func): + if self.engine == "numba": + raise NotImplementedError( + "the 'numba' engine doesn't support lists of callables yet" + ) return self.apply_list_or_dict_like() # all empty @@ -830,10 +877,20 @@ def apply(self) -> DataFrame | Series: # string dispatch if isinstance(self.func, str): + if self.engine == "numba": + raise NotImplementedError( + "the 'numba' engine doesn't support using " + "a string as the callable function" + ) return self.apply_str() # ufunc elif isinstance(self.func, np.ufunc): + if self.engine == "numba": + raise NotImplementedError( + "the 'numba' engine doesn't support " + "using a numpy ufunc as the callable function" + ) with np.errstate(all="ignore"): results = self.obj._mgr.apply("apply", func=self.func) # _constructor will retain self.index and self.columns @@ -841,6 +898,10 @@ def apply(self) -> DataFrame | Series: # broadcasting if self.result_type == "broadcast": + if self.engine == "numba": + raise NotImplementedError( + "the 'numba' engine doesn't support result_type='broadcast'" + ) return self.apply_broadcast(self.obj) # one axis empty @@ -997,7 +1058,10 @@ def apply_broadcast(self, target: DataFrame) -> DataFrame: return result def apply_standard(self): - results, res_index = self.apply_series_generator() + if self.engine == "python": + results, res_index = self.apply_series_generator() + else: + results, res_index = self.apply_series_numba() # wrap results return self.wrap_results(results, res_index) @@ -1021,6 +1085,19 @@ def apply_series_generator(self) -> tuple[ResType, Index]: return results, res_index + def apply_series_numba(self): + if self.engine_kwargs.get("parallel", False): + raise NotImplementedError( + "Parallel apply is not supported when raw=False and engine='numba'" + ) + if not self.obj.index.is_unique or not self.columns.is_unique: + raise NotImplementedError( + "The index/columns must be unique when raw=False and engine='numba'" + ) + self.validate_values_for_numba() + results = self.apply_with_numba() + return results, self.result_index + def wrap_results(self, results: ResType, res_index: Index) -> DataFrame | Series: from pandas import Series @@ -1060,6 +1137,49 @@ class FrameRowApply(FrameApply): def series_generator(self) -> Generator[Series, None, None]: return (self.obj._ixs(i, axis=1) for i in range(len(self.columns))) + @staticmethod + @functools.cache + def generate_numba_apply_func( + func, nogil=True, nopython=True, parallel=False + ) -> Callable[[npt.NDArray, Index, Index], dict[int, Any]]: + numba = import_optional_dependency("numba") + from pandas import Series + + # Import helper from extensions to cast string object -> np strings + # Note: This also has the side effect of loading our numba extensions + from pandas.core._numba.extensions import maybe_cast_str + + jitted_udf = numba.extending.register_jitable(func) + + # Currently the parallel argument doesn't get passed through here + # (it's disabled) since the dicts in numba aren't thread-safe. 
+ @numba.jit(nogil=nogil, nopython=nopython, parallel=parallel) + def numba_func(values, col_names, df_index): + results = {} + for j in range(values.shape[1]): + # Create the series + ser = Series( + values[:, j], index=df_index, name=maybe_cast_str(col_names[j]) + ) + results[j] = jitted_udf(ser) + return results + + return numba_func + + def apply_with_numba(self) -> dict[int, Any]: + nb_func = self.generate_numba_apply_func( + cast(Callable, self.func), **self.engine_kwargs + ) + from pandas.core._numba.extensions import set_numba_data + + # Convert from numba dict to regular dict + # Our isinstance checks in the df constructor don't pass for numbas typed dict + with set_numba_data(self.obj.index) as index, set_numba_data( + self.columns + ) as columns: + res = dict(nb_func(self.values, columns, index)) + return res + @property def result_index(self) -> Index: return self.columns @@ -1143,6 +1263,52 @@ def series_generator(self) -> Generator[Series, None, None]: object.__setattr__(ser, "_name", name) yield ser + @staticmethod + @functools.cache + def generate_numba_apply_func( + func, nogil=True, nopython=True, parallel=False + ) -> Callable[[npt.NDArray, Index, Index], dict[int, Any]]: + numba = import_optional_dependency("numba") + from pandas import Series + from pandas.core._numba.extensions import maybe_cast_str + + jitted_udf = numba.extending.register_jitable(func) + + @numba.jit(nogil=nogil, nopython=nopython, parallel=parallel) + def numba_func(values, col_names_index, index): + results = {} + # Currently the parallel argument doesn't get passed through here + # (it's disabled) since the dicts in numba aren't thread-safe. + for i in range(values.shape[0]): + # Create the series + # TODO: values corrupted without the copy + ser = Series( + values[i].copy(), + index=col_names_index, + name=maybe_cast_str(index[i]), + ) + results[i] = jitted_udf(ser) + + return results + + return numba_func + + def apply_with_numba(self) -> dict[int, Any]: + nb_func = self.generate_numba_apply_func( + cast(Callable, self.func), **self.engine_kwargs + ) + + from pandas.core._numba.extensions import set_numba_data + + # Convert from numba dict to regular dict + # Our isinstance checks in the df constructor don't pass for numbas typed dict + with set_numba_data(self.obj.index) as index, set_numba_data( + self.columns + ) as columns: + res = dict(nb_func(self.values, columns, index)) + + return res + @property def result_index(self) -> Index: return self.index diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 70a5ac69011d1..b2d359078a4ce 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -10090,6 +10090,9 @@ def apply( - nogil (release the GIL inside the JIT compiled function) - parallel (try to apply the function in parallel over the DataFrame) + Note: Due to limitations within numba/how pandas interfaces with numba, + you should only use this if raw=True + Note: The numba compiler only supports a subset of valid Python/numpy operations. @@ -10099,8 +10102,6 @@ def apply( `_ in numba to learn what you can or cannot use in the passed function. - As of right now, the numba engine can only be used with raw=True. - .. 
versionadded:: 2.2.0 engine_kwargs : dict diff --git a/pandas/tests/apply/conftest.py b/pandas/tests/apply/conftest.py index b68c6235cb0b8..7ed9fc88c3aea 100644 --- a/pandas/tests/apply/conftest.py +++ b/pandas/tests/apply/conftest.py @@ -16,3 +16,15 @@ def int_frame_const_col(): columns=["A", "B", "C"], ) return df + + +@pytest.fixture(params=["python", "numba"]) +def engine(request): + if request.param == "numba": + pytest.importorskip("numba") + return request.param + + +@pytest.fixture(params=[0, 1]) +def apply_axis(request): + return request.param diff --git a/pandas/tests/apply/test_frame_apply.py b/pandas/tests/apply/test_frame_apply.py index 232cfceb3b6d6..5516ecb9e2798 100644 --- a/pandas/tests/apply/test_frame_apply.py +++ b/pandas/tests/apply/test_frame_apply.py @@ -18,27 +18,23 @@ from pandas.tests.frame.common import zip_frames -@pytest.fixture(params=["python", "numba"]) -def engine(request): - if request.param == "numba": - pytest.importorskip("numba") - return request.param - - -def test_apply(float_frame): +def test_apply(float_frame, engine, request): + if engine == "numba": + mark = pytest.mark.xfail(reason="numba engine not supporting numpy ufunc yet") + request.node.add_marker(mark) with np.errstate(all="ignore"): # ufunc result = np.sqrt(float_frame["A"]) - expected = float_frame.apply(np.sqrt)["A"] + expected = float_frame.apply(np.sqrt, engine=engine)["A"] tm.assert_series_equal(result, expected) # aggregator - result = float_frame.apply(np.mean)["A"] + result = float_frame.apply(np.mean, engine=engine)["A"] expected = np.mean(float_frame["A"]) assert result == expected d = float_frame.index[0] - result = float_frame.apply(np.mean, axis=1) + result = float_frame.apply(np.mean, axis=1, engine=engine) expected = np.mean(float_frame.xs(d)) assert result[d] == expected assert result.index is float_frame.index @@ -46,8 +42,13 @@ def test_apply(float_frame): @pytest.mark.parametrize("axis", [0, 1]) @pytest.mark.parametrize("raw", [True, False]) -def test_apply_args(float_frame, axis, raw): - result = float_frame.apply(lambda x, y: x + y, axis, args=(1,), raw=raw) +def test_apply_args(float_frame, axis, raw, engine, request): + if engine == "numba": + mark = pytest.mark.xfail(reason="numba engine doesn't support args") + request.node.add_marker(mark) + result = float_frame.apply( + lambda x, y: x + y, axis, args=(1,), raw=raw, engine=engine + ) expected = float_frame + 1 tm.assert_frame_equal(result, expected) @@ -94,30 +95,30 @@ def test_apply_mixed_datetimelike(): @pytest.mark.parametrize("func", [np.sqrt, np.mean]) -def test_apply_empty(func): +def test_apply_empty(func, engine): # empty empty_frame = DataFrame() - result = empty_frame.apply(func) + result = empty_frame.apply(func, engine=engine) assert result.empty -def test_apply_float_frame(float_frame): +def test_apply_float_frame(float_frame, engine): no_rows = float_frame[:0] - result = no_rows.apply(lambda x: x.mean()) + result = no_rows.apply(lambda x: x.mean(), engine=engine) expected = Series(np.nan, index=float_frame.columns) tm.assert_series_equal(result, expected) no_cols = float_frame.loc[:, []] - result = no_cols.apply(lambda x: x.mean(), axis=1) + result = no_cols.apply(lambda x: x.mean(), axis=1, engine=engine) expected = Series(np.nan, index=float_frame.index) tm.assert_series_equal(result, expected) -def test_apply_empty_except_index(): +def test_apply_empty_except_index(engine): # GH 2476 expected = DataFrame(index=["a"]) - result = expected.apply(lambda x: x["a"], axis=1) + result = 
expected.apply(lambda x: x["a"], axis=1, engine=engine) tm.assert_frame_equal(result, expected) @@ -321,12 +322,6 @@ def test_apply_empty_infer_type(ax, func, raw, axis, engine, request): test_res = func(np.array([], dtype="f8")) is_reduction = not isinstance(test_res, np.ndarray) - if engine == "numba" and raw is False: - mark = pytest.mark.xfail( - reason="numba engine only supports raw=True at the moment" - ) - request.applymarker(mark) - result = df.apply(func, axis=axis, engine=engine, raw=raw) if is_reduction: agg_axis = df._get_agg_axis(axis) @@ -981,45 +976,69 @@ def test_result_type_shorter_list(int_frame_const_col): tm.assert_frame_equal(result, expected) -def test_result_type_broadcast(int_frame_const_col): +def test_result_type_broadcast(int_frame_const_col, request, engine): # result_type should be consistent no matter which # path we take in the code + if engine == "numba": + mark = pytest.mark.xfail(reason="numba engine doesn't support list return") + request.node.add_marker(mark) df = int_frame_const_col # broadcast result - result = df.apply(lambda x: [1, 2, 3], axis=1, result_type="broadcast") + result = df.apply( + lambda x: [1, 2, 3], axis=1, result_type="broadcast", engine=engine + ) expected = df.copy() tm.assert_frame_equal(result, expected) -def test_result_type_broadcast_series_func(int_frame_const_col): +def test_result_type_broadcast_series_func(int_frame_const_col, engine, request): # result_type should be consistent no matter which # path we take in the code + if engine == "numba": + mark = pytest.mark.xfail( + reason="numba Series constructor only support ndarrays not list data" + ) + request.node.add_marker(mark) df = int_frame_const_col columns = ["other", "col", "names"] result = df.apply( - lambda x: Series([1, 2, 3], index=columns), axis=1, result_type="broadcast" + lambda x: Series([1, 2, 3], index=columns), + axis=1, + result_type="broadcast", + engine=engine, ) expected = df.copy() tm.assert_frame_equal(result, expected) -def test_result_type_series_result(int_frame_const_col): +def test_result_type_series_result(int_frame_const_col, engine, request): # result_type should be consistent no matter which # path we take in the code + if engine == "numba": + mark = pytest.mark.xfail( + reason="numba Series constructor only support ndarrays not list data" + ) + request.node.add_marker(mark) df = int_frame_const_col # series result - result = df.apply(lambda x: Series([1, 2, 3], index=x.index), axis=1) + result = df.apply(lambda x: Series([1, 2, 3], index=x.index), axis=1, engine=engine) expected = df.copy() tm.assert_frame_equal(result, expected) -def test_result_type_series_result_other_index(int_frame_const_col): +def test_result_type_series_result_other_index(int_frame_const_col, engine, request): # result_type should be consistent no matter which # path we take in the code + + if engine == "numba": + mark = pytest.mark.xfail( + reason="no support in numba Series constructor for list of columns" + ) + request.node.add_marker(mark) df = int_frame_const_col # series result with other index columns = ["other", "col", "names"] - result = df.apply(lambda x: Series([1, 2, 3], index=columns), axis=1) + result = df.apply(lambda x: Series([1, 2, 3], index=columns), axis=1, engine=engine) expected = df.copy() expected.columns = columns tm.assert_frame_equal(result, expected) @@ -1379,25 +1398,34 @@ def f(x, a, b, c=3): @pytest.mark.parametrize("num_cols", [2, 3, 5]) -def test_frequency_is_original(num_cols): +def test_frequency_is_original(num_cols, engine, 
request): # GH 22150 + if engine == "numba": + mark = pytest.mark.xfail(reason="numba engine only supports numeric indices") + request.node.add_marker(mark) index = pd.DatetimeIndex(["1950-06-30", "1952-10-24", "1953-05-29"]) original = index.copy() df = DataFrame(1, index=index, columns=range(num_cols)) - df.apply(lambda x: x) + df.apply(lambda x: x, engine=engine) assert index.freq == original.freq -def test_apply_datetime_tz_issue(): +def test_apply_datetime_tz_issue(engine, request): # GH 29052 + if engine == "numba": + mark = pytest.mark.xfail( + reason="numba engine doesn't support non-numeric indexes" + ) + request.node.add_marker(mark) + timestamps = [ Timestamp("2019-03-15 12:34:31.909000+0000", tz="UTC"), Timestamp("2019-03-15 12:34:34.359000+0000", tz="UTC"), Timestamp("2019-03-15 12:34:34.660000+0000", tz="UTC"), ] df = DataFrame(data=[0, 1, 2], index=timestamps) - result = df.apply(lambda x: x.name, axis=1) + result = df.apply(lambda x: x.name, axis=1, engine=engine) expected = Series(index=timestamps, data=timestamps) tm.assert_series_equal(result, expected) @@ -1460,10 +1488,15 @@ def test_apply_empty_list_reduce(): tm.assert_series_equal(result, expected) -def test_apply_no_suffix_index(): +def test_apply_no_suffix_index(engine, request): # GH36189 + if engine == "numba": + mark = pytest.mark.xfail( + reason="numba engine doesn't support list-likes/dict-like callables" + ) + request.node.add_marker(mark) pdf = DataFrame([[4, 9]] * 3, columns=["A", "B"]) - result = pdf.apply(["sum", lambda x: x.sum(), lambda x: x.sum()]) + result = pdf.apply(["sum", lambda x: x.sum(), lambda x: x.sum()], engine=engine) expected = DataFrame( {"A": [12, 12, 12], "B": [27, 27, 27]}, index=["sum", "", ""] ) @@ -1512,10 +1545,17 @@ def sum_div2(s): tm.assert_frame_equal(result, expected) -def test_apply_getitem_axis_1(): +def test_apply_getitem_axis_1(engine, request): # GH 13427 + if engine == "numba": + mark = pytest.mark.xfail( + reason="numba engine not supporting duplicate index values" + ) + request.node.add_marker(mark) df = DataFrame({"a": [0, 1, 2], "b": [1, 2, 3]}) - result = df[["a", "a"]].apply(lambda x: x.iloc[0] + x.iloc[1], axis=1) + result = df[["a", "a"]].apply( + lambda x: x.iloc[0] + x.iloc[1], axis=1, engine=engine + ) expected = Series([0, 2, 4]) tm.assert_series_equal(result, expected) @@ -1555,10 +1595,10 @@ def test_apply_type(): tm.assert_series_equal(result, expected) -def test_apply_on_empty_dataframe(): +def test_apply_on_empty_dataframe(engine): # GH 39111 df = DataFrame({"a": [1, 2], "b": [3, 0]}) - result = df.head(0).apply(lambda x: max(x["a"], x["b"]), axis=1) + result = df.head(0).apply(lambda x: max(x["a"], x["b"]), axis=1, engine=engine) expected = Series([], dtype=np.float64) tm.assert_series_equal(result, expected) @@ -1656,14 +1696,3 @@ def test_agg_dist_like_and_nonunique_columns(): result = df.agg({"A": "count"}) expected = df["A"].count() tm.assert_series_equal(result, expected) - - -def test_numba_unsupported(): - df = DataFrame( - {"A": [None, 2, 3], "B": [1.0, np.nan, 3.0], "C": ["foo", None, "bar"]} - ) - with pytest.raises( - ValueError, - match="The numba engine in DataFrame.apply can only be used when raw=True", - ): - df.apply(lambda x: x, engine="numba", raw=False) diff --git a/pandas/tests/apply/test_numba.py b/pandas/tests/apply/test_numba.py new file mode 100644 index 0000000000000..7e1e44d2119f9 --- /dev/null +++ b/pandas/tests/apply/test_numba.py @@ -0,0 +1,95 @@ +import numpy as np +import pytest + +import pandas.util._test_decorators as td 
+
+from pandas import (
+    DataFrame,
+    Index,
+)
+import pandas._testing as tm
+
+pytestmark = td.skip_if_no("numba")
+
+
+def test_numba_vs_python_noop(float_frame, apply_axis):
+    func = lambda x: x
+    result = float_frame.apply(func, engine="numba", axis=apply_axis)
+    expected = float_frame.apply(func, engine="python", axis=apply_axis)
+    tm.assert_frame_equal(result, expected)
+
+
+def test_numba_vs_python_indexing():
+    frame = DataFrame(
+        {"a": [1, 2, 3], "b": [4, 5, 6], "c": [7.0, 8.0, 9.0]},
+        index=Index(["A", "B", "C"]),
+    )
+    row_func = lambda x: x["c"]
+    result = frame.apply(row_func, engine="numba", axis=1)
+    expected = frame.apply(row_func, engine="python", axis=1)
+    tm.assert_series_equal(result, expected)
+
+    col_func = lambda x: x["A"]
+    result = frame.apply(col_func, engine="numba", axis=0)
+    expected = frame.apply(col_func, engine="python", axis=0)
+    tm.assert_series_equal(result, expected)
+
+
+@pytest.mark.parametrize(
+    "reduction",
+    [lambda x: x.mean(), lambda x: x.min(), lambda x: x.max(), lambda x: x.sum()],
+)
+def test_numba_vs_python_reductions(float_frame, reduction, apply_axis):
+    result = float_frame.apply(reduction, engine="numba", axis=apply_axis)
+    expected = float_frame.apply(reduction, engine="python", axis=apply_axis)
+    tm.assert_series_equal(result, expected)
+
+
+@pytest.mark.parametrize("colnames", [[1, 2, 3], [1.0, 2.0, 3.0]])
+def test_numba_numeric_colnames(colnames):
+    # Check that numeric column names lower properly and can be indexed on
+    df = DataFrame(
+        np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]], dtype=np.int64), columns=colnames
+    )
+    first_col = colnames[0]
+    f = lambda x: x[first_col]  # Get the first column
+    result = df.apply(f, engine="numba", axis=1)
+    expected = df.apply(f, engine="python", axis=1)
+    tm.assert_series_equal(result, expected)
+
+
+def test_numba_parallel_unsupported(float_frame):
+    f = lambda x: x
+    with pytest.raises(
+        NotImplementedError,
+        match="Parallel apply is not supported when raw=False and engine='numba'",
+    ):
+        float_frame.apply(f, engine="numba", engine_kwargs={"parallel": True})
+
+
+def test_numba_nonunique_unsupported(apply_axis):
+    f = lambda x: x
+    df = DataFrame({"a": [1, 2]}, index=Index(["a", "a"]))
+    with pytest.raises(
+        NotImplementedError,
+        match="The index/columns must be unique when raw=False and engine='numba'",
+    ):
+        df.apply(f, engine="numba", axis=apply_axis)
+
+
+def test_numba_unsupported_dtypes(apply_axis):
+    f = lambda x: x
+    df = DataFrame({"a": [1, 2], "b": ["a", "b"], "c": [4, 5]})
+    df["c"] = df["c"].astype("double[pyarrow]")
+
+    with pytest.raises(
+        ValueError, match="Column b must have a numeric dtype. 
Found 'object' instead" + ): + df.apply(f, engine="numba", axis=apply_axis) + + with pytest.raises( + ValueError, + match="Column c is backed by an extension array, " + "which is not supported by the numba engine.", + ): + df["c"].to_frame().apply(f, engine="numba", axis=apply_axis) diff --git a/pyproject.toml b/pyproject.toml index a5aaa72289209..651e82450f201 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -737,7 +737,7 @@ pythonVersion = "3.11" typeCheckingMode = "basic" useLibraryCodeForTypes = false include = ["pandas", "typings"] -exclude = ["pandas/tests", "pandas/io/clipboard", "pandas/util/version"] +exclude = ["pandas/tests", "pandas/io/clipboard", "pandas/util/version", "pandas/core/_numba/extensions.py"] # enable subset of "strict" reportDuplicateImport = true reportInconsistentConstructor = true diff --git a/pyright_reportGeneralTypeIssues.json b/pyright_reportGeneralTypeIssues.json index cad43632930ba..c059b9c589ecd 100644 --- a/pyright_reportGeneralTypeIssues.json +++ b/pyright_reportGeneralTypeIssues.json @@ -16,6 +16,7 @@ "pandas/_testing/__init__.py", "pandas/_testing/_io.py", + "pandas/core/_numba/extensions.py", "pandas/core/_numba/kernels/sum_.py", "pandas/core/_numba/kernels/var_.py", "pandas/compat/pickle_compat.py", diff --git a/scripts/validate_unwanted_patterns.py b/scripts/validate_unwanted_patterns.py index d765d7bc7dcb9..6e6251425928d 100755 --- a/scripts/validate_unwanted_patterns.py +++ b/scripts/validate_unwanted_patterns.py @@ -51,6 +51,8 @@ "_chained_assignment_msg", "_chained_assignment_method_msg", "_version_meson", + # The numba extensions need this to mock the iloc object + "_iLocIndexer", # TODO(3.0): GH#55043 - remove upon removal of ArrayManager "_get_option", }
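
For reviewers, a minimal usage sketch of what this patch enables (not part of the
diff above). It mirrors pandas/tests/apply/test_numba.py; the frame contents and
variable names are illustrative only, and it assumes a build of this branch with
numba installed:

    import pandas as pd

    df = pd.DataFrame(
        {"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]},
        index=pd.Index(["x", "y", "z"]),
    )

    # raw=False now works with engine="numba": the UDF receives a Series whose
    # index, values, and name round-trip through the extension types registered
    # in pandas/core/_numba/extensions.py, so label-based indexing compiles.
    row_sums = df.apply(lambda row: row["a"] + row["b"], axis=1, engine="numba")

    # Reductions compile as well and should match the python engine.
    col_means = df.apply(lambda col: col.mean(), axis=0, engine="numba")
    pd.testing.assert_series_equal(col_means, df.apply(lambda col: col.mean(), axis=0))

    # The validation added in FrameApply.validate_values_for_numba rejects
    # unsupported inputs up front, e.g. non-numeric columns:
    #   df.assign(c=list("uvw")).apply(lambda r: r["a"], axis=1, engine="numba")
    #   -> ValueError: Column c must have a numeric dtype. Found 'object' instead

Since generate_numba_apply_func is wrapped in functools.cache, repeated applies
with the same UDF and engine_kwargs reuse the compiled kernel rather than
re-JIT-compiling it.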