diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py index 24497f1de069..1da12ececff6 100644 --- a/sdks/python/apache_beam/dataframe/frame_base.py +++ b/sdks/python/apache_beam/dataframe/frame_base.py @@ -475,7 +475,7 @@ def wrapper(self, inplace=False, **kwargs): return wrapper -def args_to_kwargs(base_type): +def args_to_kwargs(base_type, removed_method=False, removed_args=None): """Convert all args to kwargs before calling the decorated function. When applied to a function, this decorator creates a new function @@ -484,18 +484,47 @@ def args_to_kwargs(base_type): determine the name to use for arguments that are converted to keyword arguments. - For internal use only. No backwards compatibility guarantees.""" + For internal use only. No backwards compatibility guarantees. + + Args: + base_type: The pandas type of the method that this is trying to replicate. + removed_method: Whether this method has been removed in the running + Pandas version. + removed_args: If not empty, which arguments have been dropped in the + running Pandas version. + """ def wrap(func): - arg_names = getfullargspec(unwrap(getattr(base_type, func.__name__))).args + if removed_method: + # Do no processing, let Beam function itself raise the error if called. + return func + + removed_arg_names = removed_args if removed_args is not None else [] + + base_arg_spec = getfullargspec(unwrap(getattr(base_type, func.__name__))) + base_arg_names = base_arg_spec.args + # Some arguments are keyword only and we still want to check against those. + all_possible_base_arg_names = base_arg_names + base_arg_spec.kwonlyargs + beam_arg_names = getfullargspec(func).args + + if not_found := (set(beam_arg_names) - set(all_possible_base_arg_names) - + set(removed_arg_names)): + raise TypeError( + f"Beam definition of {func.__name__} has arguments that are not found" + f" in the base version of the function: {not_found}") @functools.wraps(func) def wrapper(*args, **kwargs): - for name, value in zip(arg_names, args): + for name, value in zip(base_arg_names, args): if name in kwargs: raise TypeError( "%s() got multiple values for argument '%s'" % (func.__name__, name)) kwargs[name] = value + # Still have to populate these for the Beam function signature. + if removed_args: + for name in removed_args: + if not name in kwargs: + kwargs[name] = None return func(**kwargs) return wrapper @@ -524,14 +553,22 @@ def wrapper(*args, **kwargs): f"**{BEAM_SPECIFIC!r}** for details.") -def with_docs_from(base_type, name=None): +def with_docs_from(base_type, name=None, removed_method=False): """Decorator that updates the documentation from the wrapped function to duplicate the documentation from the identically-named method in `base_type`. Any docstring on the original function will be included in the new function under a "Differences from pandas" heading. + + removed_method used in cases where a method has been removed in a later + version of Pandas. """ def wrap(func): + if removed_method: + func.__doc__ = ( + "This method has been removed in the current version of Pandas.") + return func + fn_name = name or func.__name__ orig_doc = getattr(base_type, fn_name).__doc__ if orig_doc is None: @@ -588,15 +625,26 @@ def format_section(header): return wrap -def populate_defaults(base_type): +def populate_defaults(base_type, removed_method=False, removed_args=None): """Populate default values for keyword arguments in decorated function. When applied to a function, this decorator creates a new function with default values for all keyword arguments, based on the default values for the identically-named method on `base_type`. - For internal use only. No backwards compatibility guarantees.""" + For internal use only. No backwards compatibility guarantees. + + Args: + base_type: The pandas type of the method that this is trying to replicate. + removed_method: Whether this method has been removed in the running + Pandas version. + removed_args: If not empty, which arguments have been dropped in the + running Pandas version. + """ def wrap(func): + if removed_method: + return func + base_argspec = getfullargspec(unwrap(getattr(base_type, func.__name__))) if not base_argspec.defaults: return func @@ -613,6 +661,8 @@ def wrap(func): defaults_to_populate = set( func_argspec.args[:num_non_defaults]).intersection( arg_to_default.keys()) + if removed_args: + defaults_to_populate -= set(removed_args) @functools.wraps(func) def wrapper(**kwargs): diff --git a/sdks/python/apache_beam/dataframe/frame_base_test.py b/sdks/python/apache_beam/dataframe/frame_base_test.py index 82d5b65e1a49..2d16d02ba1ea 100644 --- a/sdks/python/apache_beam/dataframe/frame_base_test.py +++ b/sdks/python/apache_beam/dataframe/frame_base_test.py @@ -93,14 +93,27 @@ class Base(object): def func(self, a=1, b=2, c=3): pass + def func_removed_args(self, a): + pass + class Proxy(object): @frame_base.args_to_kwargs(Base) @frame_base.populate_defaults(Base) def func(self, a, c=1000, **kwargs): return dict(kwargs, a=a, c=c) + @frame_base.args_to_kwargs(Base, removed_method=True) + @frame_base.populate_defaults(Base, removed_method=True) + def func_removed_method(self, a, **kwargs): + return dict(kwargs, a=a) + + @frame_base.args_to_kwargs(Base, removed_args=['c']) + @frame_base.populate_defaults(Base, removed_args=['c']) + def func_removed_args(self, a, c, **kwargs): + return dict(kwargs, a=a) + proxy = Proxy() - # pylint: disable=too-many-function-args + # pylint: disable=too-many-function-args,no-value-for-parameter self.assertEqual(proxy.func(), {'a': 1, 'c': 1000}) self.assertEqual(proxy.func(100), {'a': 100, 'c': 1000}) self.assertEqual(proxy.func(2, 4, 6), {'a': 2, 'b': 4, 'c': 6}) @@ -108,6 +121,14 @@ def func(self, a, c=1000, **kwargs): self.assertEqual(proxy.func(c=6, a=2), {'a': 2, 'c': 6}) self.assertEqual(proxy.func(c=6), {'a': 1, 'c': 6}) + with self.assertRaises(TypeError): # missing 1 required positional argument + proxy.func_removed_method() + self.assertEqual(proxy.func_removed_method(12, c=100), {'a': 12, 'c': 100}) + + with self.assertRaises(TypeError): # missing 1 required positional argument + proxy.func_removed_args() + self.assertEqual(proxy.func_removed_args(12, d=100), {'a': 12, 'd': 100}) + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 0d9b22ae3f9e..6c82b3b258cd 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -1336,12 +1336,14 @@ def keys(self): frame_base.wont_implement_method( pd.Series, 'shape', reason="non-deferred-result")) - @frame_base.with_docs_from(pd.Series) - @frame_base.args_to_kwargs(pd.Series) - @frame_base.populate_defaults(pd.Series) + @frame_base.with_docs_from(pd.Series, removed_method=PD_VERSION >= (2, 0)) + @frame_base.args_to_kwargs(pd.Series, removed_method=PD_VERSION >= (2, 0)) + @frame_base.populate_defaults(pd.Series, removed_method=PD_VERSION >= (2, 0)) def append(self, to_append, ignore_index, verify_integrity, **kwargs): """``ignore_index=True`` is not supported, because it requires generating an order-sensitive index.""" + if PD_VERSION >= (2, 0): + raise frame_base.WontImplementError('append() was removed in Pandas 2.0.') if not isinstance(to_append, DeferredSeries): raise frame_base.WontImplementError( "append() only accepts DeferredSeries instances, received " + @@ -2593,12 +2595,15 @@ def align(self, other, join, axis, copy, level, method, **kwargs): requires_partition_by=requires_partition_by, preserves_partition_by=partitionings.Arbitrary())) - @frame_base.with_docs_from(pd.DataFrame) - @frame_base.args_to_kwargs(pd.DataFrame) - @frame_base.populate_defaults(pd.DataFrame) + @frame_base.with_docs_from(pd.DataFrame, removed_method=PD_VERSION >= (2, 0)) + @frame_base.args_to_kwargs(pd.DataFrame, removed_method=PD_VERSION >= (2, 0)) + @frame_base.populate_defaults(pd.DataFrame, + removed_method=PD_VERSION >= (2, 0)) def append(self, other, ignore_index, verify_integrity, sort, **kwargs): """``ignore_index=True`` is not supported, because it requires generating an order-sensitive index.""" + if PD_VERSION >= (2, 0): + raise frame_base.WontImplementError('append() was removed in Pandas 2.0.') if not isinstance(other, DeferredDataFrame): raise frame_base.WontImplementError( "append() only accepts DeferredDataFrame instances, received " + diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index fa121aa85c30..69e7f4b19d6f 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -797,6 +797,7 @@ def test_loc(self): self._run_test(lambda df: df.C.loc[df.A > 10], df) self._run_test(lambda df, s: df.loc[s.loc[1:3]], df, pd.Series(dates)) + @unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0') def test_append_sort(self): # yapf: disable df1 = pd.DataFrame({'int': [1, 2, 3], 'str': ['a', 'b', 'c']}, @@ -985,6 +986,7 @@ def test_series_fillna_series_as_value(self): self._run_test(lambda df, df2: df.A.fillna(df2.A), df, df2) + @unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0') def test_append_verify_integrity(self): df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10)) df2 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(9, 19))