Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove append method for Pandas >= 2.0. #28396

Merged
merged 14 commits into from
Sep 13, 2023
64 changes: 57 additions & 7 deletions sdks/python/apache_beam/dataframe/frame_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ def wrapper(self, inplace=False, **kwargs):
return wrapper


def args_to_kwargs(base_type):
def args_to_kwargs(base_type, removed_method=False, removed_args=None):
"""Convert all args to kwargs before calling the decorated function.

When applied to a function, this decorator creates a new function
Expand All @@ -484,18 +484,47 @@ def args_to_kwargs(base_type):
determine the name to use for arguments that are converted to keyword
arguments.

For internal use only. No backwards compatibility guarantees."""
For internal use only. No backwards compatibility guarantees.

Args:
base_type: The pandas type of the method that this is trying to replicate.
removed_method: Whether this method has been removed in the running
Pandas version.
removed_args: If not empty, which arguments have been dropped in the
running Pandas version.
"""
def wrap(func):
arg_names = getfullargspec(unwrap(getattr(base_type, func.__name__))).args
if removed_method:
# Do no processing, let Beam function itself raise the error if called.
caneff marked this conversation as resolved.
Show resolved Hide resolved
return func

removed_arg_names = removed_args if removed_args is not None else []

base_arg_spec = getfullargspec(unwrap(getattr(base_type, func.__name__)))
base_arg_names = base_arg_spec.args
# Some arguments are keyword only and we still want to check against those.
all_possible_base_arg_names = base_arg_names + base_arg_spec.kwonlyargs
beam_arg_names = getfullargspec(func).args

if not_found := (set(beam_arg_names) - set(all_possible_base_arg_names) -
set(removed_arg_names)):
raise TypeError(
f"Beam definition of {func.__name__} has arguments that are not found"
f" in the base version of the function: {not_found}")

@functools.wraps(func)
def wrapper(*args, **kwargs):
for name, value in zip(arg_names, args):
for name, value in zip(base_arg_names, args):
if name in kwargs:
raise TypeError(
"%s() got multiple values for argument '%s'" %
(func.__name__, name))
kwargs[name] = value
# Still have to populate these for the Beam function signature.
if removed_args:
for name in removed_args:
if not name in kwargs:
kwargs[name] = None
return func(**kwargs)

return wrapper
Expand Down Expand Up @@ -524,14 +553,22 @@ def wrapper(*args, **kwargs):
f"**{BEAM_SPECIFIC!r}** for details.")


def with_docs_from(base_type, name=None):
def with_docs_from(base_type, name=None, removed_method=False):
"""Decorator that updates the documentation from the wrapped function to
duplicate the documentation from the identically-named method in `base_type`.

Any docstring on the original function will be included in the new function
under a "Differences from pandas" heading.

removed_method used in cases where a method has been removed in a later
version of Pandas.
"""
def wrap(func):
if removed_method:
func.__doc__ = (
"This method has been removed in the current version of Pandas.")
return func

fn_name = name or func.__name__
orig_doc = getattr(base_type, fn_name).__doc__
if orig_doc is None:
Expand Down Expand Up @@ -588,15 +625,26 @@ def format_section(header):
return wrap


def populate_defaults(base_type):
def populate_defaults(base_type, removed_method=False, removed_args=None):
"""Populate default values for keyword arguments in decorated function.

When applied to a function, this decorator creates a new function
with default values for all keyword arguments, based on the default values
for the identically-named method on `base_type`.

For internal use only. No backwards compatibility guarantees."""
For internal use only. No backwards compatibility guarantees.

Args:
base_type: The pandas type of the method that this is trying to replicate.
removed_method: Whether this method has been removed in the running
Pandas version.
removed_args: If not empty, which arguments have been dropped in the
running Pandas version.
"""
def wrap(func):
if removed_method:
return func

base_argspec = getfullargspec(unwrap(getattr(base_type, func.__name__)))
if not base_argspec.defaults:
return func
Expand All @@ -613,6 +661,8 @@ def wrap(func):
defaults_to_populate = set(
func_argspec.args[:num_non_defaults]).intersection(
arg_to_default.keys())
if removed_args:
defaults_to_populate -= set(removed_args)

@functools.wraps(func)
def wrapper(**kwargs):
Expand Down
23 changes: 22 additions & 1 deletion sdks/python/apache_beam/dataframe/frame_base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,42 @@ class Base(object):
def func(self, a=1, b=2, c=3):
pass

def func_removed_args(self, a):
pass

class Proxy(object):
@frame_base.args_to_kwargs(Base)
@frame_base.populate_defaults(Base)
def func(self, a, c=1000, **kwargs):
return dict(kwargs, a=a, c=c)

@frame_base.args_to_kwargs(Base, removed_method=True)
@frame_base.populate_defaults(Base, removed_method=True)
def func_removed_method(self, a, **kwargs):
return dict(kwargs, a=a)

@frame_base.args_to_kwargs(Base, removed_args=['c'])
@frame_base.populate_defaults(Base, removed_args=['c'])
def func_removed_args(self, a, c, **kwargs):
return dict(kwargs, a=a)

proxy = Proxy()
# pylint: disable=too-many-function-args
# pylint: disable=too-many-function-args,no-value-for-parameter
self.assertEqual(proxy.func(), {'a': 1, 'c': 1000})
self.assertEqual(proxy.func(100), {'a': 100, 'c': 1000})
self.assertEqual(proxy.func(2, 4, 6), {'a': 2, 'b': 4, 'c': 6})
self.assertEqual(proxy.func(2, c=6), {'a': 2, 'c': 6})
self.assertEqual(proxy.func(c=6, a=2), {'a': 2, 'c': 6})
self.assertEqual(proxy.func(c=6), {'a': 1, 'c': 6})

with self.assertRaises(TypeError): # missing 1 required positional argument
proxy.func_removed_method()
self.assertEqual(proxy.func_removed_method(12, c=100), {'a': 12, 'c': 100})

with self.assertRaises(TypeError): # missing 1 required positional argument
proxy.func_removed_args()
self.assertEqual(proxy.func_removed_args(12, d=100), {'a': 12, 'd': 100})


if __name__ == '__main__':
unittest.main()
17 changes: 11 additions & 6 deletions sdks/python/apache_beam/dataframe/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -1336,12 +1336,14 @@ def keys(self):
frame_base.wont_implement_method(
pd.Series, 'shape', reason="non-deferred-result"))

@frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.with_docs_from(pd.Series, removed_method=PD_VERSION >= (2, 0))
@frame_base.args_to_kwargs(pd.Series, removed_method=PD_VERSION >= (2, 0))
@frame_base.populate_defaults(pd.Series, removed_method=PD_VERSION >= (2, 0))
def append(self, to_append, ignore_index, verify_integrity, **kwargs):
"""``ignore_index=True`` is not supported, because it requires generating an
order-sensitive index."""
if PD_VERSION >= (2, 0):
raise frame_base.WontImplementError('append() was removed in Pandas 2.0.')
if not isinstance(to_append, DeferredSeries):
raise frame_base.WontImplementError(
"append() only accepts DeferredSeries instances, received " +
Expand Down Expand Up @@ -2593,12 +2595,15 @@ def align(self, other, join, axis, copy, level, method, **kwargs):
requires_partition_by=requires_partition_by,
preserves_partition_by=partitionings.Arbitrary()))

@frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.with_docs_from(pd.DataFrame, removed_method=PD_VERSION >= (2, 0))
@frame_base.args_to_kwargs(pd.DataFrame, removed_method=PD_VERSION >= (2, 0))
@frame_base.populate_defaults(pd.DataFrame,
removed_method=PD_VERSION >= (2, 0))
def append(self, other, ignore_index, verify_integrity, sort, **kwargs):
"""``ignore_index=True`` is not supported, because it requires generating an
order-sensitive index."""
if PD_VERSION >= (2, 0):
raise frame_base.WontImplementError('append() was removed in Pandas 2.0.')
if not isinstance(other, DeferredDataFrame):
raise frame_base.WontImplementError(
"append() only accepts DeferredDataFrame instances, received " +
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/dataframe/frames_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,7 @@ def test_loc(self):
self._run_test(lambda df: df.C.loc[df.A > 10], df)
self._run_test(lambda df, s: df.loc[s.loc[1:3]], df, pd.Series(dates))

@unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0')
def test_append_sort(self):
# yapf: disable
df1 = pd.DataFrame({'int': [1, 2, 3], 'str': ['a', 'b', 'c']},
Expand Down Expand Up @@ -985,6 +986,7 @@ def test_series_fillna_series_as_value(self):

self._run_test(lambda df, df2: df.A.fillna(df2.A), df, df2)

@unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0')
def test_append_verify_integrity(self):
df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10))
df2 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(9, 19))
Expand Down
Loading