REF: Compute correct result_index upfront in groupby
rhshadrach committed Oct 27, 2023
1 parent c36e302 commit e32b789
Showing 15 changed files with 257 additions and 360 deletions.
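
Overview note: the recurring change below is that aggregation results are no
longer expanded after the fact via _reindex_output; instead the grouper's
result_index is computed upfront so it already contains every output group. A
minimal sketch (not part of the diff; data is illustrative) of the
user-visible behavior both approaches must preserve for categorical groupers
with observed=False:

    import pandas as pd

    df = pd.DataFrame(
        {
            "key": pd.Categorical(["a", "a", "b"], categories=["a", "b", "c"]),
            "val": [1, 2, 3],
        }
    )
    # "c" is never observed but must still appear in the result, with the
    # aggregation's fill value (0 for sum).
    print(df.groupby("key", observed=False)["val"].sum())
    # key
    # a    3
    # b    3
    # c    0
    # Name: val, dtype: int64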
24 changes: 16 additions & 8 deletions pandas/core/groupby/generic.py
@@ -416,7 +416,6 @@ def _wrap_applied_output(
             # GH #823 #24880
             index = self.grouper.result_index
             res_df = self.obj._constructor_expanddim(values, index=index)
-            res_df = self._reindex_output(res_df)
             # if self.observed is False,
             # keep all-NaN rows created while re-indexing
             res_ser = res_df.stack(future_stack=True)
@@ -442,7 +441,7 @@ def _wrap_applied_output(
         if not self.as_index:
             result = self._insert_inaxis_grouper(result)
             result.index = default_index(len(result))
-        return self._reindex_output(result)
+        return result
 
     def _aggregate_named(self, func, *args, **kwargs):
         # Note: this is very similar to _aggregate_series_pure_python,
@@ -672,7 +671,7 @@ def nunique(self, dropna: bool = True) -> Series | DataFrame:
         2023-02-01    1
         Freq: MS, dtype: int64
         """
-        ids, _, _ = self.grouper.group_info
+        ids, _ = self.grouper.group_info
 
         val = self.obj._values
 
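Note on group_info: throughout the diff it shrinks from a 3-tuple to a
2-tuple, dropping the unused middle element and leaving the per-row group
codes plus the group count. The codes behave like a factorization of the
keys, with -1 marking rows whose group is dropped (e.g. NaN keys under
dropna=True). A rough model via the public API, not the internal code:

    import numpy as np
    import pandas as pd

    codes, uniques = pd.factorize(np.array(["a", None, "b", "a"], dtype=object))
    print(codes)    # [ 0 -1  1  0], where -1 flags the dropped NaN key
    print(uniques)  # ['a' 'b']
    ngroups = len(uniques)
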
@@ -721,7 +720,7 @@ def nunique(self, dropna: bool = True) -> Series | DataFrame:
         if not self.as_index:
             result = self._insert_inaxis_grouper(result)
             result.index = default_index(len(result))
-        return self._reindex_output(result, fill_value=0)
+        return result
 
     @doc(Series.describe)
     def describe(self, percentiles=None, include=None, exclude=None) -> Series:
@@ -749,7 +748,7 @@ def value_counts(
         from pandas.core.reshape.merge import get_join_indexers
         from pandas.core.reshape.tile import cut
 
-        ids, _, _ = self.grouper.group_info
+        ids, _ = self.grouper.group_info
         val = self.obj._values
 
         index_names = self.grouper.names + [self.obj.name]
@@ -819,9 +818,18 @@ def value_counts(
             rep = partial(np.repeat, repeats=np.add.reduceat(inc, idx))
 
         # multi-index components
-        codes = self.grouper.reconstructed_codes
+        if isinstance(self.grouper.result_index, MultiIndex):
+            codes = list(self.grouper.result_index.codes)
+        else:
+            codes = [
+                algorithms.factorize(
+                    self.grouper.result_index,
+                    sort=self.grouper._sort,
+                    use_na_sentinel=self.grouper.dropna,
+                )[0]
+            ]
         codes = [rep(level_codes) for level_codes in codes] + [llab(lab, inc)]
-        levels = [ping.group_index for ping in self.grouper.groupings] + [lev]
+        levels = self.grouper.levels + [lev]
 
         if dropna:
             mask = codes[-1] != -1
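
With result_index always reflecting the final groups, the multi-index codes
now come either directly from a MultiIndex or from factorizing a flat
result_index. What the factorize branch computes, sketched with the public
equivalent of algorithms.factorize:

    import pandas as pd

    lev = pd.Index(["b", "a", "c"])
    codes, _ = pd.factorize(lev, sort=True)
    print(codes)  # [1 0 2], each label's position in the sorted level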
@@ -1686,7 +1694,7 @@ def _wrap_applied_output_series(
         if not self.as_index:
             result = self._insert_inaxis_grouper(result)
 
-        return self._reindex_output(result)
+        return result
 
     def _cython_transform(
         self,
160 changes: 24 additions & 136 deletions pandas/core/groupby/groupby.py
@@ -54,7 +54,6 @@ class providing the base-class of operations.
     NDFrameT,
     PositionalIndexer,
     RandomState,
-    Scalar,
     T,
     npt,
 )
@@ -788,7 +787,7 @@ def __repr__(self) -> str:
 
     @final
     @property
-    def groups(self) -> dict[Hashable, np.ndarray]:
+    def groups(self) -> dict[Hashable, Index]:
         """
         Dict {group name -> group labels}.
 
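The new annotation matches what the property actually returns: each value is
an Index of row labels rather than an ndarray. For example:

    import pandas as pd

    df = pd.DataFrame({"a": [1, 1, 2], "b": [4, 5, 6]})
    print(df.groupby("a").groups[1])
    # Index([0, 1], dtype='int64')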
Expand Down Expand Up @@ -1505,7 +1504,7 @@ def _set_result_index_ordered(
return result

# row order is scrambled => sort the rows by position in original index
original_positions = Index(self.grouper.result_ilocs())
original_positions = Index(self.grouper.result_ilocs)
result = result.set_axis(original_positions, axis=self.axis, copy=False)
result = result.sort_index(axis=self.axis)
if self.grouper.has_dropped_na:
@@ -1599,7 +1598,7 @@ def _wrap_aggregated_output(
         # error: Argument 1 to "_maybe_transpose_result" of "GroupBy" has
         # incompatible type "Union[Series, DataFrame]"; expected "NDFrameT"
         res = self._maybe_transpose_result(result)  # type: ignore[arg-type]
-        return self._reindex_output(res, qs=qs)
+        return res
 
     def _wrap_applied_output(
         self,
@@ -1615,8 +1614,8 @@ def _wrap_applied_output(
 
     @final
     def _numba_prep(self, data: DataFrame):
-        ids, _, ngroups = self.grouper.group_info
-        sorted_index = self.grouper._sort_idx
+        ids, ngroups = self.grouper.group_info
+        sorted_index = self.grouper.result_ilocs
         sorted_ids = self.grouper._sorted_ids
 
         sorted_data = data.take(sorted_index, axis=self.axis).to_numpy()
@@ -1669,7 +1668,7 @@ def _numba_agg_general(
         )
         # Pass group ids to kernel directly if it can handle it
         # (This is faster since it doesn't require a sort)
-        ids, _, _ = self.grouper.group_info
+        ids, _ = self.grouper.group_info
         ngroups = self.grouper.ngroups
 
         res_mgr = df._mgr.apply(
@@ -2043,7 +2042,7 @@ def _wrap_transform_fast_result(self, result: NDFrameT) -> NDFrameT:
         obj = self._obj_with_exclusions
 
         # for each col, reshape to size of original frame by take operation
-        ids, _, _ = self.grouper.group_info
+        ids = self.grouper.result_index_and_codes[1]
         result = result.reindex(self.grouper.result_index, axis=self.axis, copy=False)
 
         if self.obj.ndim == 1:
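
Only the per-row codes (element 1 of the result_index_and_codes pair used
here) are needed to broadcast the one-row-per-group result back to the shape
of the original frame: take rows at each input row's group code. A rough
standalone model, not the internal code:

    import numpy as np
    import pandas as pd

    agg = pd.Series([10, 20])                 # one value per group
    ids = np.array([0, 1, 0], dtype=np.intp)  # group code per original row
    print(agg.to_numpy().take(ids))           # [10 20 10]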
@@ -2096,7 +2095,7 @@ def _cumcount_array(self, ascending: bool = True) -> np.ndarray:
         this is currently implementing sort=False
         (though the default is sort=True) for groupby in general
         """
-        ids, _, ngroups = self.grouper.group_info
+        ids, ngroups = self.grouper.group_info
         sorter = get_group_index_sorter(ids, ngroups)
         ids, count = ids[sorter], len(ids)
 
@@ -2305,7 +2304,7 @@ def count(self) -> NDFrameT:
         Freq: MS, dtype: int64
         """
         data = self._get_data_to_aggregate()
-        ids, _, ngroups = self.grouper.group_info
+        ids, ngroups = self.grouper.group_info
         mask = ids != -1
 
         is_series = data.ndim == 1
@@ -2335,15 +2334,9 @@ def hfunc(bvalues: ArrayLike) -> ArrayLike:
 
         new_mgr = data.grouped_reduce(hfunc)
         new_obj = self._wrap_agged_manager(new_mgr)
+        result = self._wrap_aggregated_output(new_obj)
 
-        # If we are grouping on categoricals we want unobserved categories to
-        # return zero, rather than the default of NaN which the reindexing in
-        # _wrap_aggregated_output() returns. GH 35028
-        # e.g. test_dataframe_groupby_on_2_categoricals_when_observed_is_false
-        with com.temp_setattr(self, "observed", True):
-            result = self._wrap_aggregated_output(new_obj)
-
-        return self._reindex_output(result, fill_value=0)
+        return result
 
     @final
     @Substitution(name="groupby")
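
The temp_setattr(observed=True) workaround and the trailing reindex are gone
because _wrap_aggregated_output now lands on the full result_index directly.
The behavior being preserved (GH 35028): unobserved categories count as 0,
not NaN. Illustration with made-up data:

    import pandas as pd

    df = pd.DataFrame(
        {
            "key": pd.Categorical(["a", "a"], categories=["a", "b"]),
            "val": [1.0, None],
        }
    )
    print(df.groupby("key", observed=False)["val"].count())
    # key
    # a    1
    # b    0
    # Name: val, dtype: int64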
@@ -2820,7 +2813,7 @@ def _value_counts(
             and not grouping._observed
             for grouping in groupings
         ):
-            levels_list = [ping.result_index for ping in groupings]
+            levels_list = gb.grouper.levels
             multi_index, _ = MultiIndex.from_product(
                 levels_list, names=[ping.name for ping in groupings]
             ).sortlevel()
@@ -3043,10 +3036,6 @@ def size(self) -> DataFrame | Series:
                 dtype_backend=dtype_backend,
             )
 
-        with com.temp_setattr(self, "as_index", True):
-            # size already has the desired behavior in GH#49519, but this makes the
-            # as_index=False path of _reindex_output fail on categorical groupers.
-            result = self._reindex_output(result, fill_value=0)
         if not self.as_index:
             # error: Incompatible types in assignment (expression has
             # type "DataFrame", variable has type "Series")
@@ -3124,7 +3113,7 @@ def sum(
                 npfunc=np.sum,
             )
 
-            return self._reindex_output(result, fill_value=0)
+            return result
 
     @final
     @doc(
@@ -3522,7 +3511,7 @@ def ohlc(self) -> DataFrame:
             result = self.obj._constructor_expanddim(
                 res_values, index=self.grouper.result_index, columns=agg_names
            )
-            return self._reindex_output(result)
+            return result
 
         result = self._apply_to_column_groupbys(lambda sgb: sgb.ohlc())
         return result
@@ -3907,7 +3896,7 @@ def _fill(self, direction: Literal["ffill", "bfill"], limit: int | None = None):
         if limit is None:
             limit = -1
 
-        ids, _, _ = self.grouper.group_info
+        ids, _ = self.grouper.group_info
         sorted_labels = np.argsort(ids, kind="mergesort").astype(np.intp, copy=False)
         if direction == "bfill":
             sorted_labels = sorted_labels[::-1]
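
The kind="mergesort" in this context line is load-bearing: a stable argsort
groups rows by group id while keeping the original row order within each
group, which the group-wise ffill/bfill walks over. For instance:

    import numpy as np

    ids = np.array([1, 0, 1, 0])
    print(np.argsort(ids, kind="mergesort"))
    # [1 3 0 2], i.e. group 0's rows in original order, then group 1's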
@@ -4238,7 +4227,7 @@ def _nth(
         if not dropna:
             mask = self._make_mask_from_positional_indexer(n)
 
-            ids, _, _ = self.grouper.group_info
+            ids, _ = self.grouper.group_info
 
             # Drop NA values in grouping
             mask = mask & (ids != -1)
@@ -4449,12 +4438,13 @@ def post_processor(
             qs = np.array([q], dtype=np.float64)
             pass_qs = None
 
-        ids, _, ngroups = self.grouper.group_info
+        ids, ngroups = self.grouper.group_info
+        ids = ids[ids >= 0]
         nqs = len(qs)
 
         func = partial(
             libgroupby.group_quantile,
-            labels=ids,
+            labels=ids[ids >= 0],
             qs=qs,
             interpolation=interpolation,
             starts=starts,
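
The added filtering drops rows belonging to dropped groups (code -1, e.g.
NaN keys) before the quantile kernel sees the labels. In isolation:

    import numpy as np

    ids = np.array([0, -1, 1, 0], dtype=np.intp)
    print(ids[ids >= 0])  # [0 1 0]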
@@ -5169,7 +5159,7 @@ def shift(
         else:
             if fill_value is lib.no_default:
                 fill_value = None
-            ids, _, ngroups = self.grouper.group_info
+            ids, ngroups = self.grouper.group_info
             res_indexer = np.zeros(len(ids), dtype=np.int64)
 
             libgroupby.group_shift_indexer(res_indexer, ids, ngroups, period)
@@ -5516,104 +5506,6 @@ def _mask_selected_obj(self, mask: npt.NDArray[np.bool_]) -> NDFrameT:
         else:
             return self._selected_obj.iloc[:, mask]
 
-    @final
-    def _reindex_output(
-        self,
-        output: OutputFrameOrSeries,
-        fill_value: Scalar = np.nan,
-        qs: npt.NDArray[np.float64] | None = None,
-    ) -> OutputFrameOrSeries:
-        """
-        If we have categorical groupers, then we might want to make sure that
-        we have a fully re-indexed output to the levels. This means expanding
-        the output space to accommodate all values in the cartesian product of
-        our groups, regardless of whether they were observed in the data or
-        not. This will expand the output space if there are missing groups.
-
-        The method returns early without modifying the input if the number of
-        groupings is less than 2, self.observed == True or none of the groupers
-        are categorical.
-
-        Parameters
-        ----------
-        output : Series or DataFrame
-            Object resulting from grouping and applying an operation.
-        fill_value : scalar, default np.nan
-            Value to use for unobserved categories if self.observed is False.
-        qs : np.ndarray[float64] or None, default None
-            quantile values, only relevant for quantile.
-
-        Returns
-        -------
-        Series or DataFrame
-            Object (potentially) re-indexed to include all possible groups.
-        """
-        groupings = self.grouper.groupings
-        if len(groupings) == 1:
-            return output
-
-        # if we only care about the observed values
-        # we are done
-        elif self.observed:
-            return output
-
-        # reindexing only applies to a Categorical grouper
-        elif not any(
-            isinstance(ping.grouping_vector, (Categorical, CategoricalIndex))
-            for ping in groupings
-        ):
-            return output
-
-        levels_list = [ping.group_index for ping in groupings]
-        names = self.grouper.names
-        if qs is not None:
-            # error: Argument 1 to "append" of "list" has incompatible type
-            # "ndarray[Any, dtype[floating[_64Bit]]]"; expected "Index"
-            levels_list.append(qs)  # type: ignore[arg-type]
-            names = names + [None]
-        index = MultiIndex.from_product(levels_list, names=names)
-        if self.sort:
-            index = index.sort_values()
-
-        if self.as_index:
-            # Always holds for SeriesGroupBy unless GH#36507 is implemented
-            d = {
-                self.obj._get_axis_name(self.axis): index,
-                "copy": False,
-                "fill_value": fill_value,
-            }
-            return output.reindex(**d)  # type: ignore[arg-type]
-
-        # GH 13204
-        # Here, the categorical in-axis groupers, which need to be fully
-        # expanded, are columns in `output`. An idea is to do:
-        #     output = output.set_index(self.grouper.names)
-        #                    .reindex(index).reset_index()
-        # but special care has to be taken because of possible not-in-axis
-        # groupers.
-        # So, we manually select and drop the in-axis grouper columns,
-        # reindex `output`, and then reset the in-axis grouper columns.
-
-        # Select in-axis groupers
-        in_axis_grps = [
-            (i, ping.name) for (i, ping) in enumerate(groupings) if ping.in_axis
-        ]
-        if len(in_axis_grps) > 0:
-            g_nums, g_names = zip(*in_axis_grps)
-            output = output.drop(labels=list(g_names), axis=1)
-
-        # Set a temp index and reindex (possibly expanding)
-        output = output.set_index(self.grouper.result_index).reindex(
-            index, copy=False, fill_value=fill_value
-        )
-
-        # Reset in-axis grouper columns
-        # (using level numbers `g_nums` because level names may not be unique)
-        if len(in_axis_grps) > 0:
-            output = output.reset_index(level=g_nums)
-
-        return output.reset_index(drop=True)
-
     @final
     def sample(
         self,
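
For reference, the core of the deleted method, sketched standalone: expand an
aggregated result to the cartesian product of the grouping levels, filling
unobserved combinations. Names and data here are illustrative:

    import pandas as pd

    full = pd.MultiIndex.from_product([["a", "b"], ["x", "y"]], names=["k1", "k2"])
    observed = pd.Series(
        [1.0, 2.0],
        index=pd.MultiIndex.from_tuples([("a", "x"), ("b", "y")], names=["k1", "k2"]),
    )
    print(observed.reindex(full, fill_value=0))
    # k1  k2
    # a   x     1.0
    #     y     0.0
    # b   x     0.0
    #     y     2.0
    # dtype: float64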
@@ -5785,14 +5677,10 @@ def _idxmax_idxmin(
         if not self.observed and any(
             ping._passed_categorical for ping in self.grouper.groupings
         ):
-            expected_len = np.prod(
-                [len(ping.group_index) for ping in self.grouper.groupings]
-            )
-            if len(self.grouper.groupings) == 1:
-                result_len = len(self.grouper.groupings[0].grouping_vector.unique())
-            else:
-                # result_index only contains observed groups in this case
-                result_len = len(self.grouper.result_index)
+            expected_len = len(self.grouper.result_index)
+            # TODO: Better way to find # of observed groups?
+            group_sizes = self.grouper.size()
+            result_len = group_sizes[group_sizes > 0].shape[0]
             assert result_len <= expected_len
             has_unobserved = result_len < expected_len
 
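With result_index now containing every group, observed or not, the expected
length is simply its length, and the observed count comes from the nonzero
group sizes. The counting step in isolation, with illustrative sizes:

    import pandas as pd

    group_sizes = pd.Series([2, 0, 1])  # one size per group in result_index
    result_len = group_sizes[group_sizes > 0].shape[0]
    print(result_len)  # 2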
[Diff for the remaining 13 changed files not shown.]
