Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove level and errors arguments for Pandas 2 compatibility. #28375

Merged
merged 23 commits on Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 57 additions & 7 deletions sdks/python/apache_beam/dataframe/frame_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ def wrapper(self, inplace=False, **kwargs):
return wrapper


def args_to_kwargs(base_type):
def args_to_kwargs(base_type, removed_method=False, removed_args=None):
"""Convert all args to kwargs before calling the decorated function.

When applied to a function, this decorator creates a new function
Expand All @@ -484,18 +484,47 @@ def args_to_kwargs(base_type):
determine the name to use for arguments that are converted to keyword
arguments.

For internal use only. No backwards compatibility guarantees."""
For internal use only. No backwards compatibility guarantees.

Args:
base_type: The pandas type of the method that this is trying to replicate.
removed_method: Whether this method has been removed in the running
Pandas version.
removed_args: If not empty, which arguments have been dropped in the
running Pandas version.
"""
def wrap(func):
arg_names = getfullargspec(unwrap(getattr(base_type, func.__name__))).args
if removed_method:
# Do no processing, let Beam function itself raise the error if called.
return func

removed_arg_names = removed_args if removed_args is not None else []

base_arg_spec = getfullargspec(unwrap(getattr(base_type, func.__name__)))
base_arg_names = base_arg_spec.args
# Some arguments are keyword only and we still want to check against those.
all_possible_base_arg_names = base_arg_names + base_arg_spec.kwonlyargs
beam_arg_names = getfullargspec(func).args

if not_found := (set(beam_arg_names) - set(all_possible_base_arg_names) -
set(removed_arg_names)):
raise TypeError(
f"Beam definition of {func.__name__} has arguments that are not found"
f" in the base version of the function: {not_found}")

@functools.wraps(func)
def wrapper(*args, **kwargs):
for name, value in zip(arg_names, args):
for name, value in zip(base_arg_names, args):
if name in kwargs:
raise TypeError(
"%s() got multiple values for argument '%s'" %
(func.__name__, name))
kwargs[name] = value
# Still have to populate these for the Beam function signature.
if removed_args:
for name in removed_args:
if not name in kwargs:
kwargs[name] = None
return func(**kwargs)

return wrapper
Expand Down Expand Up @@ -524,14 +553,22 @@ def wrapper(*args, **kwargs):
f"**{BEAM_SPECIFIC!r}** for details.")


def with_docs_from(base_type, name=None):
def with_docs_from(base_type, name=None, removed_method=False):
"""Decorator that updates the documentation from the wrapped function to
duplicate the documentation from the identically-named method in `base_type`.

Any docstring on the original function will be included in the new function
under a "Differences from pandas" heading.

removed_method used in cases where a method has been removed in a later
version of Pandas.
"""
def wrap(func):
if removed_method:
func.__doc__ = (
"This method has been removed in the current version of Pandas.")
return func

fn_name = name or func.__name__
orig_doc = getattr(base_type, fn_name).__doc__
if orig_doc is None:
Expand Down Expand Up @@ -588,15 +625,26 @@ def format_section(header):
return wrap


def populate_defaults(base_type):
def populate_defaults(base_type, removed_method=False, removed_args=None):
"""Populate default values for keyword arguments in decorated function.

When applied to a function, this decorator creates a new function
with default values for all keyword arguments, based on the default values
for the identically-named method on `base_type`.

For internal use only. No backwards compatibility guarantees."""
For internal use only. No backwards compatibility guarantees.

Args:
base_type: The pandas type of the method that this is trying to replicate.
removed_method: Whether this method has been removed in the running
Pandas version.
removed_args: If not empty, which arguments have been dropped in the
running Pandas version.
"""
def wrap(func):
if removed_method:
return func

base_argspec = getfullargspec(unwrap(getattr(base_type, func.__name__)))
if not base_argspec.defaults:
return func
Expand All @@ -613,6 +661,8 @@ def wrap(func):
defaults_to_populate = set(
func_argspec.args[:num_non_defaults]).intersection(
arg_to_default.keys())
if removed_args:
defaults_to_populate -= set(removed_args)

@functools.wraps(func)
def wrapper(**kwargs):
Expand Down
23 changes: 22 additions & 1 deletion sdks/python/apache_beam/dataframe/frame_base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,42 @@ class Base(object):
def func(self, a=1, b=2, c=3):
pass

def func_removed_args(self, a):
pass

class Proxy(object):
@frame_base.args_to_kwargs(Base)
@frame_base.populate_defaults(Base)
def func(self, a, c=1000, **kwargs):
return dict(kwargs, a=a, c=c)

@frame_base.args_to_kwargs(Base, removed_method=True)
@frame_base.populate_defaults(Base, removed_method=True)
def func_removed_method(self, a, **kwargs):
return dict(kwargs, a=a)

@frame_base.args_to_kwargs(Base, removed_args=['c'])
@frame_base.populate_defaults(Base, removed_args=['c'])
def func_removed_args(self, a, c, **kwargs):
return dict(kwargs, a=a)

proxy = Proxy()
# pylint: disable=too-many-function-args
# pylint: disable=too-many-function-args,no-value-for-parameter
self.assertEqual(proxy.func(), {'a': 1, 'c': 1000})
self.assertEqual(proxy.func(100), {'a': 100, 'c': 1000})
self.assertEqual(proxy.func(2, 4, 6), {'a': 2, 'b': 4, 'c': 6})
self.assertEqual(proxy.func(2, c=6), {'a': 2, 'c': 6})
self.assertEqual(proxy.func(c=6, a=2), {'a': 2, 'c': 6})
self.assertEqual(proxy.func(c=6), {'a': 1, 'c': 6})

with self.assertRaises(TypeError): # missing 1 required positional argument
proxy.func_removed_method()
self.assertEqual(proxy.func_removed_method(12, c=100), {'a': 12, 'c': 100})

with self.assertRaises(TypeError): # missing 1 required positional argument
proxy.func_removed_args()
self.assertEqual(proxy.func_removed_args(12, d=100), {'a': 12, 'd': 100})


if __name__ == '__main__':
unittest.main()
75 changes: 42 additions & 33 deletions sdks/python/apache_beam/dataframe/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,8 +914,10 @@ def sort_index(self, axis, **kwargs):
))

@frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.args_to_kwargs(
pd.DataFrame, removed_args=["errors"] if PD_VERSION >= (2, 0) else None)
@frame_base.populate_defaults(
pd.DataFrame, removed_args=["errors"] if PD_VERSION >= (2, 0) else None)
@frame_base.maybe_inplace
def where(self, cond, other, errors, **kwargs):
"""where is not parallelizable when ``errors="ignore"`` is specified."""
Expand All @@ -937,16 +939,19 @@ def where(self, cond, other, errors, **kwargs):
else:
actual_args['other'] = other

if errors == "ignore":
# We need all data in order to ignore errors and propagate the original
# data.
requires = partitionings.Singleton(
reason=(
f"where(errors={errors!r}) is currently not parallelizable, "
"because all data must be collected on one node to determine if "
"the original data should be propagated instead."))
# For Pandas 2.0, errors was removed as an argument.
if PD_VERSION < (2, 0):
if "errors" in kwargs and kwargs['errors'] == "ignore":
# We need all data in order to ignore errors and propagate the original
# data.
requires = partitionings.Singleton(
reason=(
f"where(errors={kwargs['errors']!r}) is currently not "
"parallelizable, because all data must be collected on one "
"node to determine if the original data should be propagated "
"instead."))

actual_args['errors'] = errors
actual_args['errors'] = kwargs['errors'] if 'errors' in kwargs else None

def where_execution(df, *args):
runtime_values = {
Expand Down Expand Up @@ -1336,12 +1341,14 @@ def keys(self):
frame_base.wont_implement_method(
pd.Series, 'shape', reason="non-deferred-result"))

@frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.with_docs_from(pd.Series, removed_method=PD_VERSION >= (2, 0))
@frame_base.args_to_kwargs(pd.Series, removed_method=PD_VERSION >= (2, 0))
@frame_base.populate_defaults(pd.Series, removed_method=PD_VERSION >= (2, 0))
def append(self, to_append, ignore_index, verify_integrity, **kwargs):
"""``ignore_index=True`` is not supported, because it requires generating an
order-sensitive index."""
if PD_VERSION >= (2, 0):
raise frame_base.WontImplementError('append() was removed in Pandas 2.0.')
if not isinstance(to_append, DeferredSeries):
raise frame_base.WontImplementError(
"append() only accepts DeferredSeries instances, received " +
Expand Down Expand Up @@ -1603,14 +1610,11 @@ def mean(self, skipna, **kwargs):
return self.sum(skipna=skipna, **kwargs) / size

@frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.args_to_kwargs(
pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None)
@frame_base.populate_defaults(
pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None)
def var(self, axis, skipna, level, ddof, **kwargs):
"""Per-level aggregation is not yet supported
(https://github.com/apache/beam/issues/21829). Only the default,
``level=None``, is allowed."""
if level is not None:
raise NotImplementedError("per-level aggregation")
if skipna is None or skipna:
self = self.dropna() # pylint: disable=self-cls-assignment

Expand Down Expand Up @@ -1678,11 +1682,11 @@ def corr(self, other, method, min_periods):
requires_partition_by=partitionings.Singleton(reason=reason)))

@frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.args_to_kwargs(
pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None)
@frame_base.populate_defaults(
pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None)
def skew(self, axis, skipna, level, numeric_only, **kwargs):
if level is not None:
raise NotImplementedError("per-level aggregation")
if skipna is None or skipna:
self = self.dropna() # pylint: disable=self-cls-assignment
# See the online, numerically stable formulae at
Expand Down Expand Up @@ -1742,11 +1746,11 @@ def combine_moments(data):
requires_partition_by=partitionings.Singleton()))

@frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.args_to_kwargs(
pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None)
@frame_base.populate_defaults(
pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None)
def kurtosis(self, axis, skipna, level, numeric_only, **kwargs):
if level is not None:
raise NotImplementedError("per-level aggregation")
if skipna is None or skipna:
self = self.dropna() # pylint: disable=self-cls-assignment

Expand Down Expand Up @@ -2576,7 +2580,8 @@ def align(self, other, join, axis, copy, level, method, **kwargs):
if kwargs:
raise NotImplementedError('align(%s)' % ', '.join(kwargs.keys()))

if level is not None:
# In Pandas 2.0, all aggregations lost the level keyword.
if PD_VERSION < (2, 0) and level is not None:
# Could probably get by partitioning on the used levels.
requires_partition_by = partitionings.Singleton(reason=(
f"align(level={level}) is not currently parallelizable. Only "
Expand All @@ -2593,12 +2598,15 @@ def align(self, other, join, axis, copy, level, method, **kwargs):
requires_partition_by=requires_partition_by,
preserves_partition_by=partitionings.Arbitrary()))

@frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.with_docs_from(pd.DataFrame, removed_method=PD_VERSION >= (2, 0))
@frame_base.args_to_kwargs(pd.DataFrame, removed_method=PD_VERSION >= (2, 0))
@frame_base.populate_defaults(pd.DataFrame,
removed_method=PD_VERSION >= (2, 0))
def append(self, other, ignore_index, verify_integrity, sort, **kwargs):
"""``ignore_index=True`` is not supported, because it requires generating an
order-sensitive index."""
if PD_VERSION >= (2, 0):
raise frame_base.WontImplementError('append() was removed in Pandas 2.0.')
if not isinstance(other, DeferredDataFrame):
raise frame_base.WontImplementError(
"append() only accepts DeferredDataFrame instances, received " +
Expand Down Expand Up @@ -5388,6 +5396,7 @@ def func(df, *args, **kwargs):
name,
frame_base._elementwise_method(name, restrictions={'level': None},
base=pd.Series))

if hasattr(pd.DataFrame, name):
setattr(
DeferredDataFrame,
Expand Down
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/dataframe/frames_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,7 @@ def test_loc(self):
self._run_test(lambda df: df.C.loc[df.A > 10], df)
self._run_test(lambda df, s: df.loc[s.loc[1:3]], df, pd.Series(dates))

@unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0')
def test_append_sort(self):
# yapf: disable
df1 = pd.DataFrame({'int': [1, 2, 3], 'str': ['a', 'b', 'c']},
Expand Down Expand Up @@ -985,6 +986,7 @@ def test_series_fillna_series_as_value(self):

self._run_test(lambda df, df2: df.A.fillna(df2.A), df, df2)

@unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0')
def test_append_verify_integrity(self):
df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10))
df2 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(9, 19))
Expand Down Expand Up @@ -2042,6 +2044,7 @@ def test_dataframe_agg_modes(self):
self._run_test(lambda df: df.agg({'A': ['sum', 'mean']}), df)
self._run_test(lambda df: df.agg({'A': ['sum', 'mean'], 'B': 'min'}), df)

@unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2")
def test_series_agg_level(self):
self._run_test(
lambda df: df.set_index(['group', 'foo']).bar.count(level=0),
Expand All @@ -2065,6 +2068,7 @@ def test_series_agg_level(self):
lambda df: df.set_index(['group', 'foo']).bar.median(level=1),
GROUPBY_DF)

@unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2")
def test_dataframe_agg_level(self):
self._run_test(
lambda df: df.set_index(['group', 'foo']).count(level=0), GROUPBY_DF)
Expand Down Expand Up @@ -2232,6 +2236,7 @@ def test_df_agg_method_invalid_kwarg_raises(self):
self._run_error_test(
lambda df: df.median(min_count=3, numeric_only=True), GROUPBY_DF)

@unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2")
def test_agg_min_count(self):
df = pd.DataFrame({
'good': [1, 2, 3, np.nan],
Expand Down
Loading