Skip to content

Commit

Permalink
Merge branch 'remove_append' into errors_level3
Browse files Browse the repository at this point in the history
  • Loading branch information
caneff committed Sep 12, 2023
2 parents a8cf5d1 + ee7a858 commit fcbbcbc
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 14 deletions.
58 changes: 51 additions & 7 deletions sdks/python/apache_beam/dataframe/frame_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ def wrapper(self, inplace=False, **kwargs):
return wrapper


def args_to_kwargs(base_type):
def args_to_kwargs(base_type, removed_method=False, removed_args=None):
"""Convert all args to kwargs before calling the decorated function.
When applied to a function, this decorator creates a new function
Expand All @@ -484,18 +484,44 @@ def args_to_kwargs(base_type):
determine the name to use for arguments that are converted to keyword
arguments.
For internal use only. No backwards compatibility guarantees."""
removed_method used in cases where a method has been removed in a later
version of Pandas. removed_args used in cases where a method has had
arguments removed in a later version of Pandas.
For internal use only. No backwards compatibility guarantees.
"""
def wrap(func):
arg_names = getfullargspec(unwrap(getattr(base_type, func.__name__))).args
if removed_method:
# Do no processing, let Beam function itself raise the error if called.
return func

removed_arg_names = removed_args if removed_args is not None else []

base_arg_spec = getfullargspec(unwrap(getattr(base_type, func.__name__)))
base_arg_names = base_arg_spec.args
# Some arguments are keyword only and we still want to check against those.
all_possible_base_arg_names = base_arg_names + base_arg_spec.kwonlyargs
beam_arg_names = getfullargspec(func).args

if not_found := (set(beam_arg_names) - set(all_possible_base_arg_names) -
set(removed_arg_names)):
raise TypeError(
f"Beam definition of {func.__name__} has arguments that are not found"
f" in the base version of the function: {not_found}")

@functools.wraps(func)
def wrapper(*args, **kwargs):
for name, value in zip(arg_names, args):
for name, value in zip(base_arg_names, args):
if name in kwargs:
raise TypeError(
"%s() got multiple values for argument '%s'" %
(func.__name__, name))
kwargs[name] = value
# Still have to populate these for the Beam function signature.
if removed_args:
for name in removed_args:
if not name in kwargs:
kwargs[name] = None
return func(**kwargs)

return wrapper
Expand Down Expand Up @@ -524,14 +550,22 @@ def wrapper(*args, **kwargs):
f"**{BEAM_SPECIFIC!r}** for details.")


def with_docs_from(base_type, name=None):
def with_docs_from(base_type: object, name=None, removed_method=False):
"""Decorator that updates the documentation from the wrapped function to
duplicate the documentation from the identically-named method in `base_type`.
Any docstring on the original function will be included in the new function
under a "Differences from pandas" heading.
removed_method used in cases where a method has been removed in a later
version of Pandas.
"""
def wrap(func):
if removed_method:
func.__doc__ = (
"This method has been removed in the current version of Pandas.")
return func

fn_name = name or func.__name__
orig_doc = getattr(base_type, fn_name).__doc__
if orig_doc is None:
Expand Down Expand Up @@ -588,15 +622,23 @@ def format_section(header):
return wrap


def populate_defaults(base_type):
def populate_defaults(base_type, removed_method=False, removed_args=None):
"""Populate default values for keyword arguments in decorated function.
When applied to a function, this decorator creates a new function
with default values for all keyword arguments, based on the default values
for the identically-named method on `base_type`.
For internal use only. No backwards compatibility guarantees."""
removed_method used in cases where a method has been removed in a later
version of Pandas. removed_args used in cases where a method has had
arguments removed in a later version of Pandas.
For internal use only. No backwards compatibility guarantees.
"""
def wrap(func):
if removed_method:
return func

base_argspec = getfullargspec(unwrap(getattr(base_type, func.__name__)))
if not base_argspec.defaults:
return func
Expand All @@ -613,6 +655,8 @@ def wrap(func):
defaults_to_populate = set(
func_argspec.args[:num_non_defaults]).intersection(
arg_to_default.keys())
if removed_args:
defaults_to_populate -= set(removed_args)

@functools.wraps(func)
def wrapper(**kwargs):
Expand Down
23 changes: 22 additions & 1 deletion sdks/python/apache_beam/dataframe/frame_base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,42 @@ class Base(object):
def func(self, a=1, b=2, c=3):
pass

def func_removed_args(self, a):
pass

class Proxy(object):
@frame_base.args_to_kwargs(Base)
@frame_base.populate_defaults(Base)
def func(self, a, c=1000, **kwargs):
return dict(kwargs, a=a, c=c)

@frame_base.args_to_kwargs(Base, removed_method=True)
@frame_base.populate_defaults(Base, removed_method=True)
def func_removed_method(self, a, **kwargs):
return dict(kwargs, a=a)

@frame_base.args_to_kwargs(Base, removed_args=['c'])
@frame_base.populate_defaults(Base, removed_args=['c'])
def func_removed_args(self, a, c, **kwargs):
return dict(kwargs, a=a)

proxy = Proxy()
# pylint: disable=too-many-function-args
# pylint: disable=too-many-function-args,no-value-for-parameter
self.assertEqual(proxy.func(), {'a': 1, 'c': 1000})
self.assertEqual(proxy.func(100), {'a': 100, 'c': 1000})
self.assertEqual(proxy.func(2, 4, 6), {'a': 2, 'b': 4, 'c': 6})
self.assertEqual(proxy.func(2, c=6), {'a': 2, 'c': 6})
self.assertEqual(proxy.func(c=6, a=2), {'a': 2, 'c': 6})
self.assertEqual(proxy.func(c=6), {'a': 1, 'c': 6})

with self.assertRaises(TypeError): # missing 1 required positional argument
proxy.func_removed_method()
self.assertEqual(proxy.func_removed_method(12, c=100), {'a': 12, 'c': 100})

with self.assertRaises(TypeError): # missing 1 required positional argument
proxy.func_removed_args()
self.assertEqual(proxy.func_removed_args(12, d=100), {'a': 12, 'd': 100})


if __name__ == '__main__':
unittest.main()
17 changes: 11 additions & 6 deletions sdks/python/apache_beam/dataframe/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -1339,12 +1339,14 @@ def keys(self):
frame_base.wont_implement_method(
pd.Series, 'shape', reason="non-deferred-result"))

@frame_base.with_docs_from(pd.Series)
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
@frame_base.with_docs_from(pd.Series, removed_method=PD_VERSION >= (2, 0))
@frame_base.args_to_kwargs(pd.Series, removed_method=PD_VERSION >= (2, 0))
@frame_base.populate_defaults(pd.Series, removed_method=PD_VERSION >= (2, 0))
def append(self, to_append, ignore_index, verify_integrity, **kwargs):
"""``ignore_index=True`` is not supported, because it requires generating an
order-sensitive index."""
if PD_VERSION >= (2, 0):
raise frame_base.WontImplementError('append() was removed in Pandas 2.0.')
if not isinstance(to_append, DeferredSeries):
raise frame_base.WontImplementError(
"append() only accepts DeferredSeries instances, received " +
Expand Down Expand Up @@ -2588,12 +2590,15 @@ def align(self, other, join, axis, copy, level, method, **kwargs):
requires_partition_by=requires_partition_by,
preserves_partition_by=partitionings.Arbitrary()))

@frame_base.with_docs_from(pd.DataFrame)
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@frame_base.with_docs_from(pd.DataFrame, removed_method=PD_VERSION >= (2, 0))
@frame_base.args_to_kwargs(pd.DataFrame, removed_method=PD_VERSION >= (2, 0))
@frame_base.populate_defaults(pd.DataFrame,
removed_method=PD_VERSION >= (2, 0))
def append(self, other, ignore_index, verify_integrity, sort, **kwargs):
"""``ignore_index=True`` is not supported, because it requires generating an
order-sensitive index."""
if PD_VERSION >= (2, 0):
raise frame_base.WontImplementError('append() was removed in Pandas 2.0.')
if not isinstance(other, DeferredDataFrame):
raise frame_base.WontImplementError(
"append() only accepts DeferredDataFrame instances, received " +
Expand Down
9 changes: 9 additions & 0 deletions sdks/python/apache_beam/dataframe/frames_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,7 @@ def test_loc(self):
self._run_test(lambda df: df.C.loc[df.A > 10], df)
self._run_test(lambda df, s: df.loc[s.loc[1:3]], df, pd.Series(dates))

@unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0')
def test_append_sort(self):
# yapf: disable
df1 = pd.DataFrame({'int': [1, 2, 3], 'str': ['a', 'b', 'c']},
Expand All @@ -812,6 +813,13 @@ def test_append_sort(self):
self._run_test(lambda df1, df2: df2.append(df1, sort=True), df1, df2)
self._run_test(lambda df1, df2: df2.append(df1, sort=False), df1, df2)

@unittest.skipIf(PD_VERSION < (2, 0), 'append removed in Pandas 2.0')
def test_append_raises_pandas_2(self):
df = pd.DataFrame({'A': [1, 1]})

with self.assertRaises(NotImplementedError):
df.append(df)

def test_smallest_largest(self):
df = pd.DataFrame({'A': [1, 1, 2, 2], 'B': [2, 3, 5, 7]})
self._run_test(lambda df: df.nlargest(1, 'A', keep='all'), df)
Expand Down Expand Up @@ -985,6 +993,7 @@ def test_series_fillna_series_as_value(self):

self._run_test(lambda df, df2: df.A.fillna(df2.A), df, df2)

@unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0')
def test_append_verify_integrity(self):
df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10))
df2 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(9, 19))
Expand Down

0 comments on commit fcbbcbc

Please sign in to comment.