From b004bc8db9c6b4b64d57559b403c47ad3b49d1cc Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Mon, 30 Sep 2024 10:05:07 +0700 Subject: [PATCH 1/9] fix bug all arg add as inputs --- sdks/python/apache_beam/dataframe/transforms.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py index 852b49c4e2ed..d05f6f28366f 100644 --- a/sdks/python/apache_beam/dataframe/transforms.py +++ b/sdks/python/apache_beam/dataframe/transforms.py @@ -395,7 +395,8 @@ def expr_to_stages(expr): if stage is None: # No stage available, compute this expression as part of a new stage. - stage = Stage(expr.args(), expr.requires_partition_by()) + stage = Stage([arg for arg in expr.args() if arg in inputs], + expr.requires_partition_by()) for arg in expr.args(): # For each argument, declare that it is also available in # this new stage. From 2a5736c8b4af8ffcac6336a79f759f73da67dad1 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Mon, 30 Sep 2024 11:26:47 +0700 Subject: [PATCH 2/9] fix bug for fillna --- sdks/python/apache_beam/dataframe/frames.py | 14 ++++++++++++-- sdks/python/apache_beam/dataframe/transforms.py | 3 +-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 421430ec972c..6dea75eb0e90 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -316,9 +316,19 @@ def __init__(self, value): else: # Default case, pass value through as a constant, no particular # partitioning requirement - value_expr = expressions.ConstantExpression(value) - get_value = lambda x: x + def func_elementwise(df): + df = df.copy() + df.fillna(value=value, **kwargs) + return df + requires = partitionings.Arbitrary() + return frame_base.DeferredFrame.wrap( + # yapf: disable + expressions.ComputedExpression( + 'fillna', + func_elementwise, [self._expr], + preserves_partition_by=partitionings.Arbitrary(), + requires_partition_by=requires)) return frame_base.DeferredFrame.wrap( # yapf: disable diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py index d05f6f28366f..852b49c4e2ed 100644 --- a/sdks/python/apache_beam/dataframe/transforms.py +++ b/sdks/python/apache_beam/dataframe/transforms.py @@ -395,8 +395,7 @@ def expr_to_stages(expr): if stage is None: # No stage available, compute this expression as part of a new stage. - stage = Stage([arg for arg in expr.args() if arg in inputs], - expr.requires_partition_by()) + stage = Stage(expr.args(), expr.requires_partition_by()) for arg in expr.args(): # For each argument, declare that it is also available in # this new stage. From d2a8470161fb9340263a876cd7a5b895dec8b2b6 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Mon, 30 Sep 2024 12:17:48 +0700 Subject: [PATCH 3/9] Revert "fix bug for fillna" This reverts commit 2a5736c8b4af8ffcac6336a79f759f73da67dad1. --- sdks/python/apache_beam/dataframe/frames.py | 14 ++------------ sdks/python/apache_beam/dataframe/transforms.py | 3 ++- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 6dea75eb0e90..421430ec972c 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -316,19 +316,9 @@ def __init__(self, value): else: # Default case, pass value through as a constant, no particular # partitioning requirement - def func_elementwise(df): - df = df.copy() - df.fillna(value=value, **kwargs) - return df - + value_expr = expressions.ConstantExpression(value) + get_value = lambda x: x requires = partitionings.Arbitrary() - return frame_base.DeferredFrame.wrap( - # yapf: disable - expressions.ComputedExpression( - 'fillna', - func_elementwise, [self._expr], - preserves_partition_by=partitionings.Arbitrary(), - requires_partition_by=requires)) return frame_base.DeferredFrame.wrap( # yapf: disable diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py index 852b49c4e2ed..d05f6f28366f 100644 --- a/sdks/python/apache_beam/dataframe/transforms.py +++ b/sdks/python/apache_beam/dataframe/transforms.py @@ -395,7 +395,8 @@ def expr_to_stages(expr): if stage is None: # No stage available, compute this expression as part of a new stage. - stage = Stage(expr.args(), expr.requires_partition_by()) + stage = Stage([arg for arg in expr.args() if arg in inputs], + expr.requires_partition_by()) for arg in expr.args(): # For each argument, declare that it is also available in # this new stage. From 0cb0c06d8b92b500176a7dec85a7afbcf19f3f42 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Mon, 30 Sep 2024 12:22:47 +0700 Subject: [PATCH 4/9] fix bug for fillna --- sdks/python/apache_beam/dataframe/transforms.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py index d05f6f28366f..d0b5be4eb2a9 100644 --- a/sdks/python/apache_beam/dataframe/transforms.py +++ b/sdks/python/apache_beam/dataframe/transforms.py @@ -395,7 +395,10 @@ def expr_to_stages(expr): if stage is None: # No stage available, compute this expression as part of a new stage. - stage = Stage([arg for arg in expr.args() if arg in inputs], + stage = Stage([ + arg for arg in expr.args() + if not isinstance(arg, expressions.ConstantExpression) + ], expr.requires_partition_by()) for arg in expr.args(): # For each argument, declare that it is also available in From 62bfbfb56e6128e5f5ccdae2b95cc4353b4f1ede Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Mon, 30 Sep 2024 23:11:11 +0700 Subject: [PATCH 5/9] add test for fillna a column --- sdks/python/apache_beam/dataframe/frames_test.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index 076ab504adde..b49abd1eb24a 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -1009,6 +1009,13 @@ def test_dataframe_fillna_dataframe_as_value(self): self._run_test(lambda df, df2: df.fillna(df2), df, df2) + def test_dataframe_fillna_dataframe_column_as_value(self): + df = pd.DataFrame([[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1], + [np.nan, np.nan, np.nan, 5], [np.nan, 3, np.nan, 4]], + columns=list("ABCD")) + + self._run_test(lambda df, s: df['A'].fillna(s), df, s) + def test_dataframe_fillna_series_as_value(self): df = pd.DataFrame([[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1], [np.nan, np.nan, np.nan, 5], [np.nan, 3, np.nan, 4]], From e7e86e532b76855cc060b6a6603e658ec46ec276 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Mon, 30 Sep 2024 23:32:31 +0700 Subject: [PATCH 6/9] add test for fillna a column --- sdks/python/apache_beam/dataframe/frames_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index b49abd1eb24a..95c33fac05ed 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -1009,12 +1009,12 @@ def test_dataframe_fillna_dataframe_as_value(self): self._run_test(lambda df, df2: df.fillna(df2), df, df2) - def test_dataframe_fillna_dataframe_column_as_value(self): + def test_dataframe_column_fillna_constant_as_value(self): df = pd.DataFrame([[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1], [np.nan, np.nan, np.nan, 5], [np.nan, 3, np.nan, 4]], columns=list("ABCD")) - self._run_test(lambda df, s: df['A'].fillna(s), df, s) + self._run_test(lambda df: df['A'].fillna(0), df) def test_dataframe_fillna_series_as_value(self): df = pd.DataFrame([[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1], From 8f375e4abf4bafb2508e9b40a194edb40c1675a4 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Tue, 1 Oct 2024 06:35:40 +0700 Subject: [PATCH 7/9] add test for fillna a column --- sdks/python/apache_beam/dataframe/transforms_test.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py index a143606cc913..c2a5ca12e163 100644 --- a/sdks/python/apache_beam/dataframe/transforms_test.py +++ b/sdks/python/apache_beam/dataframe/transforms_test.py @@ -354,6 +354,16 @@ def test_rename(self): 0: 2, 2: 0 }, errors='raise')) + def test_dataframe_column_fillna_constant_as_value(self): + df = pd.DataFrame({'A': (1, "NAN", 1), 'B': (1, 1, 1)}) + + def column_fillna_constant_as_value_function(df): + df['A'] = df['A'].fillna(0) + return df + + self.run_scenario( + df, lambda df: column_fillna_constant_as_value_function(df)) + class FusionTest(unittest.TestCase): @staticmethod From 86bc40146075361747bef64499320366e1cc15d7 Mon Sep 17 00:00:00 2001 From: DKPHUONG Date: Tue, 1 Oct 2024 06:37:02 +0700 Subject: [PATCH 8/9] revert add test to frames_test --- sdks/python/apache_beam/dataframe/frames_test.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index 95c33fac05ed..076ab504adde 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -1009,13 +1009,6 @@ def test_dataframe_fillna_dataframe_as_value(self): self._run_test(lambda df, df2: df.fillna(df2), df, df2) - def test_dataframe_column_fillna_constant_as_value(self): - df = pd.DataFrame([[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1], - [np.nan, np.nan, np.nan, 5], [np.nan, 3, np.nan, 4]], - columns=list("ABCD")) - - self._run_test(lambda df: df['A'].fillna(0), df) - def test_dataframe_fillna_series_as_value(self): df = pd.DataFrame([[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1], [np.nan, np.nan, np.nan, 5], [np.nan, 3, np.nan, 4]], From 568f1ce64c09a0c2183c1458434bde7e471ed8a5 Mon Sep 17 00:00:00 2001 From: DKER2 Date: Wed, 9 Oct 2024 11:18:32 +0700 Subject: [PATCH 9/9] Move test from transforms to frames --- sdks/python/apache_beam/dataframe/frames_test.py | 11 +++++++++++ sdks/python/apache_beam/dataframe/transforms_test.py | 10 ---------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index 076ab504adde..55d9fc5f4dfb 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -1025,6 +1025,17 @@ def test_series_fillna_series_as_value(self): self._run_test(lambda df, df2: df.A.fillna(df2.A), df, df2) + def test_dataframe_column_fillna_constant_as_value(self): + from apache_beam.dataframe import convert + from apache_beam.testing.util import assert_that + from apache_beam.testing.util import equal_to + with beam.Pipeline(None) as p: + pcoll = ( + p | beam.Create([1.0, np.nan, -1.0]) | beam.Select(x=lambda x: x)) + df = convert.to_dataframe(pcoll) + df_new = df['x'].fillna(0) + assert_that(convert.to_pcollection(df_new), equal_to([1.0, 0.0, -1.0])) + @unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0') def test_append_verify_integrity(self): df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10)) diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py index c2a5ca12e163..a143606cc913 100644 --- a/sdks/python/apache_beam/dataframe/transforms_test.py +++ b/sdks/python/apache_beam/dataframe/transforms_test.py @@ -354,16 +354,6 @@ def test_rename(self): 0: 2, 2: 0 }, errors='raise')) - def test_dataframe_column_fillna_constant_as_value(self): - df = pd.DataFrame({'A': (1, "NAN", 1), 'B': (1, 1, 1)}) - - def column_fillna_constant_as_value_function(df): - df['A'] = df['A'].fillna(0) - return df - - self.run_scenario( - df, lambda df: column_fillna_constant_as_value_function(df)) - class FusionTest(unittest.TestCase): @staticmethod