diff --git a/modin/core/dataframe/algebra/binary.py b/modin/core/dataframe/algebra/binary.py index 2514d9dc31c..75228581fb0 100644 --- a/modin/core/dataframe/algebra/binary.py +++ b/modin/core/dataframe/algebra/binary.py @@ -44,7 +44,13 @@ def register(cls, func, join_type="outer", labels="replace"): """ def caller( - query_compiler, other, broadcast=False, *args, dtypes=None, **kwargs + query_compiler, + other, + broadcast=False, + *args, + dtypes=None, + copy_dtypes=False, + **kwargs ): """ Apply binary `func` to passed operands. @@ -61,8 +67,14 @@ def caller( at the query compiler level, so this parameter is a hint that passed from a high level API. *args : args, Arguments that will be passed to `func`. - dtypes : "copy" or None, default: None - Whether to keep old dtypes or infer new dtypes from data. + dtypes : pandas.Series or scalar type, optional + The data types for the result. This is an optimization + because there are functions that always result in a particular data + type, and this allows us to avoid (re)computing it. + If the argument is a scalar type, then that type is assigned to each result column. + copy_dtypes : bool, default False + If True, the dtypes of the resulting dataframe are copied from the original, + and the ``dtypes`` argument is ignored. **kwargs : kwargs, Arguments that will be passed to `func`. 
@@ -76,7 +88,7 @@ def caller( if broadcast: assert ( len(other.columns) == 1 - ), "Invalid broadcast argument for `broadcast_apply`, too many columns: {}".format( + ), "Invalid broadcast argument for `map` with broadcast, too many columns: {}".format( len(other.columns) ) # Transpose on `axis=1` because we always represent an individual @@ -84,13 +96,14 @@ def caller( if axis == 1: other = other.transpose() return query_compiler.__constructor__( - query_compiler._modin_frame.broadcast_apply( - axis, + query_compiler._modin_frame.map( lambda l, r: func(l, r.squeeze(), *args, **kwargs), - other._modin_frame, + axis=axis, + other=other._modin_frame, join_type=join_type, labels=labels, dtypes=dtypes, + copy_dtypes=copy_dtypes, ) ) else: @@ -103,17 +116,20 @@ def caller( ) else: if isinstance(other, (list, np.ndarray, pandas.Series)): - new_modin_frame = query_compiler._modin_frame.apply_full_axis( - axis, + new_modin_frame = query_compiler._modin_frame.map( lambda df: func(df, other, *args, **kwargs), + axis=axis, + full_axis=True, new_index=query_compiler.index, new_columns=query_compiler.columns, dtypes=dtypes, + copy_dtypes=copy_dtypes, ) else: new_modin_frame = query_compiler._modin_frame.map( lambda df: func(df, other, *args, **kwargs), dtypes=dtypes, + copy_dtypes=copy_dtypes, ) return query_compiler.__constructor__(new_modin_frame) diff --git a/modin/core/dataframe/base/dataframe/dataframe.py b/modin/core/dataframe/base/dataframe/dataframe.py index 536d43ceaea..8fdfde282a7 100644 --- a/modin/core/dataframe/base/dataframe/dataframe.py +++ b/modin/core/dataframe/base/dataframe/dataframe.py @@ -18,9 +18,20 @@ """ from abc import ABC, abstractmethod -from typing import List, Hashable, Optional, Callable, Union, Dict +from typing import List, Hashable, Optional, Callable, Union, Dict, TYPE_CHECKING + +import pandas + from modin.core.dataframe.base.dataframe.utils import Axis, JoinType +if TYPE_CHECKING: + from modin.core.dataframe.pandas.partitioning import ( + 
PandasDataframePartition, + PandasDataframeAxisPartition, + ) + + Partition = Union[PandasDataframeAxisPartition, PandasDataframePartition] + class ModinDataframe(ABC): """ @@ -91,8 +102,10 @@ def filter_by_types(self, types: List[Hashable]) -> "ModinDataframe": def map( self, function: Callable, + *, axis: Optional[Union[int, Axis]] = None, - dtypes: Optional[str] = None, + dtypes: Optional[Union[pandas.Series, type]] = None, + copy_dtypes: bool = False, ) -> "ModinDataframe": """ Apply a user-defined function row-wise if `axis`=0, column-wise if `axis`=1, and cell-wise if `axis` is None. @@ -102,11 +115,14 @@ def map( function : callable(row|col|cell) -> row|col|cell The function to map across the dataframe. axis : int or modin.core.dataframe.base.utils.Axis, optional - The axis to map over. - dtypes : str, optional + The axis to map over. If None, the map will be performed element-wise. + dtypes : pandas.Series or scalar type, optional The data types for the result. This is an optimization because there are functions that always result in a particular data type, and this allows us to avoid (re)computing it. + copy_dtypes : bool, default: False + If True, the dtypes of the resulting dataframe are copied from the original, + and the ``dtypes`` argument is ignored. Returns ------- @@ -257,8 +273,9 @@ def groupby( def reduce( self, axis: Union[int, Axis], - function: Callable, - dtypes: Optional[str] = None, + function: Callable[["Partition"], object], + *, + dtypes: Optional[pandas.Series] = None, ) -> "ModinDataframe": """ Perform a user-defined aggregation on the specified axis, where the axis reduces down to a singleton. @@ -269,7 +286,7 @@ def reduce( The axis to perform the reduce over. function : callable(row|col) -> single value The reduce function to apply to each column. - dtypes : str, optional + dtypes : pandas.Series, optional The data types for the result. 
This is an optimization because there are functions that always result in a particular data type, and this allows us to avoid (re)computing it. @@ -289,9 +306,10 @@ def reduce( def tree_reduce( self, axis: Union[int, Axis], - map_func: Callable, - reduce_func: Optional[Callable] = None, - dtypes: Optional[str] = None, + map_func: Callable[["Partition"], "Partition"], + reduce_func: Optional[Callable[["Partition"], object]] = None, + *, + dtypes: Optional[pandas.Series] = None, ) -> "ModinDataframe": """ Perform a user-defined aggregation on the specified axis, where the axis reduces down to a singleton using a tree-reduce computation pattern. diff --git a/modin/core/dataframe/base/dataframe/utils.py b/modin/core/dataframe/base/dataframe/utils.py index c7181490abb..38e0b8f41d1 100644 --- a/modin/core/dataframe/base/dataframe/utils.py +++ b/modin/core/dataframe/base/dataframe/utils.py @@ -19,6 +19,12 @@ """ from enum import Enum +import sys + +if sys.version_info < (3, 8): # compare the whole version tuple; `typing.Literal` exists from Python 3.8 on + from typing_extensions import Literal +else: + from typing import Literal class Axis(Enum): # noqa: PR01 @@ -36,6 +42,10 @@ class Axis(Enum): # noqa: PR01 CELL_WISE = None +AxisInt = Literal[0, 1] +"""Type for the two possible integer values of an axis argument (0 or 1).""" + + class JoinType(Enum): # noqa: PR01 """ An enum that represents the `join_type` argument provided to the algebra operators.
diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 5b60d54191c..80c19a29deb 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -25,7 +25,15 @@ from pandas.core.indexes.api import ensure_index, Index, RangeIndex from pandas.core.dtypes.common import is_numeric_dtype, is_list_like from pandas._libs.lib import no_default -from typing import List, Hashable, Optional, Callable, Union, Dict, TYPE_CHECKING +from typing import ( + List, + Hashable, + Optional, + Callable, + Union, + Dict, + TYPE_CHECKING, +) from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler from modin.error_message import ErrorMessage @@ -35,6 +43,7 @@ from modin.core.dataframe.base.dataframe.dataframe import ModinDataframe from modin.core.dataframe.base.dataframe.utils import ( Axis, + AxisInt, JoinType, ) @@ -43,6 +52,12 @@ ProtocolDataframe, ) from pandas._typing import npt + from modin.core.dataframe.pandas.partitioning import ( + PandasDataframePartition, + PandasDataframeAxisPartition, + ) + + Partition = Union[PandasDataframeAxisPartition, PandasDataframePartition] from modin.pandas.indexing import is_range_like from modin.pandas.utils import is_full_grab_slice, check_both_not_none @@ -1002,11 +1017,14 @@ def from_labels_executor(df, **kwargs): result.index = df.index return result - new_parts = self._partition_mgr_cls.apply_func_to_select_indices( + new_parts = self._partition_mgr_cls.map_select_indices( + from_labels_executor, 0, self._partitions, - from_labels_executor, + None, + False, [0], + None, keep_remaining=True, ) new_column_widths = [ @@ -1070,16 +1088,16 @@ def _reorder_labels(self, row_positions=None, col_positions=None): A new PandasDataframe with reordered columns and/or rows. 
""" if row_positions is not None: - ordered_rows = self._partition_mgr_cls.map_axis_partitions( - 0, self._partitions, lambda df: df.iloc[row_positions] + ordered_rows = self._partition_mgr_cls.map_partitions( + self._partitions, lambda df: df.iloc[row_positions], axis=0 ) row_idx = self.index[row_positions] else: ordered_rows = self._partitions row_idx = self.index if col_positions is not None: - ordered_cols = self._partition_mgr_cls.map_axis_partitions( - 1, ordered_rows, lambda df: df.iloc[:, col_positions] + ordered_cols = self._partition_mgr_cls.map_partitions( + ordered_rows, lambda df: df.iloc[:, col_positions], axis=1 ) col_idx = self.columns[col_positions] else: @@ -1532,7 +1550,9 @@ def _tree_reduce_func(df, *args, **kwargs): return _tree_reduce_func - def _compute_tree_reduce_metadata(self, axis, new_parts): + def _compute_tree_reduce_metadata( + self, axis, new_parts, dtypes: Optional[pandas.Series] = None + ): """ Compute the metadata for the result of reduce function. @@ -1542,6 +1562,8 @@ def _compute_tree_reduce_metadata(self, axis, new_parts): The axis on which reduce function was applied. new_parts : NumPy 2D array Partitions with the result of applied function. + dtypes : Optional[pandas.Series], default: None + The data types of the result. Returns ------- @@ -1556,12 +1578,11 @@ def _compute_tree_reduce_metadata(self, axis, new_parts): new_axes_lengths[axis] = [1] new_axes_lengths[axis ^ 1] = self._axes_lengths[axis ^ 1] - new_dtypes = None result = self.__constructor__( new_parts, *new_axes, *new_axes_lengths, - new_dtypes, + dtypes, ) return result @@ -1569,8 +1590,9 @@ def _compute_tree_reduce_metadata(self, axis, new_parts): def reduce( self, axis: Union[int, Axis], - function: Callable, - dtypes: Optional[str] = None, + function: Callable[["Partition"], object], + *, + dtypes: Optional[pandas.Series] = None, ) -> "PandasDataframe": """ Perform a user-defined aggregation on the specified axis, where the axis reduces down to a singleton. 
Requires knowledge of the full axis for the reduction. @@ -1581,7 +1603,7 @@ def reduce( The axis to perform the reduce over. function : callable(row|col) -> single value The reduce function to apply to each column. - dtypes : str, optional + dtypes : pandas.Series, optional The data types for the result. This is an optimization because there are functions that always result in a particular data type, and this allows us to avoid (re)computing it. @@ -1597,18 +1619,19 @@ def reduce( """ axis = Axis(axis) function = self._build_treereduce_func(axis.value, function) - new_parts = self._partition_mgr_cls.map_axis_partitions( - axis.value, self._partitions, function + new_parts = self._partition_mgr_cls.map_partitions( + self._partitions, function, axis=axis.value ) - return self._compute_tree_reduce_metadata(axis.value, new_parts) + return self._compute_tree_reduce_metadata(axis.value, new_parts, dtypes) @lazy_metadata_decorator(apply_axis="opposite", axis_arg=0) def tree_reduce( self, axis: Union[int, Axis], - map_func: Callable, - reduce_func: Optional[Callable] = None, - dtypes: Optional[str] = None, + map_func: Callable[["Partition"], "Partition"], + reduce_func: Optional[Callable[["Partition"], object]] = None, + *, + dtypes: Optional[pandas.Series] = None, ) -> "PandasDataframe": """ Apply function that will reduce the data to a pandas Series. @@ -1622,7 +1645,7 @@ def tree_reduce( reduce_func : callable(row|col) -> single value, optional Callable function to reduce the dataframe. If none, then apply map_func twice. - dtypes : str, optional + dtypes : pandas.Series, optional The data types for the result. This is an optimization because there are functions that always result in a particular data type, and this allows us to avoid (re)computing it. 
@@ -1640,13 +1663,69 @@ def tree_reduce( reduce_func = self._build_treereduce_func(axis.value, reduce_func) map_parts = self._partition_mgr_cls.map_partitions(self._partitions, map_func) - reduce_parts = self._partition_mgr_cls.map_axis_partitions( - axis.value, map_parts, reduce_func + reduce_parts = self._partition_mgr_cls.map_partitions( + map_parts, reduce_func, axis=axis.value ) - return self._compute_tree_reduce_metadata(axis.value, reduce_parts) + return self._compute_tree_reduce_metadata(axis.value, reduce_parts, dtypes) - @lazy_metadata_decorator(apply_axis=None) - def map(self, func: Callable, dtypes: Optional[str] = None) -> "PandasDataframe": + def _make_init_labels_args(self, partitions, index, columns) -> dict: # noqa: GL08 + kw = {} + kw["index"], kw["row_lengths"] = ( + self._compute_axis_labels_and_lengths(0, partitions) + if index is None + else (index, None) + ) + kw["columns"], kw["column_widths"] = ( + self._compute_axis_labels_and_lengths(1, partitions) + if columns is None + else (columns, None) + ) + return kw + + def _prepare_frame_to_broadcast(self, axis, indices): + """ + Compute the indices to broadcast `self` considering `indices`. + + Parameters + ---------- + axis : {0, 1} + Axis to broadcast along. + indices : dict + Dict of indices and internal indices of partitions where `self` must + be broadcasted. + + Returns + ------- + dict + Dictionary with indices of partitions to broadcast. + + Notes + ----- + New dictionary of indices of `self` partitions represents that + you want to broadcast `self` at specified another partition named `other`. 
For example, + Dictionary {key: {key1: [0, 1], key2: [5]}} means that in `other`[key] you want to + broadcast [self[key1], self[key2]] partitions and internal indices for `self` must be [[0, 1], [5]] + """ + sizes = self.row_lengths if axis else self.column_widths + return {key: dict(enumerate(sizes)) for key in indices.keys()} + + def map( + self, + func: Callable, + *, + axis: Optional[Union[AxisInt, Axis]] = None, + other: Optional["PandasDataframe"] = None, + full_axis=False, + join_type="left", + labels="keep", + new_index=None, + new_columns=None, + apply_indices=None, + numeric_indices=None, + enumerate_partitions=False, + dtypes: Optional[Union[pandas.Series, type]] = None, + copy_dtypes: bool = False, + ) -> "PandasDataframe": """ Perform a function that maps across the entire dataset. @@ -1654,23 +1733,107 @@ def map(self, func: Callable, dtypes: Optional[str] = None) -> "PandasDataframe" ---------- func : callable(row|col|cell) -> row|col|cell The function to apply. - dtypes : dtypes of the result, optional + axis : int or modin.core.dataframe.base.utils.Axis, optional + The axis to map over. If None, the map will be performed element-wise. + other : PandasDataframe, optional + Modin dataframe to broadcast. + full_axis : bool, default: False + Whether to apply the function to a virtual partition that encompasses the whole axis. + If the virtual partitions of this dataframe and ``other`` do not encompass the whole axis, + the partitions will be concatenated together before the function is called, and then re-split + after it returns. + join_type : str, default: "left" + Type of join to apply. + labels : {"keep", "replace", "drop"}, default: "keep" + Whether to keep labels from `self` Modin DataFrame, replace them with labels + from joined DataFrame or drop altogether to make them be computed lazily later. + new_index : list-like, optional + Index of the result. We may know this in advance, and if not provided it must be computed.
+ WARNING: This is currently only used when ``full_axis`` is True. + new_columns : list-like, optional + Columns of the result. We may know this in advance, and if not provided it must be computed. + WARNING: This is currently only used when ``full_axis`` is True. + apply_indices : list-like, optional + Labels of `axis ^ 1` to apply ``func`` over. Mutually exclusive from ``numeric_indices``. + WARNING: This is currently only used when ``full_axis`` is True. + numeric_indices : list-like, optional + Numeric indices to apply ``func`` over. Mutually exclusive from ``apply_indices``. + enumerate_partitions : bool, default: False + Whether pass partition index into applied `func` or not. + Note that `func` must be able to obtain `partition_idx` kwarg. + WARNING: This is currently only used when ``full_axis`` is True. + dtypes : pandas.Series or scalar type, optional The data types for the result. This is an optimization because there are functions that always result in a particular data type, and this allows us to avoid (re)computing it. + If the argument is a scalar type, then that type is assigned to each result column. + copy_dtypes : bool, default: False + If True, the dtypes of the resulting dataframe are copied from the original, + and the ``dtypes`` argument is ignored. Returns ------- PandasDataframe A new dataframe. 
""" - new_partitions = self._partition_mgr_cls.map_partitions(self._partitions, func) - if dtypes == "copy": + # Fix for issue #5094 + if axis == "columns": + axis = 1 + elif axis == "index": + axis = 0 + axis = Axis(axis) + if copy_dtypes: dtypes = self._dtypes - elif dtypes is not None: + elif isinstance(dtypes, type) or ( + isinstance(dtypes, np.dtype) and dtypes.ndim == 0 + ): + # Scalar native type or np dtype was specified, so duplicate it across columns dtypes = pandas.Series( [np.dtype(dtypes)] * len(self.columns), index=self.columns ) + if axis == Axis.CELL_WISE: + return self._map_cellwise(func, dtypes) + else: + return self._map_axis( + func, + axis.value, + other, + full_axis, + join_type, + labels, + new_index, + new_columns, + apply_indices, + enumerate_partitions, + dtypes, + copy_dtypes, + ) + + @lazy_metadata_decorator(apply_axis=None) + def _map_cellwise( + self, + func: Callable, + dtypes: Optional[Union[pandas.Series, type]] = None, + ) -> "PandasDataframe": + """ + Perform a cell-wise map across the dataframe. + + Parameters + ---------- + func : callable(row|col|cell) -> row|col|cell + The function to apply. + dtypes : pandas.Series or scalar type, optional + The data types for the result. This is an optimization + because there are functions that always result in a particular data + type, and this allows us to avoid (re)computing it. + If the argument is a scalar type, then that type is assigned to each result column. + + Returns + ------- + PandasDataframe + New Modin dataframe. 
+ """ + new_partitions = self._partition_mgr_cls.map_partitions(self._partitions, func) return self.__constructor__( new_partitions, self._index_cache, @@ -1680,6 +1843,334 @@ def map(self, func: Callable, dtypes: Optional[str] = None) -> "PandasDataframe" dtypes=dtypes, ) + @lazy_metadata_decorator(apply_axis="both") + def _map_axis( + self, + func: Callable, + axis: AxisInt, + other, + full_axis, + join_type="left", + labels="keep", + new_index=None, + new_columns=None, + apply_indices=None, + enumerate_partitions=False, + dtypes=None, + copy_dtypes=False, + ) -> "PandasDataframe": + """ + Broadcast axis partitions of `other` to partitions of `self` and apply a function. + + Parameters + ---------- + func : callable + Function to apply. + axis : {0, 1} + Axis to broadcast over. + other : PandasDataframe + Modin DataFrame to broadcast. + full_axis : bool, default: False + Whether to apply the function to a virtual partition that encompasses the whole axis. + If the virtual partitions of this dataframe and ``other`` do not encompass the whole axis, + the partitions will be concatenated together before the function is called, and then re-split + after it returns. + join_type : str, default: "left" + Type of join to apply. + labels : {"keep", "replace", "drop"}, default: "keep" + Whether keep labels from `self` Modin DataFrame, replace them with labels + from joined DataFrame or drop altogether to make them be computed lazily later. + new_index : list-like, optional + Index of the result. We may know this in advance, and if not provided it must be computed. + WARNING: This is currently only used when ``full_axis`` is True. + new_columns : list-like, optional + Columns of the result. We may know this in advance, and if not provided it must be computed. + WARNING: This is currently only used when ``full_axis`` is True. + apply_indices : list-like, optional + Indices of `axis ^ 1` to apply function over. + WARNING: This is currently only used when ``full_axis`` is True. 
+ enumerate_partitions : bool, default: False + Whether pass partition index into applied `func` or not. + Note that `func` must be able to obtain `partition_idx` kwarg. + WARNING: This is currently only used when ``full_axis`` is True. + dtypes : Optional[pandas.Series], default: None + Data types of the result. This is an optimization + because there are functions that always result in a particular data + type, and allows us to avoid (re)computing it. + copy_dtypes : bool, default: False + If True, the dtypes of the resulting dataframe are copied from the original, + and the ``dtypes`` argument is ignored. + + Returns + ------- + PandasDataframe + New Modin DataFrame. + """ + if full_axis: + if other is not None: + if not isinstance(other, list): + other = [other] + other = [o._partitions for o in other] if len(other) else None + + if apply_indices is not None: + numeric_indices = self.axes[axis ^ 1].get_indexer_for(apply_indices) + apply_indices = self._get_dict_of_block_index( + axis ^ 1, numeric_indices + ).keys() + + new_partitions = self._partition_mgr_cls.map_partitions( + self._partitions, + self._build_treereduce_func(axis, func), + axis=axis, + other_partitions=other, + keep_partitioning=True, + apply_indices=apply_indices, + enumerate_partitions=enumerate_partitions, + ) + # Index objects for new object creation. 
This is shorter than if..else + kw = self._make_init_labels_args(new_partitions, new_index, new_columns) + if copy_dtypes: + kw["dtypes"] = self._dtypes + elif isinstance(dtypes, type): + kw["dtypes"] = pandas.Series( + [np.dtype(dtypes)] * len(kw["columns"]), index=kw["columns"] + ) + result = self.__constructor__(new_partitions, **kw) + if new_index is not None: + result.synchronize_labels(axis=0) + if new_columns is not None: + result.synchronize_labels(axis=1) + return result + else: + if copy_dtypes: + dtypes = self._dtypes + if other is None: + new_partitions = self._partition_mgr_cls.map_partitions( + self._partitions, func + ) + return self.__constructor__( + new_partitions, + self.axes[0], + self.axes[1], + self._row_lengths_cache, + self._column_widths_cache, + dtypes=dtypes, + ) + else: + # Only sort the indices if they do not match + ( + left_parts, + right_parts, + joined_index, + partition_sizes_along_axis, + ) = self._copartition( + axis, + other, + join_type, + sort=not self.axes[axis].equals(other.axes[axis]), + ) + # unwrap list returned by `copartition`. + right_parts = right_parts[0] + new_frame = self._partition_mgr_cls.map_partitions( + left_parts, func, axis=axis, other_partitions=right_parts + ) + + def _pick_axis(get_axis, sizes_cache): + if labels == "keep": + return get_axis(), sizes_cache + if labels == "replace": + return joined_index, partition_sizes_along_axis + assert labels == "drop", f"Unexpected `labels`: {labels}" + return None, None + + if axis == 0: + # Pass shape caches instead of values in order to not trigger shape computation. 
+ new_index, new_row_lengths = _pick_axis( + self._get_index, self._row_lengths_cache + ) + new_columns, new_column_widths = ( + self.columns, + self._column_widths_cache, + ) + else: + new_index, new_row_lengths = self.index, self._row_lengths_cache + new_columns, new_column_widths = _pick_axis( + self._get_columns, self._column_widths_cache + ) + + return self.__constructor__( + new_frame, + new_index, + new_columns, + new_row_lengths, + new_column_widths, + dtypes=dtypes, + ) + + @lazy_metadata_decorator(apply_axis="both") + def map_select_indices( + self, + func: Callable, + *, + axis: AxisInt, + other=None, + full_axis=False, + apply_indices=None, + numeric_indices=None, + new_index=None, + new_columns=None, + keep_remaining=False, + ) -> "PandasDataframe": + """ + Apply a function over a subset of indices in the dataframe. + + Parameters + ---------- + func : callable + Function to apply. + axis : {0, 1} + Axis to broadcast over. + other : PandasDataframe, optional + Modin DataFrame to broadcast. + full_axis : bool, default: False + Whether to apply the function to a virtual partition that encompasses the whole axis. + If the virtual partitions of this dataframe and ``other`` do not encompass the whole axis, + the partitions will be concatenated together before the function is called, and then re-split + after it returns. + apply_indices : list-like, optional + Labels of `axis ^ 1` to apply over (if `numeric_indices` is not specified). + numeric_indices : list-like, optional + Numeric indices to apply over (if `apply_indices` is not specified). + new_index : list-like, optional + Index of the result. We may know this in advance, and if not provided it must be computed. + new_columns : list-like, optional + Columns of the result. We may know this in advance, and if not provided it must be computed. + keep_remaining : bool, default: False + Whether drop the data that is not computed over or not. + + Returns + ------- + PandasDataframe + New Modin DataFrame. 
+ """ + assert apply_indices is not None or numeric_indices is not None + # Fix for issue #5094 + if axis == "columns": + axis = 1 + elif axis == "index": + axis = 0 + # Convert indices to numeric indices + old_index = self.index if axis else self.columns + if apply_indices is not None: + numeric_indices = old_index.get_indexer_for(apply_indices) + # Get the indices for the axis being applied to (it is the opposite of axis + # being applied over) + dict_indices = self._get_dict_of_block_index(axis ^ 1, numeric_indices) + other_indices = ( + None + if other is None + else other._prepare_frame_to_broadcast(axis, dict_indices) + ) + new_partitions = self._partition_mgr_cls.map_select_indices( + func, + axis, + self._partitions, + None if other is None else other._partitions, + full_axis, + apply_indices=dict_indices, + other_apply_indices=other_indices, + keep_remaining=keep_remaining, + ) + # TODO Infer columns and index from `keep_remaining` and `apply_indices` + if new_index is None: + new_index = self.index if axis == 1 else None + if new_columns is None: + new_columns = self.columns if axis == 0 else None + if other is None and not full_axis: + # In non-broadcast, non-full-axis operations, the dimensions of the result can be obtained + # from the kept partitions + # + # Length objects for new object creation. This is shorter than if..else + # This object determines the lengths and widths based on the given + # parameters and builds a dictionary used in the constructor below. 0 gives + # the row lengths and 1 gives the column widths. Since the dimension of + # `axis` given may have changed, we currently just recompute it.
+ # TODO Determine lengths from current lengths if `keep_remaining=False` + lengths = { + axis: [len(apply_indices if apply_indices is not None else numeric_indices)] # `apply_indices` is None when only `numeric_indices` was passed + if not keep_remaining + else [self.row_lengths, self.column_widths][axis], + axis ^ 1: [self.row_lengths, self.column_widths][axis ^ 1], + } + new_row_lengths = lengths[0] + new_col_widths = lengths[1] + else: + new_row_lengths = None + new_col_widths = None + return self.__constructor__( + new_partitions, new_index, new_columns, new_row_lengths, new_col_widths + ) + + @lazy_metadata_decorator(apply_axis="both") + def map_select_indices_both_axes( + self, + func, + row_labels, + col_labels, + new_index, + new_columns, + item_to_distribute=no_default, + ): + """ + Apply a function for a subset of the data identified by both row and column labels. + + Parameters + ---------- + func : callable + The function to apply. + row_labels : list-like + The row labels to apply over. Must be provided with + `col_labels` to apply over both axes. + col_labels : list-like + The column labels to apply over. Must be provided + with `row_labels` to apply over both axes. + new_index : list-like + The index of the result. + new_columns : list-like + The columns of the result. + item_to_distribute : np.ndarray or scalar, default: no_default + The item to split up so it can be applied over both axes. + + Returns + ------- + PandasDataframe + A new dataframe. + """ + # We are applying over both axes here, so make sure we have all the right + # variables set.
+ assert row_labels is not None and col_labels is not None + assert item_to_distribute is not no_default + row_partitions_list = self._get_dict_of_block_index(0, row_labels).items() + col_partitions_list = self._get_dict_of_block_index(1, col_labels).items() + new_partitions = self._partition_mgr_cls.map_select_indices_both_axes( + self._partitions, + func, + row_partitions_list, + col_partitions_list, + item_to_distribute, + # Passing caches instead of values in order to not trigger shapes recomputation + # if they are not used inside this function. + self._row_lengths_cache, + self._column_widths_cache, + ) + return self.__constructor__( + new_partitions, + new_index, + new_columns, + self._row_lengths_cache, + self._column_widths_cache, + ) + def window( self, axis: Union[int, Axis], @@ -1736,8 +2227,8 @@ def fold(self, axis, func): ----- The data shape is not changed (length and width of the table). """ - new_partitions = self._partition_mgr_cls.map_axis_partitions( - axis, self._partitions, func, keep_partitioning=True + new_partitions = self._partition_mgr_cls.map_partitions( + self._partitions, func, axis=axis, keep_partitioning=True ) return self.__constructor__( new_partitions, @@ -1948,8 +2439,8 @@ def filter(self, axis: Union[Axis, int], condition: Callable) -> "PandasDatafram Axis.COL_WISE, ), "Axis argument to filter operator must be 0 (rows) or 1 (columns)" - new_partitions = self._partition_mgr_cls.map_axis_partitions( - axis.value, self._partitions, condition, keep_partitioning=True + new_partitions = self._partition_mgr_cls.map_partitions( + self._partitions, condition, axis=axis.value, keep_partitioning=True ) new_axes, new_lengths = [0, 0], [0, 0] @@ -2002,8 +2493,8 @@ def explode(self, axis: Union[int, Axis], func: Callable) -> "PandasDataframe": A new filtered dataframe. 
""" axis = Axis(axis) - partitions = self._partition_mgr_cls.map_axis_partitions( - axis.value, self._partitions, func, keep_partitioning=True + partitions = self._partition_mgr_cls.map_partitions( + self._partitions, func, axis=axis.value, keep_partitioning=True ) if axis == Axis.COL_WISE: new_index, row_lengths = self._compute_axis_labels_and_lengths( @@ -2019,509 +2510,6 @@ def explode(self, axis: Union[int, Axis], func: Callable) -> "PandasDataframe": partitions, new_index, new_columns, row_lengths, column_widths ) - @lazy_metadata_decorator(apply_axis="both") - def apply_full_axis( - self, - axis, - func, - new_index=None, - new_columns=None, - dtypes=None, - ): - """ - Perform a function across an entire axis. - - Parameters - ---------- - axis : {0, 1} - The axis to apply over (0 - rows, 1 - columns). - func : callable - The function to apply. - new_index : list-like, optional - The index of the result. We may know this in advance, - and if not provided it must be computed. - new_columns : list-like, optional - The columns of the result. We may know this in - advance, and if not provided it must be computed. - dtypes : list-like, optional - The data types of the result. This is an optimization - because there are functions that always result in a particular data - type, and allows us to avoid (re)computing it. - - Returns - ------- - PandasDataframe - A new dataframe. - - Notes - ----- - The data shape may change as a result of the function. - """ - return self.broadcast_apply_full_axis( - axis=axis, - func=func, - new_index=new_index, - new_columns=new_columns, - dtypes=dtypes, - other=None, - ) - - @lazy_metadata_decorator(apply_axis="both") - def apply_full_axis_select_indices( - self, - axis, - func, - apply_indices=None, - numeric_indices=None, - new_index=None, - new_columns=None, - keep_remaining=False, - ): - """ - Apply a function across an entire axis for a subset of the data. - - Parameters - ---------- - axis : int - The axis to apply over. 
- func : callable - The function to apply. - apply_indices : list-like, default: None - The labels to apply over. - numeric_indices : list-like, default: None - The indices to apply over. - new_index : list-like, optional - The index of the result. We may know this in advance, - and if not provided it must be computed. - new_columns : list-like, optional - The columns of the result. We may know this in - advance, and if not provided it must be computed. - keep_remaining : boolean, default: False - Whether or not to drop the data that is not computed over. - - Returns - ------- - PandasDataframe - A new dataframe. - """ - assert apply_indices is not None or numeric_indices is not None - # Convert indices to numeric indices - old_index = self.index if axis else self.columns - if apply_indices is not None: - numeric_indices = old_index.get_indexer_for(apply_indices) - # Get the indices for the axis being applied to (it is the opposite of axis - # being applied over) - dict_indices = self._get_dict_of_block_index(axis ^ 1, numeric_indices) - new_partitions = ( - self._partition_mgr_cls.apply_func_to_select_indices_along_full_axis( - axis, - self._partitions, - func, - dict_indices, - keep_remaining=keep_remaining, - ) - ) - # TODO Infer columns and index from `keep_remaining` and `apply_indices` - if new_index is None: - new_index = self.index if axis == 1 else None - if new_columns is None: - new_columns = self.columns if axis == 0 else None - return self.__constructor__(new_partitions, new_index, new_columns, None, None) - - @lazy_metadata_decorator(apply_axis="both") - def apply_select_indices( - self, - axis, - func, - apply_indices=None, - row_labels=None, - col_labels=None, - new_index=None, - new_columns=None, - keep_remaining=False, - item_to_distribute=no_default, - ): - """ - Apply a function for a subset of the data. - - Parameters - ---------- - axis : {0, 1} - The axis to apply over. - func : callable - The function to apply. 
- apply_indices : list-like, default: None - The labels to apply over. Must be given if axis is provided. - row_labels : list-like, default: None - The row labels to apply over. Must be provided with - `col_labels` to apply over both axes. - col_labels : list-like, default: None - The column labels to apply over. Must be provided - with `row_labels` to apply over both axes. - new_index : list-like, optional - The index of the result. We may know this in advance, - and if not provided it must be computed. - new_columns : list-like, optional - The columns of the result. We may know this in - advance, and if not provided it must be computed. - keep_remaining : boolean, default: False - Whether or not to drop the data that is not computed over. - item_to_distribute : np.ndarray or scalar, default: no_default - The item to split up so it can be applied over both axes. - - Returns - ------- - PandasDataframe - A new dataframe. - """ - # TODO Infer columns and index from `keep_remaining` and `apply_indices` - if new_index is None: - new_index = self.index if axis == 1 else None - if new_columns is None: - new_columns = self.columns if axis == 0 else None - if axis is not None: - assert apply_indices is not None - # Convert indices to numeric indices - old_index = self.index if axis else self.columns - numeric_indices = old_index.get_indexer_for(apply_indices) - # Get indices being applied to (opposite of indices being applied over) - dict_indices = self._get_dict_of_block_index(axis ^ 1, numeric_indices) - new_partitions = self._partition_mgr_cls.apply_func_to_select_indices( - axis, - self._partitions, - func, - dict_indices, - keep_remaining=keep_remaining, - ) - # Length objects for new object creation. This is shorter than if..else - # This object determines the lengths and widths based on the given - # parameters and builds a dictionary used in the constructor below. 0 gives - # the row lengths and 1 gives the column widths. 
Since the dimension of - # `axis` given may have changed, we currently just recompute it. - # TODO Determine lengths from current lengths if `keep_remaining=False` - lengths_objs = { - axis: [len(apply_indices)] - if not keep_remaining - else [self.row_lengths, self.column_widths][axis], - axis ^ 1: [self.row_lengths, self.column_widths][axis ^ 1], - } - return self.__constructor__( - new_partitions, new_index, new_columns, lengths_objs[0], lengths_objs[1] - ) - else: - # We are applying over both axes here, so make sure we have all the right - # variables set. - assert row_labels is not None and col_labels is not None - assert keep_remaining - assert item_to_distribute is not no_default - row_partitions_list = self._get_dict_of_block_index(0, row_labels).items() - col_partitions_list = self._get_dict_of_block_index(1, col_labels).items() - new_partitions = self._partition_mgr_cls.apply_func_to_indices_both_axis( - self._partitions, - func, - row_partitions_list, - col_partitions_list, - item_to_distribute, - # Passing caches instead of values in order to not trigger shapes recomputation - # if they are not used inside this function. - self._row_lengths_cache, - self._column_widths_cache, - ) - return self.__constructor__( - new_partitions, - new_index, - new_columns, - self._row_lengths_cache, - self._column_widths_cache, - ) - - @lazy_metadata_decorator(apply_axis="both") - def broadcast_apply( - self, axis, func, other, join_type="left", labels="keep", dtypes=None - ): - """ - Broadcast axis partitions of `other` to partitions of `self` and apply a function. - - Parameters - ---------- - axis : {0, 1} - Axis to broadcast over. - func : callable - Function to apply. - other : PandasDataframe - Modin DataFrame to broadcast. - join_type : str, default: "left" - Type of join to apply. 
- labels : {"keep", "replace", "drop"}, default: "keep" - Whether keep labels from `self` Modin DataFrame, replace them with labels - from joined DataFrame or drop altogether to make them be computed lazily later. - dtypes : "copy" or None, default: None - Whether keep old dtypes or infer new dtypes from data. - - Returns - ------- - PandasDataframe - New Modin DataFrame. - """ - # Only sort the indices if they do not match - ( - left_parts, - right_parts, - joined_index, - partition_sizes_along_axis, - ) = self._copartition( - axis, other, join_type, sort=not self.axes[axis].equals(other.axes[axis]) - ) - # unwrap list returned by `copartition`. - right_parts = right_parts[0] - new_frame = self._partition_mgr_cls.broadcast_apply( - axis, func, left_parts, right_parts - ) - if dtypes == "copy": - dtypes = self._dtypes - - def _pick_axis(get_axis, sizes_cache): - if labels == "keep": - return get_axis(), sizes_cache - if labels == "replace": - return joined_index, partition_sizes_along_axis - assert labels == "drop", f"Unexpected `labels`: {labels}" - return None, None - - if axis == 0: - # Pass shape caches instead of values in order to not trigger shape computation. - new_index, new_row_lengths = _pick_axis( - self._get_index, self._row_lengths_cache - ) - new_columns, new_column_widths = self.columns, self._column_widths_cache - else: - new_index, new_row_lengths = self.index, self._row_lengths_cache - new_columns, new_column_widths = _pick_axis( - self._get_columns, self._column_widths_cache - ) - - return self.__constructor__( - new_frame, - new_index, - new_columns, - new_row_lengths, - new_column_widths, - dtypes=dtypes, - ) - - def _prepare_frame_to_broadcast(self, axis, indices, broadcast_all): - """ - Compute the indices to broadcast `self` considering `indices`. - - Parameters - ---------- - axis : {0, 1} - Axis to broadcast along. - indices : dict - Dict of indices and internal indices of partitions where `self` must - be broadcasted. 
- broadcast_all : bool - Whether broadcast the whole axis of `self` frame or just a subset of it. - - Returns - ------- - dict - Dictionary with indices of partitions to broadcast. - - Notes - ----- - New dictionary of indices of `self` partitions represents that - you want to broadcast `self` at specified another partition named `other`. For example, - Dictionary {key: {key1: [0, 1], key2: [5]}} means, that in `other`[key] you want to - broadcast [self[key1], self[key2]] partitions and internal indices for `self` must be [[0, 1], [5]] - """ - if broadcast_all: - sizes = self.row_lengths if axis else self.column_widths - return {key: dict(enumerate(sizes)) for key in indices.keys()} - passed_len = 0 - result_dict = {} - for part_num, internal in indices.items(): - result_dict[part_num] = self._get_dict_of_block_index( - axis ^ 1, np.arange(passed_len, passed_len + len(internal)) - ) - passed_len += len(internal) - return result_dict - - def __make_init_labels_args(self, partitions, index, columns) -> dict: - kw = {} - kw["index"], kw["row_lengths"] = ( - self._compute_axis_labels_and_lengths(0, partitions) - if index is None - else (index, None) - ) - kw["columns"], kw["column_widths"] = ( - self._compute_axis_labels_and_lengths(1, partitions) - if columns is None - else (columns, None) - ) - return kw - - @lazy_metadata_decorator(apply_axis="both") - def broadcast_apply_select_indices( - self, - axis, - func, - other, - apply_indices=None, - numeric_indices=None, - keep_remaining=False, - broadcast_all=True, - new_index=None, - new_columns=None, - ): - """ - Apply a function to select indices at specified axis and broadcast partitions of `other` Modin DataFrame. - - Parameters - ---------- - axis : {0, 1} - Axis to apply function along. - func : callable - Function to apply. - other : PandasDataframe - Partitions of which should be broadcasted. - apply_indices : list, default: None - List of labels to apply (if `numeric_indices` are not specified). 
- numeric_indices : list, default: None - Numeric indices to apply (if `apply_indices` are not specified). - keep_remaining : bool, default: False - Whether drop the data that is not computed over or not. - broadcast_all : bool, default: True - Whether broadcast the whole axis of right frame to every - partition or just a subset of it. - new_index : pandas.Index, optional - Index of the result. We may know this in advance, - and if not provided it must be computed. - new_columns : pandas.Index, optional - Columns of the result. We may know this in advance, - and if not provided it must be computed. - - Returns - ------- - PandasDataframe - New Modin DataFrame. - """ - assert ( - apply_indices is not None or numeric_indices is not None - ), "Indices to apply must be specified!" - - if other is None: - if apply_indices is None: - apply_indices = self.axes[axis][numeric_indices] - return self.apply_select_indices( - axis=axis, - func=func, - apply_indices=apply_indices, - keep_remaining=keep_remaining, - new_index=new_index, - new_columns=new_columns, - ) - - if numeric_indices is None: - old_index = self.index if axis else self.columns - numeric_indices = old_index.get_indexer_for(apply_indices) - - dict_indices = self._get_dict_of_block_index(axis ^ 1, numeric_indices) - broadcasted_dict = other._prepare_frame_to_broadcast( - axis, dict_indices, broadcast_all=broadcast_all - ) - new_partitions = self._partition_mgr_cls.broadcast_apply_select_indices( - axis, - func, - self._partitions, - other._partitions, - dict_indices, - broadcasted_dict, - keep_remaining, - ) - - kw = self.__make_init_labels_args(new_partitions, new_index, new_columns) - return self.__constructor__(new_partitions, **kw) - - @lazy_metadata_decorator(apply_axis="both") - def broadcast_apply_full_axis( - self, - axis, - func, - other, - new_index=None, - new_columns=None, - apply_indices=None, - enumerate_partitions=False, - dtypes=None, - ): - """ - Broadcast partitions of `other` Modin DataFrame 
and apply a function along full axis. - - Parameters - ---------- - axis : {0, 1} - Axis to apply over (0 - rows, 1 - columns). - func : callable - Function to apply. - other : PandasDataframe or list - Modin DataFrame(s) to broadcast. - new_index : list-like, optional - Index of the result. We may know this in advance, - and if not provided it must be computed. - new_columns : list-like, optional - Columns of the result. We may know this in - advance, and if not provided it must be computed. - apply_indices : list-like, default: None - Indices of `axis ^ 1` to apply function over. - enumerate_partitions : bool, default: False - Whether pass partition index into applied `func` or not. - Note that `func` must be able to obtain `partition_idx` kwarg. - dtypes : list-like, default: None - Data types of the result. This is an optimization - because there are functions that always result in a particular data - type, and allows us to avoid (re)computing it. - - Returns - ------- - PandasDataframe - New Modin DataFrame. - """ - if other is not None: - if not isinstance(other, list): - other = [other] - other = [o._partitions for o in other] if len(other) else None - - if apply_indices is not None: - numeric_indices = self.axes[axis ^ 1].get_indexer_for(apply_indices) - apply_indices = self._get_dict_of_block_index( - axis ^ 1, numeric_indices - ).keys() - - new_partitions = self._partition_mgr_cls.broadcast_axis_partitions( - axis=axis, - left=self._partitions, - right=other, - apply_func=self._build_treereduce_func(axis, func), - apply_indices=apply_indices, - enumerate_partitions=enumerate_partitions, - keep_partitioning=True, - ) - # Index objects for new object creation. 
This is shorter than if..else - kw = self.__make_init_labels_args(new_partitions, new_index, new_columns) - if dtypes == "copy": - kw["dtypes"] = self._dtypes - elif dtypes is not None: - kw["dtypes"] = pandas.Series( - [np.dtype(dtypes)] * len(kw["columns"]), index=kw["columns"] - ) - result = self.__constructor__(new_partitions, **kw) - if new_index is not None: - result.synchronize_labels(axis=0) - if new_columns is not None: - result.synchronize_labels(axis=1) - return result - def _copartition(self, axis, other, how, sort, force_repartition=False): """ Copartition two Modin DataFrames. @@ -2595,10 +2583,10 @@ def _copartition(self, axis, other, how, sort, force_repartition=False): # Also define length of base and frames. We will need to know the # lengths for alignment. if do_repartition_base: - reindexed_base = base_frame._partition_mgr_cls.map_axis_partitions( - axis, + reindexed_base = base_frame._partition_mgr_cls.map_partitions( base_frame._partitions, make_reindexer(do_reindex_base, base_frame_idx), + axis=axis, ) if axis: base_lengths = [obj.width() for obj in reindexed_base[0]] @@ -2630,10 +2618,10 @@ def _copartition(self, axis, other, how, sort, force_repartition=False): # indices of others frame start from `base_frame_idx` + 1 reindexed_other_list[i] = other_frames[ i - ]._partition_mgr_cls.map_axis_partitions( - axis, + ]._partition_mgr_cls.map_partitions( other_frames[i]._partitions, make_reindexer(do_repartition_others[i], base_frame_idx + 1 + i), + axis=axis, lengths=base_lengths, ) else: @@ -2708,7 +2696,7 @@ def n_ary_op(self, op, right_frames: list, join_type="outer"): @lazy_metadata_decorator(apply_axis="both") def concat( self, - axis: Union[int, Axis], + axis: Union[AxisInt, Axis], others: Union["PandasDataframe", List["PandasDataframe"]], how, sort, @@ -2929,7 +2917,7 @@ def groupby_reduce( new_partitions = self._partition_mgr_cls.groupby_reduce( axis, self._partitions, by_parts, map_func, reduce_func, apply_indices ) - kw = 
self.__make_init_labels_args(new_partitions, new_index, new_columns) + kw = self._make_init_labels_args(new_partitions, new_index, new_columns) return self.__constructor__(new_partitions, **kw) @classmethod diff --git a/modin/core/dataframe/pandas/interchange/dataframe_protocol/column.py b/modin/core/dataframe/pandas/interchange/dataframe_protocol/column.py index 24ee461e2bf..8a933031f23 100644 --- a/modin/core/dataframe/pandas/interchange/dataframe_protocol/column.py +++ b/modin/core/dataframe/pandas/interchange/dataframe_protocol/column.py @@ -292,10 +292,10 @@ def get_chunks( new_lengths = [chunksize] * n_chunks new_lengths[-1] = n_rows % n_chunks + new_lengths[-1] - new_partitions = self._col._partition_mgr_cls.map_axis_partitions( - 0, + new_partitions = self._col._partition_mgr_cls.map_partitions( self._col._partitions, lambda df: df, + axis=0, keep_partitioning=False, lengths=new_lengths, ) diff --git a/modin/core/dataframe/pandas/interchange/dataframe_protocol/dataframe.py b/modin/core/dataframe/pandas/interchange/dataframe_protocol/dataframe.py index e330e78512b..52d7d410de6 100644 --- a/modin/core/dataframe/pandas/interchange/dataframe_protocol/dataframe.py +++ b/modin/core/dataframe/pandas/interchange/dataframe_protocol/dataframe.py @@ -176,10 +176,10 @@ def get_chunks( new_lengths = [chunksize] * n_chunks new_lengths[-1] = n_rows % n_chunks + new_lengths[-1] - new_partitions = self._df._partition_mgr_cls.map_axis_partitions( - 0, + new_partitions = self._df._partition_mgr_cls.map_partitions( self._df._partitions, lambda df: df, + axis=0, keep_partitioning=False, lengths=new_lengths, ) diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 9003fc527b0..54c07c98fe7 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -19,6 +19,7 @@ from abc import ABC from functools import 
wraps +from typing import Optional, List import numpy as np import pandas from pandas._libs.lib import no_default @@ -26,6 +27,7 @@ from modin.error_message import ErrorMessage from modin.core.storage_formats.pandas.utils import compute_chunksize +from modin.core.dataframe.base.dataframe.utils import AxisInt from modin.core.dataframe.pandas.utils import concatenate from modin.config import NPartitions, ProgressBar, BenchmarkMode, Engine, StorageFormat from modin.logging import ClassLogger @@ -241,216 +243,126 @@ def groupby_reduce( ) if by is not None: - mapped_partitions = cls.broadcast_apply( - axis, map_func, left=partitions, right=by, other_name="other" - ) - else: - mapped_partitions = cls.map_partitions(partitions, map_func) - return cls.map_axis_partitions( - axis, mapped_partitions, reduce_func, enumerate_partitions=True - ) - - @classmethod - @wait_computations_if_benchmark_mode - def broadcast_apply_select_indices( - cls, - axis, - apply_func, - left, - right, - left_indices, - right_indices, - keep_remaining=False, - ): - """ - Broadcast the `right` partitions to `left` and apply `apply_func` to selected indices. - - Parameters - ---------- - axis : {0, 1} - Axis to apply and broadcast over. - apply_func : callable - Function to apply. - left : NumPy 2D array - Left partitions. - right : NumPy 2D array - Right partitions. - left_indices : list-like - Indices to apply function to. - right_indices : dictionary of indices of right partitions - Indices that you want to bring at specified left partition, for example - dict {key: {key1: [0, 1], key2: [5]}} means that in left[key] you want to - broadcast [right[key1], right[key2]] partitions and internal indices - for `right` must be [[0, 1], [5]]. - keep_remaining : bool, default: False - Whether or not to keep the other partitions. - Some operations may want to drop the remaining partitions and - keep only the results. - - Returns - ------- - NumPy array - An array of partition objects. 
- - Notes - ----- - Your internal function must take these kwargs: - [`internal_indices`, `other`, `internal_other_indices`] to work correctly! - """ - if not axis: - partitions_for_apply = left.T - right = right.T - else: - partitions_for_apply = left - - [obj.drain_call_queue() for row in right for obj in row] - - def get_partitions(index): - """Grab required partitions and indices from `right` and `right_indices`.""" - must_grab = right_indices[index] - partitions_list = np.array([right[i] for i in must_grab.keys()]) - indices_list = list(must_grab.values()) - return {"other": partitions_list, "internal_other_indices": indices_list} - new_partitions = np.array( - [ - partitions_for_apply[i] - if i not in left_indices - else cls._apply_func_to_list_of_partitions_broadcast( - apply_func, - partitions_for_apply[i], - internal_indices=left_indices[i], - **get_partitions(i), + def apply_func(df, *others): + other = ( + pandas.concat(others, axis=axis ^ 1) + if len(others) > 1 + else others[0] ) - for i in range(len(partitions_for_apply)) - if i in left_indices or keep_remaining - ] - ) - if not axis: - new_partitions = new_partitions.T - return new_partitions - - @classmethod - @wait_computations_if_benchmark_mode - def broadcast_apply(cls, axis, apply_func, left, right, other_name="r"): - """ - Broadcast the `right` partitions to `left` and apply `apply_func` function. - - Parameters - ---------- - axis : {0, 1} - Axis to apply and broadcast over. - apply_func : callable - Function to apply. - left : np.ndarray - NumPy array of left partitions. - right : np.ndarray - NumPy array of right partitions. - other_name : str, default: "r" - Name of key-value argument for `apply_func` that - is used to pass `right` to `apply_func`. - - Returns - ------- - np.ndarray - NumPy array of result partition objects. - - Notes - ----- - This will often be overridden by implementations. It materializes the - entire partitions of the right and applies them to the left through `apply`. 
- """ - - def map_func(df, *others): - other = ( - pandas.concat(others, axis=axis ^ 1) if len(others) > 1 else others[0] - ) - return apply_func(df, **{other_name: other}) + return map_func(df, other=other) - map_func = cls.preprocess_func(map_func) - rt_axis_parts = cls.axis_partition(right, axis ^ 1) - return np.array( - [ + apply_func = cls.preprocess_func(apply_func) + rt_axis_parts = cls.axis_partition(by, axis ^ 1) + mapped_partitions = np.array( [ - part.apply( - map_func, - *( - rt_axis_parts[col_idx].list_of_blocks - if axis - else rt_axis_parts[row_idx].list_of_blocks - ), - ) - for col_idx, part in enumerate(left[row_idx]) + [ + part.apply( + apply_func, + *( + rt_axis_parts[col_idx].list_of_blocks + if axis + else rt_axis_parts[row_idx].list_of_blocks + ), + ) + for col_idx, part in enumerate(partitions[row_idx]) + ] + for row_idx in range(len(partitions)) ] - for row_idx in range(len(left)) - ] + ) + else: + mapped_partitions = cls.map_partitions(partitions, map_func) + return cls.map_partitions( + mapped_partitions, reduce_func, axis=axis, enumerate_partitions=True ) @classmethod @wait_computations_if_benchmark_mode - def broadcast_axis_partitions( + def map_partitions( cls, - axis, + partitions, apply_func, - left, - right, + *, + axis: Optional[AxisInt] = None, + other_partitions: Optional = None, keep_partitioning=False, - apply_indices=None, + apply_indices: Optional[List[int]] = None, + other_apply_indices: Optional[List[int]] = None, + keep_remaining=False, enumerate_partitions=False, - lengths=None, + lengths: Optional[List[int]] = None, **kwargs, ): """ - Broadcast the `right` partitions to `left` and apply `apply_func` along full `axis`. + Apply ``apply_func`` to every partition in ``partitions``. + + If ``other_partitions`` is specified, then this broadcasts those partitions. Parameters ---------- - axis : {0, 1} - Axis to apply and broadcast over. + partitions : NumPy 2D array + Partitions housing the data of Modin Frame. 
apply_func : callable - Function to apply. - left : NumPy 2D array - Left partitions. - right : NumPy 2D array - Right partitions. + Function to apply to ``partitions``. + If ``other_partitions`` is supplied, then this is a binary function that is + applied to ``partitions`` and ``other_partitions`` together. + axis : {0, 1}, optional + Axis to map over (cell-wise if None). + other_partitions : NumPy 2D array, optional + Partitions of another partition to be broadcasted. keep_partitioning : boolean, default: False The flag to keep partition boundaries for Modin Frame. Setting it to True disables shuffling data from one partition to another. - apply_indices : list of ints, default: None - Indices of `axis ^ 1` to apply function over. + apply_indices : list of ints, optional + Indices of ``axis ^ 1`` in ``partitions`` to apply function over. + other_apply_indices : list of ints, optional + Indices of ``axis ^ 1`` in ``other_partitions`` to apply function over. + keep_remaining : bool, default: False + Whether or not to keep the other partitions. + Some operations may want to drop the remaining partitions and keep only the results. enumerate_partitions : bool, default: False - Whether or not to pass partition index into `apply_func`. - Note that `apply_func` must be able to accept `partition_idx` kwarg. + Whether or not to pass partition index into ``apply_func``. + Note that ``apply_func`` must be able to accept ``partition_idx`` kwarg. lengths : list of ints, default: None The list of lengths to shuffle the object. **kwargs : dict - Additional options that could be used by different engines. + Additional options to pass to ``map_func``. Returns ------- NumPy array - An array of partition objects. 
+ An array of partitions """ + if axis is None: + assert other_partitions is None, "cannot broadcast for a cell-wise map" + preprocessed_map_func = cls.preprocess_func(apply_func) + return np.array( + [ + [part.apply(preprocessed_map_func) for part in row_of_parts] + for row_of_parts in partitions + ] + ) # Since we are already splitting the DataFrame back up after an # operation, we will just use this time to compute the number of # partitions as best we can right now. if keep_partitioning: - num_splits = len(left) if axis == 0 else len(left.T) + num_splits = len(partitions) if axis == 0 else len(partitions.T) elif lengths: num_splits = len(lengths) else: num_splits = NPartitions.get() preprocessed_map_func = cls.preprocess_func(apply_func) - left_partitions = cls.axis_partition(left, axis) - right_partitions = None if right is None else cls.axis_partition(right, axis) + left_partitions = cls.axis_partition(partitions, axis) + right_partitions = ( + None + if other_partitions is None + else cls.axis_partition(other_partitions, axis) + ) # For mapping across the entire axis, we don't maintain partitioning because we # may want to line to partitioning up with another BlockPartitions object. Since # we don't need to maintain the partitioning, this gives us the opportunity to # load-balance the data as well. - kw = { - "num_splits": num_splits, - "other_axis_partition": right_partitions, - } + kw = {"num_splits": num_splits, "other_axis_partition": right_partitions} if lengths: kw["lengths"] = lengths kw["manual_partition"] = True @@ -474,32 +386,6 @@ def broadcast_axis_partitions( # the structure to the correct order. return result_blocks.T if not axis else result_blocks - @classmethod - @wait_computations_if_benchmark_mode - def map_partitions(cls, partitions, map_func): - """ - Apply `map_func` to every partition in `partitions`. - - Parameters - ---------- - partitions : NumPy 2D array - Partitions housing the data of Modin Frame. 
- map_func : callable - Function to apply. - - Returns - ------- - NumPy array - An array of partitions - """ - preprocessed_map_func = cls.preprocess_func(map_func) - return np.array( - [ - [part.apply(preprocessed_map_func) for part in row_of_parts] - for row_of_parts in partitions - ] - ) - @classmethod @wait_computations_if_benchmark_mode def lazy_map_partitions(cls, partitions, map_func): @@ -527,169 +413,504 @@ def lazy_map_partitions(cls, partitions, map_func): ) @classmethod - def map_axis_partitions( - cls, - axis, - partitions, - map_func, - keep_partitioning=False, - lengths=None, - enumerate_partitions=False, - **kwargs, - ): + def _apply_func_to_list_of_partitions(cls, preprocessed_func, partitions, **kwargs): """ - Apply `map_func` to every partition in `partitions` along given `axis`. + Apply a function to a list of remote partitions. Parameters ---------- - axis : {0, 1} - Axis to perform the map across (0 - index, 1 - columns). - partitions : NumPy 2D array - Partitions of Modin Frame. - map_func : callable - Function to apply. - keep_partitioning : bool, default: False - Whether to keep partitioning for Modin Frame. - Setting it to True stops data shuffling between partitions. - lengths : list of ints, default: None - List of lengths to shuffle the object. - enumerate_partitions : bool, default: False - Whether or not to pass partition index into `map_func`. - Note that `map_func` must be able to accept `partition_idx` kwarg. + preprocessed_func : callable + The func to apply after being passed through `preprocess_func`. + partitions : np.ndarray + The partitions to which the `func` will apply. **kwargs : dict - Additional options that could be used by different engines. + Keyword arguments for PandasDataframePartition.apply function. Returns ------- - NumPy array - An array of new partitions for Modin Frame. - - Notes - ----- - This method should be used in the case when `map_func` relies on - some global information about the axis. 
+ list + A list of PandasDataframePartition objects. """ - return cls.broadcast_axis_partitions( - axis=axis, - left=partitions, - apply_func=map_func, - keep_partitioning=keep_partitioning, - right=None, - lengths=lengths, - enumerate_partitions=enumerate_partitions, - **kwargs, - ) + return [obj.apply(preprocessed_func, **kwargs) for obj in partitions] @classmethod - def concat(cls, axis, left_parts, right_parts): - """ - Concatenate the blocks of partitions with another set of blocks. - - Parameters - ---------- - axis : int - The axis to concatenate to. - left_parts : np.ndarray - NumPy array of partitions to concatenate with. - right_parts : np.ndarray or list - NumPy array of partitions to be concatenated. - - Returns - ------- - np.ndarray - A new NumPy array with concatenated partitions. - list[int] or None - Row lengths if possible to compute it. - - Notes - ----- - Assumes that the blocks are already the same shape on the - dimension being concatenated. A ValueError will be thrown if this - condition is not met. + def _apply_func_to_list_of_partitions_broadcast( + cls, preprocessed_func, partitions, other, **kwargs + ): """ - # TODO: Possible change is `isinstance(right_parts, list)` - if type(right_parts) is list: - # `np.array` with partitions of empty ModinFrame has a shape (0,) - # but `np.concatenate` can concatenate arrays only if its shapes at - # specified axis are equals, so filtering empty frames to avoid concat error - right_parts = [o for o in right_parts if o.size != 0] - to_concat = ( - [left_parts] + right_parts if left_parts.size != 0 else right_parts - ) - result = ( - np.concatenate(to_concat, axis=axis) if len(to_concat) else left_parts - ) - else: - result = np.append(left_parts, right_parts, axis=axis) - if axis == 0: - return cls.rebalance_partitions(result) - else: - return result, None + Apply a function to a list of remote partitions. 
- @classmethod - def to_pandas(cls, partitions): - """ - Convert NumPy array of PandasDataframePartition to pandas DataFrame. + `other` partitions will be broadcasted to `partitions` and `func` will be applied. Parameters ---------- + preprocessed_func : callable + The func to apply after being passed through `preprocess_func`. partitions : np.ndarray - NumPy array of PandasDataframePartition. + The partitions to which the `func` will apply. + other : np.ndarray + The partitions to be broadcasted to `partitions`. + **kwargs : dict + Keyword arguments for PandasDataframePartition.apply function. Returns ------- - pandas.DataFrame - A pandas DataFrame + list + A list of PandasDataframePartition objects. """ - retrieved_objects = [[obj.to_pandas() for obj in part] for part in partitions] - if all( - isinstance(part, pandas.Series) for row in retrieved_objects for part in row - ): - axis = 0 - elif all( - isinstance(part, pandas.DataFrame) - for row in retrieved_objects - for part in row - ): - axis = 1 - else: - ErrorMessage.catch_bugs_and_request_email(True) - df_rows = [ - pandas.concat([part for part in row], axis=axis) - for row in retrieved_objects - if not all(part.empty for part in row) + return [ + obj.apply(preprocessed_func, other=[o.get() for o in broadcasted], **kwargs) + for obj, broadcasted in zip(partitions, other.T) ] - if len(df_rows) == 0: - return pandas.DataFrame() - else: - return concatenate(df_rows) @classmethod - def to_numpy(cls, partitions, **kwargs): + @wait_computations_if_benchmark_mode + def map_select_indices( + cls, + func, + axis, + partitions, + other_partitions, + full_axis, + apply_indices, + other_apply_indices, + keep_remaining=False, + ): """ - Convert NumPy array of PandasDataframePartition to NumPy array of data stored within `partitions`. + Apply a function to select indices. Parameters ---------- + func : callable + The function to apply to these indices of partitions. + axis : {0, 1} + Axis to apply the ``func`` over. 
partitions : np.ndarray - NumPy array of PandasDataframePartition. - **kwargs : dict - Keyword arguments for PandasDataframePartition.to_numpy function. + The partitions to which the ``func`` will apply. + other_partitions : np.ndarray + Partitions for another dataframe to broadcast. + full_axis : bool + Whether to apply the function to a virtual partition that encompasses the whole axis. + This argument is ignored if other_partitions is specified. + apply_indices : dict + The indices to apply the function to. + other_apply_indices : dict + The indices of ``other_partitions`` to apply the function to. + keep_remaining : bool, default: False + Whether or not to keep the other partitions. Some operations + may want to drop the remaining partitions and keep + only the results. Returns ------- np.ndarray - A NumPy array. - """ - return np.block( - [[block.to_numpy(**kwargs) for block in row] for row in partitions] - ) + A NumPy array with partitions. - @classmethod - @wait_computations_if_benchmark_mode - def from_pandas(cls, df, return_dims=False): - """ - Return the partitions from pandas.DataFrame. + Notes + ----- + Your internal function must take a kwarg `internal_indices` for + this to work correctly. This prevents information leakage of the + internal index to the external representation. + """ + if partitions.size == 0: + return np.array([[]]) + # Handling dictionaries has to be done differently, but we still want + # to figure out the partitions that need to be applied to, so we will + # store the dictionary in a separate variable and assign `apply_indices` to + # the keys to handle it the same as we normally would. 
+ if isinstance(func, dict): + dict_func = func + else: + dict_func = None + preprocessed_func = cls.preprocess_func(func) + if full_axis: + if not keep_remaining: + selected_partitions = partitions.T if not axis else partitions + selected_partitions = np.array( + [selected_partitions[i] for i in apply_indices] + ) + selected_partitions = ( + selected_partitions.T if not axis else selected_partitions + ) + else: + selected_partitions = partitions + if not axis: + partitions_for_apply = cls.column_partitions(selected_partitions) + partitions_for_remaining = partitions.T + else: + partitions_for_apply = cls.row_partitions(selected_partitions) + partitions_for_remaining = partitions + else: + partitions_for_apply = partitions.T if not axis else partitions + + other_partitions = None if other_partitions is None else other_partitions.T + + def get_partitions(index): + """Grab required partitions and indices from `other` and `other_apply_indices`.""" + if other_partitions is None: + return {} + must_grab = other_apply_indices[index] + partitions_list = np.array([other_partitions[i] for i in must_grab.keys()]) + indices_list = list(must_grab.values()) + return {"other": partitions_list, "internal_other_indices": indices_list} + + if other_partitions is not None: + [obj.drain_call_queue() for row in other_partitions for obj in row] + new_partitions = np.array( + [ + partitions_for_apply[i] + if i not in apply_indices + else cls._apply_func_to_list_of_partitions_broadcast( + preprocessed_func, + partitions_for_apply[i], + internal_indices=apply_indices[i], + **get_partitions(i), + ) + for i in range(len(partitions_for_apply)) + if i in apply_indices or keep_remaining + ] + ) + if not axis: + new_partitions = new_partitions.T + return new_partitions + # We may have a command to perform different functions on different + # columns at the same time. We attempt to handle this as efficiently as + # possible here. 
Functions that use this in the dictionary format must + # accept a keyword argument `func_dict`. + if dict_func is not None: + if not keep_remaining: + if full_axis: + result = np.array( + [ + part.apply( + preprocessed_func, + func_dict={ + idx: dict_func[idx] for idx in apply_indices[i] + }, + ) + for i, part in zip(apply_indices, partitions_for_apply) + ] + ) + else: + result = np.array( + [ + cls._apply_func_to_list_of_partitions( + preprocessed_func, + partitions_for_apply[o_idx], + func_dict={ + i_idx: dict_func[i_idx] + for i_idx in list_to_apply + if i_idx >= 0 + }, + ) + for o_idx, list_to_apply in apply_indices.items() + ] + ) + else: + if full_axis: + result = np.array( + [ + partitions_for_remaining[i] + if i not in apply_indices + else cls._apply_func_to_list_of_partitions( + preprocessed_func, + partitions_for_apply[i], + func_dict={ + idx: dict_func[idx] for idx in apply_indices[i] + }, + ) + for i in range(len(partitions_for_apply)) + ] + ) + else: + result = np.array( + [ + partitions_for_apply[i] + if i not in apply_indices + else cls._apply_func_to_list_of_partitions( + preprocessed_func, + partitions_for_apply[i], + func_dict={ + idx: dict_func[idx] + for idx in apply_indices[i] + if idx >= 0 + }, + ) + for i in range(len(partitions_for_apply)) + ] + ) + else: + if not keep_remaining: + # We are passing internal indices in here. In order for func to + # actually be able to use this information, it must be able to take in + # the internal indices. This might mean an iloc in the case of Pandas + # or some other way to index into the internal representation. 
+ if full_axis: + result = np.array( + [ + part.apply( + preprocessed_func, internal_indices=apply_indices[i] + ) + for i, part in zip(apply_indices, partitions_for_apply) + ] + ) + else: + result = np.array( + [ + cls._apply_func_to_list_of_partitions( + preprocessed_func, + partitions_for_apply[i], + internal_indices=list_to_apply, + ) + for i, list_to_apply in apply_indices.items() + ] + ) + else: + # The difference here is that we modify a subset and return the + # remaining (non-updated) blocks in their original position. + if full_axis: + result = np.array( + [ + partitions_for_remaining[i] + if i not in apply_indices + else partitions_for_apply[i].apply( + preprocessed_func, + internal_indices=apply_indices[i], + ) + for i in range(len(partitions_for_remaining)) + ] + ) + else: + result = np.array( + [ + partitions_for_apply[i] + if i not in apply_indices + else cls._apply_func_to_list_of_partitions( + preprocessed_func, + partitions_for_apply[i], + internal_indices=apply_indices[i], + ) + for i in range(len(partitions_for_apply)) + ] + ) + return result.T if not axis else result + + @classmethod + @wait_computations_if_benchmark_mode + def map_select_indices_both_axes( + cls, + partitions, + func, + row_partitions_list, + col_partitions_list, + item_to_distribute=no_default, + row_lengths=None, + col_widths=None, + ): + """ + Apply a function along both axes. + + Parameters + ---------- + partitions : np.ndarray + The partitions to which the `func` will apply. + func : callable + The function to apply. + row_partitions_list : iterable of tuples + Iterable of tuples, containing 2 values: + 1. Integer row partition index. + 2. Internal row indexer of this partition. + col_partitions_list : iterable of tuples + Iterable of tuples, containing 2 values: + 1. Integer column partition index. + 2. Internal column indexer of this partition. + item_to_distribute : np.ndarray or scalar, default: no_default + The item to split up so it can be applied over both axes. 
+ row_lengths : list of ints, optional + Lengths of partitions for every row. If not specified this information + is extracted from partitions itself. + col_widths : list of ints, optional + Widths of partitions for every column. If not specified this information + is extracted from partitions itself. + + Returns + ------- + np.ndarray + A NumPy array with partitions. + + Notes + ----- + For your func to operate directly on the indices provided, + it must use `row_internal_indices`, `col_internal_indices` as keyword + arguments. + """ + partition_copy = partitions.copy() + row_position_counter = 0 + + if row_lengths is None: + row_lengths = [None] * len(row_partitions_list) + if col_widths is None: + col_widths = [None] * len(col_partitions_list) + + def compute_part_size(indexer, remote_part, part_idx, axis): + """Compute indexer length along the specified axis for the passed partition.""" + if isinstance(indexer, slice): + shapes_container = row_lengths if axis == 0 else col_widths + part_size = shapes_container[part_idx] + if part_size is None: + part_size = ( + remote_part.length() if axis == 0 else remote_part.width() + ) + shapes_container[part_idx] = part_size + indexer = range(*indexer.indices(part_size)) + return len(indexer) + + for row_idx, row_values in enumerate(row_partitions_list): + row_blk_idx, row_internal_idx = row_values + col_position_counter = 0 + for col_idx, col_values in enumerate(col_partitions_list): + col_blk_idx, col_internal_idx = col_values + remote_part = partition_copy[row_blk_idx, col_blk_idx] + + row_offset = compute_part_size( + row_internal_idx, remote_part, row_idx, axis=0 + ) + col_offset = compute_part_size( + col_internal_idx, remote_part, col_idx, axis=1 + ) + + if item_to_distribute is not no_default: + if isinstance(item_to_distribute, np.ndarray): + item = item_to_distribute[ + row_position_counter : row_position_counter + row_offset, + col_position_counter : col_position_counter + col_offset, + ] + else: + item = 
item_to_distribute + item = {"item": item} + else: + item = {} + block_result = remote_part.add_to_apply_calls( + func, + row_internal_indices=row_internal_idx, + col_internal_indices=col_internal_idx, + **item, + ) + partition_copy[row_blk_idx, col_blk_idx] = block_result + col_position_counter += col_offset + row_position_counter += row_offset + return partition_copy + + @classmethod + def concat(cls, axis, left_parts, right_parts): + """ + Concatenate the blocks of partitions with another set of blocks. + + Parameters + ---------- + axis : int + The axis to concatenate to. + left_parts : np.ndarray + NumPy array of partitions to concatenate with. + right_parts : np.ndarray or list + NumPy array of partitions to be concatenated. + + Returns + ------- + np.ndarray + A new NumPy array with concatenated partitions. + list[int] or None + Row lengths if possible to compute it. + + Notes + ----- + Assumes that the blocks are already the same shape on the + dimension being concatenated. A ValueError will be thrown if this + condition is not met. + """ + # TODO: Possible change is `isinstance(right_parts, list)` + if type(right_parts) is list: + # `np.array` with partitions of empty ModinFrame has a shape (0,) + # but `np.concatenate` can concatenate arrays only if its shapes at + # specified axis are equals, so filtering empty frames to avoid concat error + right_parts = [o for o in right_parts if o.size != 0] + to_concat = ( + [left_parts] + right_parts if left_parts.size != 0 else right_parts + ) + result = ( + np.concatenate(to_concat, axis=axis) if len(to_concat) else left_parts + ) + else: + result = np.append(left_parts, right_parts, axis=axis) + if axis == 0: + return cls.rebalance_partitions(result) + else: + return result, None + + @classmethod + def to_pandas(cls, partitions): + """ + Convert NumPy array of PandasDataframePartition to pandas DataFrame. + + Parameters + ---------- + partitions : np.ndarray + NumPy array of PandasDataframePartition. 
+ + Returns + ------- + pandas.DataFrame + A pandas DataFrame + """ + retrieved_objects = [[obj.to_pandas() for obj in part] for part in partitions] + if all( + isinstance(part, pandas.Series) for row in retrieved_objects for part in row + ): + axis = 0 + elif all( + isinstance(part, pandas.DataFrame) + for row in retrieved_objects + for part in row + ): + axis = 1 + else: + ErrorMessage.catch_bugs_and_request_email(True) + df_rows = [ + pandas.concat([part for part in row], axis=axis) + for row in retrieved_objects + if not all(part.empty for part in row) + ] + if len(df_rows) == 0: + return pandas.DataFrame() + else: + return concatenate(df_rows) + + @classmethod + def to_numpy(cls, partitions, **kwargs): + """ + Convert NumPy array of PandasDataframePartition to NumPy array of data stored within `partitions`. + + Parameters + ---------- + partitions : np.ndarray + NumPy array of PandasDataframePartition. + **kwargs : dict + Keyword arguments for PandasDataframePartition.to_numpy function. + + Returns + ------- + np.ndarray + A NumPy array. + """ + return np.block( + [[block.to_numpy(**kwargs) for block in row] for row in partitions] + ) + + @classmethod + @wait_computations_if_benchmark_mode + def from_pandas(cls, df, return_dims=False): + """ + Return the partitions from pandas.DataFrame. Parameters ---------- @@ -782,484 +1003,90 @@ def from_arrow(cls, at, return_dims=False): Returns ------- - np.ndarray or (np.ndarray, row_lengths, col_widths) - A NumPy array with partitions (with dimensions or not). - """ - return cls.from_pandas(at.to_pandas(), return_dims=return_dims) - - @classmethod - def get_objects_from_partitions(cls, partitions): - """ - Get the objects wrapped by `partitions`. - - Parameters - ---------- - partitions : np.ndarray - NumPy array with ``PandasDataframePartition``-s. - - Returns - ------- - list - The objects wrapped by `partitions`. 
- - Notes - ----- - This method should be implemented in a more efficient way for engines that support - getting objects in parallel. - """ - return [partition.get() for partition in partitions] - - @classmethod - def wait_partitions(cls, partitions): - """ - Wait on the objects wrapped by `partitions`, without materializing them. - - This method will block until all computations in the list have completed. - - Parameters - ---------- - partitions : np.ndarray - NumPy array with ``PandasDataframePartition``-s. - - Notes - ----- - This method should be implemented in a more efficient way for engines that supports - waiting on objects in parallel. - """ - for partition in partitions: - partition.wait() - - @classmethod - def get_indices(cls, axis, partitions, index_func=None): - """ - Get the internal indices stored in the partitions. - - Parameters - ---------- - axis : {0, 1} - Axis to extract the labels over. - partitions : np.ndarray - NumPy array with PandasDataframePartition's. - index_func : callable, default: None - The function to be used to extract the indices. - - Returns - ------- - pandas.Index - A pandas Index object. - list of pandas.Index - The list of internal indices for each partition. - - Notes - ----- - These are the global indices of the object. This is mostly useful - when you have deleted rows/columns internally, but do not know - which ones were deleted. - """ - if index_func is None: - index_func = lambda df: df.axes[axis] # noqa: E731 - ErrorMessage.catch_bugs_and_request_email(not callable(index_func)) - func = cls.preprocess_func(index_func) - target = partitions.T if axis == 0 else partitions - new_idx = [idx.apply(func) for idx in target[0]] if len(target) else [] - new_idx = cls.get_objects_from_partitions(new_idx) - # TODO FIX INFORMATION LEAK!!!!1!!1!! 
- total_idx = new_idx[0].append(new_idx[1:]) if new_idx else new_idx - return total_idx, new_idx - - @classmethod - def _apply_func_to_list_of_partitions_broadcast( - cls, func, partitions, other, **kwargs - ): - """ - Apply a function to a list of remote partitions. - - `other` partitions will be broadcasted to `partitions` - and `func` will be applied. - - Parameters - ---------- - func : callable - The func to apply. - partitions : np.ndarray - The partitions to which the `func` will apply. - other : np.ndarray - The partitions to be broadcasted to `partitions`. - **kwargs : dict - Keyword arguments for PandasDataframePartition.apply function. - - Returns - ------- - list - A list of PandasDataframePartition objects. - """ - preprocessed_func = cls.preprocess_func(func) - return [ - obj.apply(preprocessed_func, other=[o.get() for o in broadcasted], **kwargs) - for obj, broadcasted in zip(partitions, other.T) - ] - - @classmethod - def _apply_func_to_list_of_partitions(cls, func, partitions, **kwargs): - """ - Apply a function to a list of remote partitions. - - Parameters - ---------- - func : callable - The func to apply. - partitions : np.ndarray - The partitions to which the `func` will apply. - **kwargs : dict - Keyword arguments for PandasDataframePartition.apply function. - - Returns - ------- - list - A list of PandasDataframePartition objects. - - Notes - ----- - This preprocesses the `func` first before applying it to the partitions. + np.ndarray or (np.ndarray, row_lengths, col_widths) + A NumPy array with partitions (with dimensions or not). 
""" - preprocessed_func = cls.preprocess_func(func) - return [obj.apply(preprocessed_func, **kwargs) for obj in partitions] + return cls.from_pandas(at.to_pandas(), return_dims=return_dims) @classmethod - @wait_computations_if_benchmark_mode - def apply_func_to_select_indices( - cls, axis, partitions, func, indices, keep_remaining=False - ): + def get_objects_from_partitions(cls, partitions): """ - Apply a function to select indices. + Get the objects wrapped by `partitions`. Parameters ---------- - axis : {0, 1} - Axis to apply the `func` over. partitions : np.ndarray - The partitions to which the `func` will apply. - func : callable - The function to apply to these indices of partitions. - indices : dict - The indices to apply the function to. - keep_remaining : bool, default: False - Whether or not to keep the other partitions. Some operations - may want to drop the remaining partitions and keep - only the results. + NumPy array with ``PandasDataframePartition``-s. Returns ------- - np.ndarray - A NumPy array with partitions. + list + The objects wrapped by `partitions`. Notes ----- - Your internal function must take a kwarg `internal_indices` for - this to work correctly. This prevents information leakage of the - internal index to the external representation. + This method should be implemented in a more efficient way for engines that support + getting objects in parallel. """ - if partitions.size == 0: - return np.array([[]]) - # Handling dictionaries has to be done differently, but we still want - # to figure out the partitions that need to be applied to, so we will - # store the dictionary in a separate variable and assign `indices` to - # the keys to handle it the same as we normally would. 
- if isinstance(func, dict): - dict_func = func - else: - dict_func = None - if not axis: - partitions_for_apply = partitions.T - else: - partitions_for_apply = partitions - # We may have a command to perform different functions on different - # columns at the same time. We attempt to handle this as efficiently as - # possible here. Functions that use this in the dictionary format must - # accept a keyword argument `func_dict`. - if dict_func is not None: - if not keep_remaining: - result = np.array( - [ - cls._apply_func_to_list_of_partitions( - func, - partitions_for_apply[o_idx], - func_dict={ - i_idx: dict_func[i_idx] - for i_idx in list_to_apply - if i_idx >= 0 - }, - ) - for o_idx, list_to_apply in indices.items() - ] - ) - else: - result = np.array( - [ - partitions_for_apply[i] - if i not in indices - else cls._apply_func_to_list_of_partitions( - func, - partitions_for_apply[i], - func_dict={ - idx: dict_func[idx] for idx in indices[i] if idx >= 0 - }, - ) - for i in range(len(partitions_for_apply)) - ] - ) - else: - if not keep_remaining: - # We are passing internal indices in here. In order for func to - # actually be able to use this information, it must be able to take in - # the internal indices. This might mean an iloc in the case of Pandas - # or some other way to index into the internal representation. - result = np.array( - [ - cls._apply_func_to_list_of_partitions( - func, - partitions_for_apply[idx], - internal_indices=list_to_apply, - ) - for idx, list_to_apply in indices.items() - ] - ) - else: - # The difference here is that we modify a subset and return the - # remaining (non-updated) blocks in their original position. 
- result = np.array( - [ - partitions_for_apply[i] - if i not in indices - else cls._apply_func_to_list_of_partitions( - func, partitions_for_apply[i], internal_indices=indices[i] - ) - for i in range(len(partitions_for_apply)) - ] - ) - return result.T if not axis else result + return [partition.get() for partition in partitions] @classmethod - @wait_computations_if_benchmark_mode - def apply_func_to_select_indices_along_full_axis( - cls, axis, partitions, func, indices, keep_remaining=False - ): + def wait_partitions(cls, partitions): """ - Apply a function to a select subset of full columns/rows. + Wait on the objects wrapped by `partitions`, without materializing them. + + This method will block until all computations in the list have completed. Parameters ---------- - axis : {0, 1} - The axis to apply the function over. partitions : np.ndarray - The partitions to which the `func` will apply. - func : callable - The function to apply. - indices : list-like - The global indices to apply the func to. - keep_remaining : bool, default: False - Whether or not to keep the other partitions. - Some operations may want to drop the remaining partitions and - keep only the results. - - Returns - ------- - np.ndarray - A NumPy array with partitions. + NumPy array with ``PandasDataframePartition``-s. Notes ----- - This should be used when you need to apply a function that relies - on some global information for the entire column/row, but only need - to apply a function to a subset. - For your func to operate directly on the indices provided, - it must use `internal_indices` as a keyword argument. + This method should be implemented in a more efficient way for engines that supports + waiting on objects in parallel. 
""" - if partitions.size == 0: - return np.array([[]]) - # Handling dictionaries has to be done differently, but we still want - # to figure out the partitions that need to be applied to, so we will - # store the dictionary in a separate variable and assign `indices` to - # the keys to handle it the same as we normally would. - if isinstance(func, dict): - dict_func = func - else: - dict_func = None - preprocessed_func = cls.preprocess_func(func) - # Since we might be keeping the remaining blocks that are not modified, - # we have to also keep the block_partitions object in the correct - # direction (transpose for columns). - if not keep_remaining: - selected_partitions = partitions.T if not axis else partitions - selected_partitions = np.array([selected_partitions[i] for i in indices]) - selected_partitions = ( - selected_partitions.T if not axis else selected_partitions - ) - else: - selected_partitions = partitions - if not axis: - partitions_for_apply = cls.column_partitions(selected_partitions) - partitions_for_remaining = partitions.T - else: - partitions_for_apply = cls.row_partitions(selected_partitions) - partitions_for_remaining = partitions - # We may have a command to perform different functions on different - # columns at the same time. We attempt to handle this as efficiently as - # possible here. Functions that use this in the dictionary format must - # accept a keyword argument `func_dict`. 
- if dict_func is not None: - if not keep_remaining: - result = np.array( - [ - part.apply( - preprocessed_func, - func_dict={idx: dict_func[idx] for idx in indices[i]}, - ) - for i, part in zip(indices, partitions_for_apply) - ] - ) - else: - result = np.array( - [ - partitions_for_remaining[i] - if i not in indices - else cls._apply_func_to_list_of_partitions( - preprocessed_func, - partitions_for_apply[i], - func_dict={idx: dict_func[idx] for idx in indices[i]}, - ) - for i in range(len(partitions_for_apply)) - ] - ) - else: - if not keep_remaining: - # See notes in `apply_func_to_select_indices` - result = np.array( - [ - part.apply(preprocessed_func, internal_indices=indices[i]) - for i, part in zip(indices, partitions_for_apply) - ] - ) - else: - # See notes in `apply_func_to_select_indices` - result = np.array( - [ - partitions_for_remaining[i] - if i not in indices - else partitions_for_apply[i].apply( - preprocessed_func, internal_indices=indices[i] - ) - for i in range(len(partitions_for_remaining)) - ] - ) - return result.T if not axis else result + for partition in partitions: + partition.wait() @classmethod - @wait_computations_if_benchmark_mode - def apply_func_to_indices_both_axis( - cls, - partitions, - func, - row_partitions_list, - col_partitions_list, - item_to_distribute=no_default, - row_lengths=None, - col_widths=None, - ): + def get_indices(cls, axis, partitions, index_func=None): """ - Apply a function along both axes. + Get the internal indices stored in the partitions. Parameters ---------- + axis : {0, 1} + Axis to extract the labels over. partitions : np.ndarray - The partitions to which the `func` will apply. - func : callable - The function to apply. - row_partitions_list : iterable of tuples - Iterable of tuples, containing 2 values: - 1. Integer row partition index. - 2. Internal row indexer of this partition. - col_partitions_list : iterable of tuples - Iterable of tuples, containing 2 values: - 1. Integer column partition index. 
- 2. Internal column indexer of this partition. - item_to_distribute : np.ndarray or scalar, default: no_default - The item to split up so it can be applied over both axes. - row_lengths : list of ints, optional - Lengths of partitions for every row. If not specified this information - is extracted from partitions itself. - col_widths : list of ints, optional - Widths of partitions for every column. If not specified this information - is extracted from partitions itself. + NumPy array with PandasDataframePartition's. + index_func : callable, default: None + The function to be used to extract the indices. Returns ------- - np.ndarray - A NumPy array with partitions. + pandas.Index + A pandas Index object. + list of pandas.Index + The list of internal indices for each partition. Notes ----- - For your func to operate directly on the indices provided, - it must use `row_internal_indices`, `col_internal_indices` as keyword - arguments. + These are the global indices of the object. This is mostly useful + when you have deleted rows/columns internally, but do not know + which ones were deleted. 
""" - partition_copy = partitions.copy() - row_position_counter = 0 - - if row_lengths is None: - row_lengths = [None] * len(row_partitions_list) - if col_widths is None: - col_widths = [None] * len(col_partitions_list) - - def compute_part_size(indexer, remote_part, part_idx, axis): - """Compute indexer length along the specified axis for the passed partition.""" - if isinstance(indexer, slice): - shapes_container = row_lengths if axis == 0 else col_widths - part_size = shapes_container[part_idx] - if part_size is None: - part_size = ( - remote_part.length() if axis == 0 else remote_part.width() - ) - shapes_container[part_idx] = part_size - indexer = range(*indexer.indices(part_size)) - return len(indexer) - - for row_idx, row_values in enumerate(row_partitions_list): - row_blk_idx, row_internal_idx = row_values - col_position_counter = 0 - row_offset = 0 - for col_idx, col_values in enumerate(col_partitions_list): - col_blk_idx, col_internal_idx = col_values - remote_part = partition_copy[row_blk_idx, col_blk_idx] - - row_offset = compute_part_size( - row_internal_idx, remote_part, row_idx, axis=0 - ) - col_offset = compute_part_size( - col_internal_idx, remote_part, col_idx, axis=1 - ) - - if item_to_distribute is not no_default: - if isinstance(item_to_distribute, np.ndarray): - item = item_to_distribute[ - row_position_counter : row_position_counter + row_offset, - col_position_counter : col_position_counter + col_offset, - ] - else: - item = item_to_distribute - item = {"item": item} - else: - item = {} - block_result = remote_part.add_to_apply_calls( - func, - row_internal_indices=row_internal_idx, - col_internal_indices=col_internal_idx, - **item, - ) - partition_copy[row_blk_idx, col_blk_idx] = block_result - col_position_counter += col_offset - row_position_counter += row_offset - return partition_copy + if index_func is None: + index_func = lambda df: df.axes[axis] # noqa: E731 + ErrorMessage.catch_bugs_and_request_email(not callable(index_func)) + func 
= cls.preprocess_func(index_func) + target = partitions.T if axis == 0 else partitions + new_idx = [idx.apply(func) for idx in target[0]] if len(target) else [] + new_idx = cls.get_objects_from_partitions(new_idx) + # TODO FIX INFORMATION LEAK!!!!1!!1!! + total_idx = new_idx[0].append(new_idx[1:]) if new_idx else new_idx + return total_idx, new_idx @classmethod @wait_computations_if_benchmark_mode diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index 6922ef0406e..00d2220ba66 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -103,7 +103,7 @@ def func(df): Notes ----- - This function returns an empty ``pandas.DataFrame`` because ``apply_full_axis`` + This function returns an empty ``pandas.DataFrame`` because ``map`` expects a Frame object as a result of operation (and ``to_sql`` has no dataframe result). """ df.columns = columns @@ -112,8 +112,10 @@ def func(df): # Ensure that the metadata is syncrhonized qc._modin_frame._propagate_index_objs(axis=None) - result = qc._modin_frame.apply_full_axis(1, func, new_index=[], new_columns=[]) - # FIXME: we should be waiting for completion less expensievely, maybe use _modin_frame.materialize()? + result = qc._modin_frame.map( + func, axis=1, full_axis=True, new_index=[], new_columns=[] + ) + # FIXME: we should be waiting for completion less expensively, maybe use _modin_frame.materialize()? 
result.to_pandas() # blocking operation @staticmethod @@ -228,10 +230,10 @@ def func(df, **kw): RayWrapper.materialize(signals.send.remote(0)) # Ensure that the metadata is syncrhonized qc._modin_frame._propagate_index_objs(axis=None) - result = qc._modin_frame._partition_mgr_cls.map_axis_partitions( + result = qc._modin_frame._partition_mgr_cls.map_partitions( + qc._modin_frame._partitions, + func, axis=1, - partitions=qc._modin_frame._partitions, - map_func=func, keep_partitioning=True, lengths=None, enumerate_partitions=True, @@ -307,10 +309,10 @@ def func(df, **kw): # Ensure that the metadata is synchronized qc._modin_frame._propagate_index_objs(axis=None) - result = qc._modin_frame._partition_mgr_cls.map_axis_partitions( + result = qc._modin_frame._partition_mgr_cls.map_partitions( + qc._modin_frame._partitions, + func, axis=1, - partitions=qc._modin_frame._partitions, - map_func=func, keep_partitioning=True, lengths=None, enumerate_partitions=True, diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py index 91a720bcb3e..12805147e47 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py @@ -161,11 +161,9 @@ def _make_wrapped_method(name: str): for method in ( "map_partitions", "lazy_map_partitions", - "map_axis_partitions", + "map_select_indices", + "map_select_indices_both_axes", "_apply_func_to_list_of_partitions", - "apply_func_to_select_indices", - "apply_func_to_select_indices_along_full_axis", - "apply_func_to_indices_both_axis", "n_ary_operation", ): _make_wrapped_method(method) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 254e4de9145..c23db051cfd 100644 --- 
a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -476,7 +476,7 @@ def map_func(left, right=right, kwargs=kwargs): return pandas.merge(left, right, **kwargs) new_self = self.__constructor__( - self._modin_frame.apply_full_axis(1, map_func) + self._modin_frame.map(map_func, axis=1, full_axis=True) ) is_reset_index = True if left_on and right_on: @@ -521,7 +521,7 @@ def map_func(left, right=right, kwargs=kwargs): return pandas_dataframe_join(left, right, **kwargs) new_self = self.__constructor__( - self._modin_frame.apply_full_axis(1, map_func) + self._modin_frame.map(map_func, axis=1, full_axis=True) ) return new_self.sort_rows_by_column_values(on) if sort else new_self else: @@ -533,9 +533,10 @@ def map_func(left, right=right, kwargs=kwargs): def reindex(self, axis, labels, **kwargs): new_index = self.index if axis else labels new_columns = labels if axis else self.columns - new_modin_frame = self._modin_frame.apply_full_axis( - axis, + new_modin_frame = self._modin_frame.map( lambda df: df.reindex(labels=labels, axis=axis, **kwargs), + axis=axis, + full_axis=True, new_index=new_index, new_columns=new_columns, ) @@ -890,8 +891,8 @@ def map_func(df, resample_kwargs=resample_kwargs): else: return val - new_modin_frame = self._modin_frame.apply_full_axis( - axis=0, func=map_func, new_columns=new_columns + new_modin_frame = self._modin_frame.map( + func=map_func, axis=0, full_axis=True, new_columns=new_columns ) return self.__constructor__(new_modin_frame) @@ -1183,13 +1184,14 @@ def rolling_cov(self, axis, rolling_args, other, pairwise, ddof, **kwargs): )(self, axis) def rolling_aggregate(self, axis, rolling_args, func, *args, **kwargs): - new_modin_frame = self._modin_frame.apply_full_axis( - axis, + new_modin_frame = self._modin_frame.map( lambda df: pandas.DataFrame( df.rolling(*rolling_args[0], **rolling_args[1]).aggregate( func=func, *args, **kwargs ) ), + axis=axis, + full_axis=True, 
new_index=self.index, ) return self.__constructor__(new_modin_frame) @@ -1254,8 +1256,8 @@ def is_tree_like_or_1d(calc_index, valid_index): else: obj = self - new_modin_frame = obj._modin_frame.apply_full_axis( - axis, map_func, new_columns=new_columns + new_modin_frame = obj._modin_frame.map( + map_func, axis=axis, full_axis=True, new_columns=new_columns ) result = self.__constructor__(new_modin_frame) @@ -1358,29 +1360,33 @@ def stack(self, level, dropna): else: new_columns = None - new_modin_frame = self._modin_frame.apply_full_axis( - 1, + new_modin_frame = self._modin_frame.map( lambda df: pandas.DataFrame(df.stack(level=level, dropna=dropna)), + axis=1, + full_axis=True, new_columns=new_columns, ) return self.__constructor__(new_modin_frame) # Map partitions operations # These operations are operations that apply a function to every partition. - abs = Map.register(pandas.DataFrame.abs, dtypes="copy") + # Though all these operations are element-wise, some can be sped up by mapping across a row/col + abs = Map.register(pandas.DataFrame.abs, copy_dtypes=True, axis=0) applymap = Map.register(pandas.DataFrame.applymap) - conj = Map.register(lambda df, *args, **kwargs: pandas.DataFrame(np.conj(df))) + conj = Map.register( + lambda df, *args, **kwargs: pandas.DataFrame(np.conj(df)), axis=0 + ) convert_dtypes = Map.register(pandas_convert_dtypes) - invert = Map.register(pandas.DataFrame.__invert__) - isin = Map.register(pandas.DataFrame.isin, dtypes=np.bool_) - isna = Map.register(pandas.DataFrame.isna, dtypes=np.bool_) + invert = Map.register(pandas.DataFrame.__invert__, copy_dtypes=True, axis=0) + isin = Map.register(pandas.DataFrame.isin, dtypes=np.bool_, axis=0) + isna = Map.register(pandas.DataFrame.isna, dtypes=np.bool_, axis=0) _isfinite = Map.register( - lambda df, *args, **kwargs: pandas.DataFrame(np.isfinite(df)) + lambda df, *args, **kwargs: pandas.DataFrame(np.isfinite(df)), axis=0 ) - negative = Map.register(pandas.DataFrame.__neg__) - notna = 
Map.register(pandas.DataFrame.notna, dtypes=np.bool_) - round = Map.register(pandas.DataFrame.round) - replace = Map.register(pandas.DataFrame.replace) + negative = Map.register(pandas.DataFrame.__neg__, copy_dtypes=True, axis=0) + notna = Map.register(pandas.DataFrame.notna, dtypes=np.bool_, axis=0) + round = Map.register(pandas.DataFrame.round, copy_dtypes=True, axis=0) + replace = Map.register(pandas.DataFrame.replace, axis=0) series_view = Map.register( lambda df, *args, **kwargs: pandas.DataFrame( df.squeeze(axis=1).view(*args, **kwargs) @@ -1395,67 +1401,70 @@ def stack(self, level, dropna): lambda s, *args, **kwargs: pandas.to_timedelta( s.squeeze(axis=1), *args, **kwargs ).to_frame(), - dtypes="timedelta64[ns]", + dtypes=np.dtype("timedelta64[ns]"), ) # END Map partitions operations # String map partitions operations - str_capitalize = Map.register(_str_map("capitalize"), dtypes="copy") - str_center = Map.register(_str_map("center"), dtypes="copy") - str_contains = Map.register(_str_map("contains"), dtypes=np.bool_) - str_count = Map.register(_str_map("count"), dtypes=int) - str_endswith = Map.register(_str_map("endswith"), dtypes=np.bool_) - str_find = Map.register(_str_map("find"), dtypes="copy") - str_findall = Map.register(_str_map("findall"), dtypes="copy") - str_get = Map.register(_str_map("get"), dtypes="copy") - str_index = Map.register(_str_map("index"), dtypes="copy") - str_isalnum = Map.register(_str_map("isalnum"), dtypes=np.bool_) - str_isalpha = Map.register(_str_map("isalpha"), dtypes=np.bool_) - str_isdecimal = Map.register(_str_map("isdecimal"), dtypes=np.bool_) - str_isdigit = Map.register(_str_map("isdigit"), dtypes=np.bool_) - str_islower = Map.register(_str_map("islower"), dtypes=np.bool_) - str_isnumeric = Map.register(_str_map("isnumeric"), dtypes=np.bool_) - str_isspace = Map.register(_str_map("isspace"), dtypes=np.bool_) - str_istitle = Map.register(_str_map("istitle"), dtypes=np.bool_) - str_isupper = 
Map.register(_str_map("isupper"), dtypes=np.bool_) - str_join = Map.register(_str_map("join"), dtypes="copy") - str_len = Map.register(_str_map("len"), dtypes=int) - str_ljust = Map.register(_str_map("ljust"), dtypes="copy") - str_lower = Map.register(_str_map("lower"), dtypes="copy") - str_lstrip = Map.register(_str_map("lstrip"), dtypes="copy") - str_match = Map.register(_str_map("match"), dtypes="copy") - str_normalize = Map.register(_str_map("normalize"), dtypes="copy") - str_pad = Map.register(_str_map("pad"), dtypes="copy") - str_partition = Map.register(_str_map("partition"), dtypes="copy") - str_repeat = Map.register(_str_map("repeat"), dtypes="copy") - str_replace = Map.register(_str_map("replace"), dtypes="copy") - str_rfind = Map.register(_str_map("rfind"), dtypes="copy") - str_rindex = Map.register(_str_map("rindex"), dtypes="copy") - str_rjust = Map.register(_str_map("rjust"), dtypes="copy") - str_rpartition = Map.register(_str_map("rpartition"), dtypes="copy") - str_rsplit = Map.register(_str_map("rsplit"), dtypes="copy") - str_rstrip = Map.register(_str_map("rstrip"), dtypes="copy") - str_slice = Map.register(_str_map("slice"), dtypes="copy") - str_slice_replace = Map.register(_str_map("slice_replace"), dtypes="copy") - str_split = Map.register(_str_map("split"), dtypes="copy") - str_startswith = Map.register(_str_map("startswith"), dtypes=np.bool_) - str_strip = Map.register(_str_map("strip"), dtypes="copy") - str_swapcase = Map.register(_str_map("swapcase"), dtypes="copy") - str_title = Map.register(_str_map("title"), dtypes="copy") - str_translate = Map.register(_str_map("translate"), dtypes="copy") - str_upper = Map.register(_str_map("upper"), dtypes="copy") - str_wrap = Map.register(_str_map("wrap"), dtypes="copy") - str_zfill = Map.register(_str_map("zfill"), dtypes="copy") - str___getitem__ = Map.register(_str_map("__getitem__"), dtypes="copy") + str_capitalize = Map.register(_str_map("capitalize"), copy_dtypes=True, axis=0) + str_center = 
Map.register(_str_map("center"), copy_dtypes=True, axis=0) + str_contains = Map.register(_str_map("contains"), dtypes=np.bool_, axis=0) + str_count = Map.register(_str_map("count"), dtypes=int, axis=0) + str_endswith = Map.register(_str_map("endswith"), dtypes=np.bool_, axis=0) + str_find = Map.register(_str_map("find"), copy_dtypes=True, axis=0) + str_findall = Map.register(_str_map("findall"), copy_dtypes=True, axis=0) + str_get = Map.register(_str_map("get"), copy_dtypes=True, axis=0) + str_index = Map.register(_str_map("index"), copy_dtypes=True, axis=0) + str_isalnum = Map.register(_str_map("isalnum"), dtypes=np.bool_, axis=0) + str_isalpha = Map.register(_str_map("isalpha"), dtypes=np.bool_, axis=0) + str_isdecimal = Map.register(_str_map("isdecimal"), dtypes=np.bool_, axis=0) + str_isdigit = Map.register(_str_map("isdigit"), dtypes=np.bool_, axis=0) + str_islower = Map.register(_str_map("islower"), dtypes=np.bool_, axis=0) + str_isnumeric = Map.register(_str_map("isnumeric"), dtypes=np.bool_, axis=0) + str_isspace = Map.register(_str_map("isspace"), dtypes=np.bool_, axis=0) + str_istitle = Map.register(_str_map("istitle"), dtypes=np.bool_, axis=0) + str_isupper = Map.register(_str_map("isupper"), dtypes=np.bool_, axis=0) + str_join = Map.register(_str_map("join"), copy_dtypes=True, axis=0) + str_len = Map.register(_str_map("len"), dtypes=int, axis=0) + str_ljust = Map.register(_str_map("ljust"), copy_dtypes=True, axis=0) + str_lower = Map.register(_str_map("lower"), copy_dtypes=True, axis=0) + str_lstrip = Map.register(_str_map("lstrip"), copy_dtypes=True, axis=0) + str_match = Map.register(_str_map("match"), copy_dtypes=True, axis=0) + str_normalize = Map.register(_str_map("normalize"), copy_dtypes=True, axis=0) + str_pad = Map.register(_str_map("pad"), copy_dtypes=True, axis=0) + str_partition = Map.register(_str_map("partition"), copy_dtypes=True, axis=0) + str_repeat = Map.register(_str_map("repeat"), copy_dtypes=True, axis=0) + str_replace = 
Map.register(_str_map("replace"), copy_dtypes=True, axis=0) + str_rfind = Map.register(_str_map("rfind"), copy_dtypes=True, axis=0) + str_rindex = Map.register(_str_map("rindex"), copy_dtypes=True, axis=0) + str_rjust = Map.register(_str_map("rjust"), copy_dtypes=True, axis=0) + str_rpartition = Map.register(_str_map("rpartition"), copy_dtypes=True, axis=0) + str_rsplit = Map.register(_str_map("rsplit"), copy_dtypes=True, axis=0) + str_rstrip = Map.register(_str_map("rstrip"), copy_dtypes=True, axis=0) + str_slice = Map.register(_str_map("slice"), copy_dtypes=True, axis=0) + str_slice_replace = Map.register( + _str_map("slice_replace"), copy_dtypes=True, axis=0 + ) + str_split = Map.register(_str_map("split"), copy_dtypes=True, axis=0) + str_startswith = Map.register(_str_map("startswith"), dtypes=np.bool_, axis=0) + str_strip = Map.register(_str_map("strip"), copy_dtypes=True, axis=0) + str_swapcase = Map.register(_str_map("swapcase"), copy_dtypes=True, axis=0) + str_title = Map.register(_str_map("title"), copy_dtypes=True, axis=0) + str_translate = Map.register(_str_map("translate"), copy_dtypes=True, axis=0) + str_upper = Map.register(_str_map("upper"), copy_dtypes=True, axis=0) + str_wrap = Map.register(_str_map("wrap"), copy_dtypes=True, axis=0) + str_zfill = Map.register(_str_map("zfill"), copy_dtypes=True, axis=0) + str___getitem__ = Map.register(_str_map("__getitem__"), copy_dtypes=True) # END String map partitions operations def unique(self): - new_modin_frame = self._modin_frame.apply_full_axis( - 0, + new_modin_frame = self._modin_frame.map( lambda x: x.squeeze(axis=1).unique(), + axis=0, + full_axis=True, new_columns=self.columns, ) return self.__constructor__(new_modin_frame) @@ -1472,34 +1481,46 @@ def searchsorted(df): # Dt map partitions operations - dt_date = Map.register(_dt_prop_map("date"), dtypes=np.object_) - dt_time = Map.register(_dt_prop_map("time"), dtypes=np.object_) - dt_timetz = Map.register(_dt_prop_map("timetz"), dtypes=np.object_) - 
dt_year = Map.register(_dt_prop_map("year"), dtypes=np.int64) - dt_month = Map.register(_dt_prop_map("month"), dtypes=np.int64) - dt_day = Map.register(_dt_prop_map("day"), dtypes=np.int64) - dt_hour = Map.register(_dt_prop_map("hour"), dtypes=np.int64) - dt_minute = Map.register(_dt_prop_map("minute"), dtypes=np.int64) - dt_second = Map.register(_dt_prop_map("second"), dtypes=np.int64) - dt_microsecond = Map.register(_dt_prop_map("microsecond"), dtypes=np.int64) - dt_nanosecond = Map.register(_dt_prop_map("nanosecond"), dtypes=np.int64) - dt_week = Map.register(_dt_prop_map("week"), dtypes=np.int64) - dt_weekofyear = Map.register(_dt_prop_map("weekofyear"), dtypes=np.int64) - dt_dayofweek = Map.register(_dt_prop_map("dayofweek"), dtypes=np.int64) - dt_weekday = Map.register(_dt_prop_map("weekday"), dtypes=np.int64) - dt_dayofyear = Map.register(_dt_prop_map("dayofyear"), dtypes=np.int64) - dt_quarter = Map.register(_dt_prop_map("quarter"), dtypes=np.int64) - dt_is_month_start = Map.register(_dt_prop_map("is_month_start"), dtypes=np.bool_) - dt_is_month_end = Map.register(_dt_prop_map("is_month_end"), dtypes=np.bool_) + dt_date = Map.register(_dt_prop_map("date"), dtypes=np.object_, axis=0) + dt_time = Map.register(_dt_prop_map("time"), dtypes=np.object_, axis=0) + dt_timetz = Map.register(_dt_prop_map("timetz"), dtypes=np.object_, axis=0) + dt_year = Map.register(_dt_prop_map("year"), dtypes=np.int64, axis=0) + dt_month = Map.register(_dt_prop_map("month"), dtypes=np.int64, axis=0) + dt_day = Map.register(_dt_prop_map("day"), dtypes=np.int64, axis=0) + dt_hour = Map.register(_dt_prop_map("hour"), dtypes=np.int64, axis=0) + dt_minute = Map.register(_dt_prop_map("minute"), dtypes=np.int64, axis=0) + dt_second = Map.register(_dt_prop_map("second"), dtypes=np.int64, axis=0) + dt_microsecond = Map.register(_dt_prop_map("microsecond"), dtypes=np.int64, axis=0) + dt_nanosecond = Map.register(_dt_prop_map("nanosecond"), dtypes=np.int64, axis=0) + dt_week = 
Map.register(_dt_prop_map("week"), dtypes=np.int64, axis=0) + dt_weekofyear = Map.register(_dt_prop_map("weekofyear"), dtypes=np.int64, axis=0) + dt_dayofweek = Map.register(_dt_prop_map("dayofweek"), dtypes=np.int64, axis=0) + dt_weekday = Map.register(_dt_prop_map("weekday"), dtypes=np.int64, axis=0) + dt_dayofyear = Map.register(_dt_prop_map("dayofyear"), dtypes=np.int64, axis=0) + dt_quarter = Map.register(_dt_prop_map("quarter"), dtypes=np.int64, axis=0) + dt_is_month_start = Map.register( + _dt_prop_map("is_month_start"), dtypes=np.bool_, axis=0 + ) + dt_is_month_end = Map.register( + _dt_prop_map("is_month_end"), dtypes=np.bool_, axis=0 + ) dt_is_quarter_start = Map.register( - _dt_prop_map("is_quarter_start"), dtypes=np.bool_ + _dt_prop_map("is_quarter_start"), dtypes=np.bool_, axis=0 + ) + dt_is_quarter_end = Map.register( + _dt_prop_map("is_quarter_end"), dtypes=np.bool_, axis=0 + ) + dt_is_year_start = Map.register( + _dt_prop_map("is_year_start"), dtypes=np.bool_, axis=0 + ) + dt_is_year_end = Map.register(_dt_prop_map("is_year_end"), dtypes=np.bool_, axis=0) + dt_is_leap_year = Map.register( + _dt_prop_map("is_leap_year"), dtypes=np.bool_, axis=0 + ) + dt_daysinmonth = Map.register(_dt_prop_map("daysinmonth"), dtypes=np.int64, axis=0) + dt_days_in_month = Map.register( + _dt_prop_map("days_in_month"), dtypes=np.int64, axis=0 ) - dt_is_quarter_end = Map.register(_dt_prop_map("is_quarter_end"), dtypes=np.bool_) - dt_is_year_start = Map.register(_dt_prop_map("is_year_start"), dtypes=np.bool_) - dt_is_year_end = Map.register(_dt_prop_map("is_year_end"), dtypes=np.bool_) - dt_is_leap_year = Map.register(_dt_prop_map("is_leap_year"), dtypes=np.bool_) - dt_daysinmonth = Map.register(_dt_prop_map("daysinmonth"), dtypes=np.int64) - dt_days_in_month = Map.register(_dt_prop_map("days_in_month"), dtypes=np.int64) def dt_tz(self): def datetime_tz(df): @@ -1513,27 +1534,35 @@ def datetime_freq(df): return self.default_to_pandas(datetime_freq) - dt_to_period = 
Map.register(_dt_func_map("to_period")) - dt_to_pydatetime = Map.register(_dt_func_map("to_pydatetime"), dtypes=np.object_) - dt_tz_localize = Map.register(_dt_func_map("tz_localize")) - dt_tz_convert = Map.register(_dt_func_map("tz_convert")) - dt_normalize = Map.register(_dt_func_map("normalize")) - dt_strftime = Map.register(_dt_func_map("strftime"), dtypes=np.object_) - dt_round = Map.register(_dt_func_map("round")) - dt_floor = Map.register(_dt_func_map("floor")) - dt_ceil = Map.register(_dt_func_map("ceil")) - dt_month_name = Map.register(_dt_func_map("month_name"), dtypes=np.object_) - dt_day_name = Map.register(_dt_func_map("day_name"), dtypes=np.object_) - dt_to_pytimedelta = Map.register(_dt_func_map("to_pytimedelta"), dtypes=np.object_) - dt_total_seconds = Map.register(_dt_func_map("total_seconds"), dtypes=np.float64) - dt_seconds = Map.register(_dt_prop_map("seconds"), dtypes=np.int64) - dt_days = Map.register(_dt_prop_map("days"), dtypes=np.int64) - dt_microseconds = Map.register(_dt_prop_map("microseconds"), dtypes=np.int64) - dt_nanoseconds = Map.register(_dt_prop_map("nanoseconds"), dtypes=np.int64) - dt_qyear = Map.register(_dt_prop_map("qyear"), dtypes=np.int64) - dt_start_time = Map.register(_dt_prop_map("start_time")) - dt_end_time = Map.register(_dt_prop_map("end_time")) - dt_to_timestamp = Map.register(_dt_func_map("to_timestamp")) + dt_to_period = Map.register(_dt_func_map("to_period"), axis=0) + dt_to_pydatetime = Map.register( + _dt_func_map("to_pydatetime"), dtypes=np.object_, axis=0 + ) + dt_tz_localize = Map.register(_dt_func_map("tz_localize"), axis=0) + dt_tz_convert = Map.register(_dt_func_map("tz_convert"), axis=0) + dt_normalize = Map.register(_dt_func_map("normalize"), axis=0) + dt_strftime = Map.register(_dt_func_map("strftime"), dtypes=np.object_, axis=0) + dt_round = Map.register(_dt_func_map("round"), axis=0) + dt_floor = Map.register(_dt_func_map("floor"), axis=0) + dt_ceil = Map.register(_dt_func_map("ceil"), axis=0) + 
dt_month_name = Map.register(_dt_func_map("month_name"), dtypes=np.object_, axis=0) + dt_day_name = Map.register(_dt_func_map("day_name"), dtypes=np.object_, axis=0) + dt_to_pytimedelta = Map.register( + _dt_func_map("to_pytimedelta"), dtypes=np.object_, axis=0 + ) + dt_total_seconds = Map.register( + _dt_func_map("total_seconds"), dtypes=np.float64, axis=0 + ) + dt_seconds = Map.register(_dt_prop_map("seconds"), dtypes=np.int64, axis=0) + dt_days = Map.register(_dt_prop_map("days"), dtypes=np.int64, axis=0) + dt_microseconds = Map.register( + _dt_prop_map("microseconds"), dtypes=np.int64, axis=0 + ) + dt_nanoseconds = Map.register(_dt_prop_map("nanoseconds"), dtypes=np.int64, axis=0) + dt_qyear = Map.register(_dt_prop_map("qyear"), dtypes=np.int64, axis=0) + dt_start_time = Map.register(_dt_prop_map("start_time"), axis=0) + dt_end_time = Map.register(_dt_prop_map("end_time"), axis=0) + dt_to_timestamp = Map.register(_dt_func_map("to_timestamp"), axis=0) # END Dt map partitions operations @@ -1561,6 +1590,8 @@ def first_valid_index_builder(df): .to_pandas() .squeeze() ) + if np.isnan(first_result): + return None return self.index[first_result] def last_valid_index(self): @@ -1573,13 +1604,15 @@ def last_valid_index_builder(df): # We get the maximum from each column, then take the max of that to get # last_valid_index. The `to_pandas()` here is just for a single value and # `squeeze` will convert it to a scalar. 
- first_result = ( + last_result = ( self.__constructor__(self._modin_frame.reduce(0, last_valid_index_builder)) .max(axis=1) .to_pandas() .squeeze() ) - return self.index[first_result] + if np.isnan(last_result): + return None + return self.index[last_result] # END Column/Row partitions reduce operations @@ -1622,10 +1655,11 @@ def describe_builder(df, internal_indices=[]): return df.iloc[:, internal_indices].describe(**kwargs).reindex(new_index) return self.__constructor__( - self._modin_frame.apply_full_axis_select_indices( - 0, + self._modin_frame.map_select_indices( describe_builder, - empty_df.columns, + axis=0, + full_axis=True, + apply_indices=empty_df.columns, new_index=new_index, new_columns=empty_df.columns, ) @@ -1740,8 +1774,8 @@ def map_func(df): columns = self.columns index = columns.copy() transponed_self = self.transpose() - new_modin_frame = transponed_self._modin_frame.apply_full_axis( - 1, map_func, new_index=index, new_columns=columns + new_modin_frame = transponed_self._modin_frame.map( + map_func, axis=1, full_axis=True, new_index=index, new_columns=columns ) return transponed_self.__constructor__(new_modin_frame) @@ -1777,8 +1811,12 @@ def map_func(df, other=other, squeeze_self=squeeze_self): new_columns = [MODIN_UNNAMED_SERIES_LABEL] if num_cols == 1 else None axis = 1 - new_modin_frame = self._modin_frame.apply_full_axis( - axis, map_func, new_index=new_index, new_columns=new_columns + new_modin_frame = self._modin_frame.map( + map_func, + axis=axis, + full_axis=True, + new_index=new_index, + new_columns=new_columns, ) return self.__constructor__(new_modin_frame) @@ -1825,8 +1863,8 @@ def map_func(df, n=n, keep=keep, columns=columns): else: new_columns = self.columns - new_modin_frame = self._modin_frame.apply_full_axis( - axis=0, func=map_func, new_columns=new_columns + new_modin_frame = self._modin_frame.map( + func=map_func, axis=0, full_axis=True, new_columns=new_columns ) return self.__constructor__(new_modin_frame) @@ -1852,9 
+1890,10 @@ def eval(self, expr, **kwargs): ) else: new_columns = empty_eval.columns - new_modin_frame = self._modin_frame.apply_full_axis( - 1, + new_modin_frame = self._modin_frame.map( lambda df: pandas.DataFrame(df.eval(expr, inplace=False, **kwargs)), + axis=1, + full_axis=True, new_index=self.index, new_columns=new_columns, ) @@ -1882,8 +1921,12 @@ def mode_builder(df): else: new_index = self.index new_columns = pandas.RangeIndex(len(self.columns)) - new_modin_frame = self._modin_frame.apply_full_axis( - axis, mode_builder, new_index=new_index, new_columns=new_columns + new_modin_frame = self._modin_frame.map( + mode_builder, + axis=axis, + full_axis=True, + new_index=new_index, + new_columns=new_columns, ) return self.__constructor__(new_modin_frame).dropna(axis=axis, how="all") @@ -1906,8 +1949,8 @@ def fillna_builder(series): # correct behavior. return series.squeeze(axis=1).fillna(value=value, **kwargs) - new_modin_frame = self._modin_frame.apply_full_axis( - 0, fillna_builder + new_modin_frame = self._modin_frame.map( + fillna_builder, axis=0, full_axis=True ) else: @@ -1939,8 +1982,10 @@ def fillna(df): def fillna_builder(df, r): return df.fillna(value=r, **kwargs) - new_modin_frame = self._modin_frame.broadcast_apply( - 0, fillna_builder, value._modin_frame + new_modin_frame = self._modin_frame.map( + fillna_builder, + axis=0, + other=value._modin_frame, ) return self.__constructor__(new_modin_frame) @@ -2010,9 +2055,10 @@ def quantile_builder(df, **kwargs): new_columns = pandas.Float64Index(q) else: q_index = pandas.Float64Index(q) - new_modin_frame = query_compiler._modin_frame.apply_full_axis( - axis, + new_modin_frame = query_compiler._modin_frame.map( lambda df: quantile_builder(df, **kwargs), + axis=axis, + full_axis=True, new_index=q_index, new_columns=new_columns, dtypes=np.float64, @@ -2029,9 +2075,10 @@ def query_builder(df, **modin_internal_kwargs): def rank(self, **kwargs): axis = kwargs.get("axis", 0) numeric_only = True if axis else 
kwargs.get("numeric_only", False) - new_modin_frame = self._modin_frame.apply_full_axis( - axis, + new_modin_frame = self._modin_frame.map( lambda df: df.rank(**kwargs), + axis=axis, + full_axis=True, new_index=self.index, new_columns=self.columns if not numeric_only else None, dtypes=np.float64, @@ -2066,14 +2113,15 @@ def sort_index(self, **kwargs): else: new_index = self.index.to_frame().sort_index(**kwargs).index new_columns = self.columns - new_modin_frame = self._modin_frame.apply_full_axis( - axis, + new_modin_frame = self._modin_frame.map( lambda df: df.sort_index( axis=axis, level=level, sort_remaining=sort_remaining, **kwargs ), - new_index, - new_columns, - dtypes="copy" if axis == 0 else None, + axis=axis, + full_axis=True, + new_index=new_index, + new_columns=new_columns, + copy_dtypes=axis == 0, ) return self.__constructor__(new_modin_frame) @@ -2149,11 +2197,12 @@ def applyier(df, internal_indices, other=[], internal_other_indices=[]): ) # we have no able to calculate correct indices here, so making it `dummy_index` - inconsistent_frame = self._modin_frame.broadcast_apply_select_indices( - axis=0, - apply_indices=value_vars, + inconsistent_frame = self._modin_frame.map_select_indices( func=applyier, + axis=0, other=to_broadcast, + full_axis=False, + apply_indices=value_vars, new_index=["dummy_index"] * len(id_vars), new_columns=["dummy_index"] * len(id_vars), ) @@ -2206,7 +2255,7 @@ def getitem_array(self, key): # ones are just of bool dtype if len(key.dtypes) == 1 and is_bool_dtype(key.dtypes[0]): self.__validate_bool_indexer(key.index) - return self.__getitem_bool(key, broadcast=True, dtypes="copy") + return self.__getitem_bool(key, broadcast=True, copy_dtypes=True) key = key.to_pandas().squeeze(axis=1) @@ -2314,26 +2363,17 @@ def setitem_builder(df, internal_indices=[]): idx = self.get_axis(axis ^ 1).get_indexer_for([key])[0] return self.insert_item(axis ^ 1, idx, value, how, replace=True) - # TODO: rework by passing list-like values to 
`apply_select_indices` + # TODO: rework by passing list-like values to # as an item to distribute - if is_list_like(value): - new_modin_frame = self._modin_frame.apply_full_axis_select_indices( - axis, - setitem_builder, - [key], - new_index=self.index, - new_columns=self.columns, - keep_remaining=True, - ) - else: - new_modin_frame = self._modin_frame.apply_select_indices( - axis, - setitem_builder, - [key], - new_index=self.index, - new_columns=self.columns, - keep_remaining=True, - ) + new_modin_frame = self._modin_frame.map_select_indices( + setitem_builder, + axis=axis, + full_axis=is_list_like(value), + apply_indices=[key], + new_index=self.index, + new_columns=self.columns, + keep_remaining=True, + ) return self.__constructor__(new_modin_frame) # END __getitem__ methods @@ -2393,11 +2433,12 @@ def insert(df, internal_indices=[]): df.insert(internal_idx, column, value) return df - # TODO: rework by passing list-like values to `apply_select_indices` + # TODO: rework by passing list-like values to # as an item to distribute - new_modin_frame = self._modin_frame.apply_full_axis_select_indices( - 0, + new_modin_frame = self._modin_frame.map_select_indices( insert, + axis=0, + full_axis=True, numeric_indices=[loc], keep_remaining=True, new_index=self.index, @@ -2433,14 +2474,14 @@ def apply_on_series(self, func, *args, **kwargs): assert self.is_series_like() - # We use apply_full_axis here instead of map since the latter assumes that the - # shape of the DataFrame does not change. However, it is possible for functions - # applied to Series objects to end up creating DataFrames. It is possible that - # using apply_full_axis is much less performant compared to using a variant of - # map. + # We full_axis=True here it is possible for functions applied to Series objects to end up + # creating DataFrames, changing the result's shape. It is possible that + # using full_axis=True is much less performant compared to full_axis=False. 
return self.__constructor__( - self._modin_frame.apply_full_axis( - 1, lambda df: df.squeeze(axis=1).apply(func, *args, **kwargs) + self._modin_frame.map( + lambda df: df.squeeze(axis=1).apply(func, *args, **kwargs), + axis=1, + full_axis=True, ) ) @@ -2475,8 +2516,12 @@ def dict_apply_builder(df, func_dict={}): func = {k: wrap_udf_function(v) if callable(v) else v for k, v in func.items()} return self.__constructor__( - self._modin_frame.apply_full_axis_select_indices( - axis, dict_apply_builder, func, keep_remaining=False + self._modin_frame.map_select_indices( + dict_apply_builder, + axis=axis, + full_axis=True, + apply_indices=func, + keep_remaining=False, ) ) @@ -2513,9 +2558,10 @@ def _list_like_func(self, func, axis, *args, **kwargs): else self.columns ) func = [wrap_udf_function(f) if callable(f) else f for f in func] - new_modin_frame = self._modin_frame.apply_full_axis( - axis, + new_modin_frame = self._modin_frame.map( lambda df: pandas.DataFrame(df.apply(func, axis, *args, **kwargs)), + axis=axis, + full_axis=True, new_index=new_index, new_columns=new_columns, ) @@ -2546,8 +2592,10 @@ def _callable_func(self, func, axis, *args, **kwargs): if callable(func): func = wrap_udf_function(func) - new_modin_frame = self._modin_frame.apply_full_axis( - axis, lambda df: df.apply(func, axis=axis, *args, **kwargs) + new_modin_frame = self._modin_frame.map( + lambda df: df.apply(func, axis=axis, *args, **kwargs), + axis=axis, + full_axis=True, ) return self.__constructor__(new_modin_frame) @@ -2982,11 +3030,12 @@ def compute_groupby(df, drop=False, partition_idx=0): apply_indices = list(agg_func.keys()) if isinstance(agg_func, dict) else None - new_modin_frame = self._modin_frame.broadcast_apply_full_axis( - axis=axis, + new_modin_frame = self._modin_frame.map( func=lambda df, by=None, partition_idx=None: groupby_agg_builder( df, by, drop, partition_idx ), + axis=axis, + full_axis=True, other=broadcastable_by, apply_indices=apply_indices, enumerate_partitions=True, 
@@ -3049,9 +3098,10 @@ def __convert_by(by): obj.index = index reindexed = self.__constructor__( - obj._modin_frame.apply_full_axis( - 1, + obj._modin_frame.map( lambda df: df.set_index(to_reindex, append=(len(to_reindex) == 1)), + axis=1, + full_axis=True, new_columns=obj.columns.drop(to_reindex), ) ) @@ -3141,8 +3191,8 @@ def applyier(df, other): return result result = self.__constructor__( - to_group._modin_frame.broadcast_apply_full_axis( - axis=0, func=applyier, other=keys_columns._modin_frame + to_group._modin_frame.map( + func=applyier, axis=0, full_axis=True, other=keys_columns._modin_frame ) ) @@ -3177,15 +3227,21 @@ def get_dummies(self, columns, **kwargs): # efficient if we are mapping over all of the data to do it this way # than it would be to reuse the code for specific columns. if len(columns) == len(self.columns): - new_modin_frame = self._modin_frame.apply_full_axis( - 0, lambda df: pandas.get_dummies(df, **kwargs), new_index=self.index + new_modin_frame = self._modin_frame.map( + lambda df: pandas.get_dummies(df, **kwargs), + axis=0, + full_axis=True, + new_index=self.index, ) untouched_frame = None else: new_modin_frame = self._modin_frame.take_2d_labels_or_positional( col_labels=columns - ).apply_full_axis( - 0, lambda df: pandas.get_dummies(df, **kwargs), new_index=self.index + ).map( + lambda df: pandas.get_dummies(df, **kwargs), + axis=0, + full_axis=True, + new_index=self.index, ) untouched_frame = self.drop(columns=columns) # If we mapped over all the data we are done. 
If not, we need to @@ -3234,14 +3290,12 @@ def iloc_mut(partition, row_internal_indices, col_internal_indices, item): partition.iloc[row_internal_indices, col_internal_indices] = item return partition - new_modin_frame = self._modin_frame.apply_select_indices( - axis=None, + new_modin_frame = self._modin_frame.map_select_indices_both_axes( func=iloc_mut, row_labels=row_numeric_index, col_labels=col_numeric_index, new_index=self.index, new_columns=self.columns, - keep_remaining=True, item_to_distribute=broadcasted_items, ) return self.__constructor__(new_modin_frame) @@ -3307,16 +3361,17 @@ def func(df) -> np.ndarray: ser = df.iloc[:, 0] return ser.cat.codes - res = self._modin_frame.apply_full_axis(axis=0, func=func) + res = self._modin_frame.map(func=func, axis=0, full_axis=True) return self.__constructor__(res) # END Cat operations def compare(self, other, **kwargs): return self.__constructor__( - self._modin_frame.broadcast_apply_full_axis( - 0, + self._modin_frame.map( lambda l, r: pandas_compare(l, other=r, **kwargs), - other._modin_frame, + axis=0, + full_axis=True, + other=other._modin_frame, ) ) diff --git a/modin/experimental/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/experimental/core/execution/ray/implementations/pandas_on_ray/io/io.py index 570397fd517..d2aeea42df8 100644 --- a/modin/experimental/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/experimental/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -238,8 +238,13 @@ def func(df, **kw): df.to_pickle(**kwargs) return pandas.DataFrame() - result = qc._modin_frame.broadcast_apply_full_axis( - 1, func, other=None, new_index=[], new_columns=[], enumerate_partitions=True + result = qc._modin_frame.map( + func, + axis=1, + full_axis=True, + new_index=[], + new_columns=[], + enumerate_partitions=True, ) result.to_pandas() diff --git a/modin/pandas/test/dataframe/algebra/test_map.py b/modin/pandas/test/dataframe/algebra/test_map.py new file mode 
100644 index 00000000000..9bdf751c0a4 --- /dev/null +++ b/modin/pandas/test/dataframe/algebra/test_map.py @@ -0,0 +1,48 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. + +import pytest +import numpy as np +import pandas +import modin.pandas as pd + +from modin.pandas.test.utils import ( + arg_keys, + bool_arg_keys, + bool_arg_values, +) +from modin.config import NPartitions + +NPartitions.put(4) + + +@pytest.mark.parametrize( + "hint_dtypes", bool_arg_values, ids=arg_keys("hint_dtypes", bool_arg_keys) +) +def test_map_elementwise(hint_dtypes): + """Check an elementwise ``map`` with and without a ``dtypes`` hint.""" + modin_df = pd.DataFrame({"a": range(33), "b": range(33, 66)}) + modin_internal_df = modin_df._query_compiler._modin_frame + # dtypes optional argument specifies expected dtype to avoid recomputing after + dtypes = pandas.Series({"a": np.bool_, "b": np.bool_}) if hint_dtypes else None + result = modin_internal_df.map(lambda df: df < 32, dtypes=dtypes) + result_as_pandas = result.to_pandas() + expected = pandas.DataFrame({"a": [True] * 32 + [False], "b": [False] * 33}) + assert result_as_pandas.equals(expected) + assert result.dtypes.equals(expected.dtypes) + # The result of the following operation should be the same, even though + # an `axis` argument is provided + result = 
modin_internal_df.map(lambda df: df < 32, dtypes=dtypes, axis=0) + result_as_pandas = result.to_pandas() + assert result_as_pandas.equals(expected) + assert result.dtypes.equals(expected.dtypes) diff --git a/modin/pandas/test/dataframe/algebra/test_reduce.py b/modin/pandas/test/dataframe/algebra/test_reduce.py new file mode 100644 index 00000000000..c5abdd8c1a4 --- /dev/null +++ b/modin/pandas/test/dataframe/algebra/test_reduce.py @@ -0,0 +1,67 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. 
+ +"""Tests the reduce and tree_reduce Modin algebra operators.""" + + +import pytest +import numpy as np +import pandas +import modin.pandas as pd + +from modin.pandas.test.utils import ( + arg_keys, + bool_arg_keys, + bool_arg_values, +) +from modin.config import NPartitions +from modin.utils import MODIN_UNNAMED_SERIES_LABEL + +NPartitions.put(4) + + +@pytest.mark.parametrize( + "hint_dtypes", bool_arg_values, ids=arg_keys("hint_dtypes", bool_arg_keys) +) +def test_reduce_small(hint_dtypes): + modin_df = pd.DataFrame({"a": range(33), "b": range(33, 66)}) + modin_internal_df = modin_df._query_compiler._modin_frame + # dtypes optional argument specifies expected dtype to avoid recomputing after + dtypes = pandas.Series({"a": np.float64, "b": np.float64}) if hint_dtypes else None + result = modin_internal_df.reduce(0, pandas.DataFrame.mean, dtypes=dtypes) + result_as_pandas = result.to_pandas() + expected = pandas.DataFrame( # mean of 0..32 is 16.0; mean of 33..65 is 49.0 + {"a": [16.0], "b": [49.0]}, index=[MODIN_UNNAMED_SERIES_LABEL] + ) + assert result_as_pandas.equals(expected) + assert result.dtypes.equals(expected.dtypes) + + +@pytest.mark.parametrize( + "hint_dtypes", bool_arg_values, ids=arg_keys("hint_dtypes", bool_arg_keys) +) +def test_tree_reduce_small(hint_dtypes): + data = {"a": (["a"] * 5) + ([np.nan] * 6), "b": ([np.nan] * 3) + (["c"] * 8)} + modin_df = pd.DataFrame(data) + modin_internal_df = modin_df._query_compiler._modin_frame + # dtypes optional argument specifies expected dtype to avoid recomputing after + dtypes = pandas.Series({"a": np.int64, "b": np.int64}) if hint_dtypes else None + result = modin_internal_df.tree_reduce( + 0, pandas.DataFrame.count, pandas.DataFrame.sum, dtypes=dtypes + ) + result_as_pandas = result.to_pandas() + expected = pandas.DataFrame( # "a" holds 5 non-NaN values, "b" holds 8 + {"a": [5], "b": [8]}, index=[MODIN_UNNAMED_SERIES_LABEL] + ) + assert result_as_pandas.equals(expected) + assert result.dtypes.equals(expected.dtypes) diff --git a/modin/pandas/test/dataframe/test_indexing.py
b/modin/pandas/test/dataframe/test_indexing.py index 6ff03ce684d..9f856b1ef26 100644 --- a/modin/pandas/test/dataframe/test_indexing.py +++ b/modin/pandas/test/dataframe/test_indexing.py @@ -175,8 +175,32 @@ def compare_asof( def test_first_valid_index(data): modin_df = pd.DataFrame(data) pandas_df = pandas.DataFrame(data) + assert modin_df.first_valid_index() == pandas_df.first_valid_index() - assert modin_df.first_valid_index() == (pandas_df.first_valid_index()) + +@pytest.mark.parametrize("none_value", [None, np.nan], ids=["None", "np.nan"]) +def test_first_valid_index_none(none_value): + """Compare against pandas for a column made only of missing values.""" + data = {0: [none_value] * 100} + modin_df = pd.DataFrame(data) + pandas_df = pandas.DataFrame(data) + assert modin_df.first_valid_index() == pandas_df.first_valid_index() + + +@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) +def test_last_valid_index(data): + modin_df = pd.DataFrame(data) + pandas_df = pandas.DataFrame(data) + assert modin_df.last_valid_index() == pandas_df.last_valid_index() + + +@pytest.mark.parametrize("none_value", [None, np.nan], ids=["None", "np.nan"]) +def test_last_valid_index_none(none_value): + """Compare against pandas for a column made only of missing values.""" + data = {0: [none_value] * 100} + modin_df = pd.DataFrame(data) + pandas_df = pandas.DataFrame(data) + assert modin_df.last_valid_index() == pandas_df.last_valid_index() @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index 92c91027cdd..4c9f2b7b6f5 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -109,7 +109,7 @@ def test_aligning_partitions(): @pytest.mark.parametrize("has_frame_shape_cache", [True, False]) def test_apply_func_to_both_axis(has_partitions_shape_cache, has_frame_shape_cache): """ - Test ``modin.core.dataframe.pandas.dataframe.dataframe.PandasDataframe.apply_select_indices`` functionality of broadcasting
non-distributed items. + Test ``modin.core.dataframe.pandas.dataframe.dataframe.PandasDataframe.map_select_indices_both_axes`` functionality of broadcasting non-distributed items. """ data = test_data_values[0] @@ -144,13 +144,11 @@ def func_to_apply(partition, row_internal_indices, col_internal_indices, item): partition.iloc[row_internal_indices, col_internal_indices] = item return partition - new_modin_frame = modin_frame.apply_select_indices( - axis=None, + new_modin_frame = modin_frame.map_select_indices_both_axes( func=func_to_apply, # Passing none-slices does not trigger shapes recomputation and so the cache is untouched. row_labels=slice(None), col_labels=slice(None), - keep_remaining=True, new_index=pd_df.index, new_columns=pd_df.columns, item_to_distribute=values,