Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove level and errors arguments for Pandas 2 compatibility. #28375

Merged
merged 23 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 19 additions & 23 deletions sdks/python/apache_beam/dataframe/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ def sort_index(self, axis, **kwargs):
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.maybe_inplace
def where(self, cond, other, errors, **kwargs):
def where(self, cond, other, **kwargs):
caneff marked this conversation as resolved.
Show resolved Hide resolved
"""where is not parallelizable when ``errors="ignore"`` is specified."""
requires = partitionings.Arbitrary()
deferred_args = {}
Expand All @@ -937,16 +937,19 @@ def where(self, cond, other, errors, **kwargs):
else:
actual_args['other'] = other

if errors == "ignore":
# We need all data in order to ignore errors and propagate the original
# data.
requires = partitionings.Singleton(
reason=(
f"where(errors={errors!r}) is currently not parallelizable, "
"because all data must be collected on one node to determine if "
"the original data should be propagated instead."))
# For Pandas 2.0, errors was removed as an argument.
if PD_VERSION < (2, 0):
if "errors" in kwargs and kwargs['errors'] == "ignore":
# We need all data in order to ignore errors and propagate the original
# data.
requires = partitionings.Singleton(
reason=(
f"where(errors={kwargs['errors']!r}) is currently not "
"parallelizable, because all data must be collected on one "
"node to determine if the original data should be propagated "
"instead."))

actual_args['errors'] = errors
actual_args['errors'] = kwargs['errors'] if 'errors' in kwargs else None

def where_execution(df, *args):
runtime_values = {
Expand Down Expand Up @@ -1605,12 +1608,7 @@ def mean(self, skipna, **kwargs):
@frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
def var(self, axis, skipna, level, ddof, **kwargs):
"""Per-level aggregation is not yet supported
(https://github.com/apache/beam/issues/21829). Only the default,
``level=None``, is allowed."""
if level is not None:
raise NotImplementedError("per-level aggregation")
def var(self, axis, skipna, ddof, **kwargs):
if skipna is None or skipna:
self = self.dropna() # pylint: disable=self-cls-assignment

Expand Down Expand Up @@ -1680,9 +1678,7 @@ def corr(self, other, method, min_periods):
@frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
def skew(self, axis, skipna, level, numeric_only, **kwargs):
if level is not None:
raise NotImplementedError("per-level aggregation")
def skew(self, axis, skipna, numeric_only, **kwargs):
if skipna is None or skipna:
self = self.dropna() # pylint: disable=self-cls-assignment
# See the online, numerically stable formulae at
Expand Down Expand Up @@ -1744,9 +1740,7 @@ def combine_moments(data):
@frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
def kurtosis(self, axis, skipna, level, numeric_only, **kwargs):
if level is not None:
raise NotImplementedError("per-level aggregation")
def kurtosis(self, axis, skipna, numeric_only, **kwargs):
if skipna is None or skipna:
self = self.dropna() # pylint: disable=self-cls-assignment

Expand Down Expand Up @@ -2576,7 +2570,8 @@ def align(self, other, join, axis, copy, level, method, **kwargs):
if kwargs:
raise NotImplementedError('align(%s)' % ', '.join(kwargs.keys()))

if level is not None:
# In Pandas 2.0, all aggregations lost the level keyword.
if PD_VERSION < (2, 0) and level is not None:
# Could probably get by partitioning on the used levels.
requires_partition_by = partitionings.Singleton(reason=(
f"align(level={level}) is not currently parallelizable. Only "
Expand Down Expand Up @@ -5388,6 +5383,7 @@ def func(df, *args, **kwargs):
name,
frame_base._elementwise_method(name, restrictions={'level': None},
base=pd.Series))

if hasattr(pd.DataFrame, name):
setattr(
DeferredDataFrame,
Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/dataframe/frames_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2042,6 +2042,7 @@ def test_dataframe_agg_modes(self):
self._run_test(lambda df: df.agg({'A': ['sum', 'mean']}), df)
self._run_test(lambda df: df.agg({'A': ['sum', 'mean'], 'B': 'min'}), df)

@unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2")
def test_series_agg_level(self):
self._run_test(
lambda df: df.set_index(['group', 'foo']).bar.count(level=0),
Expand All @@ -2065,6 +2066,7 @@ def test_series_agg_level(self):
lambda df: df.set_index(['group', 'foo']).bar.median(level=1),
GROUPBY_DF)

@unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2")
def test_dataframe_agg_level(self):
self._run_test(
lambda df: df.set_index(['group', 'foo']).count(level=0), GROUPBY_DF)
Expand Down Expand Up @@ -2232,6 +2234,7 @@ def test_df_agg_method_invalid_kwarg_raises(self):
self._run_error_test(
lambda df: df.median(min_count=3, numeric_only=True), GROUPBY_DF)

@unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2")
def test_agg_min_count(self):
df = pd.DataFrame({
'good': [1, 2, 3, np.nan],
Expand Down