From 38ef421442c0f57d6cba83952c2f7a34650e1bae Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 11 Oct 2024 10:16:38 -0400 Subject: [PATCH 01/13] Update typecheck err msg --- .../apache_beam/transforms/ptransform.py | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 8554ebce5dbd..065b1416712c 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -946,14 +946,23 @@ def element_type(side_input): continue if not typehints.is_consistent_with(bindings.get(arg, typehints.Any), hint): + #raise TypeCheckError( + #'Type hint violation for \'{label}\': requires {hint} but got ' + #'{actual_type} for {arg}\nFull type hint:\n{debug_str}'.format( + #label=self.label, + #hint=hint, + #actual_type=bindings[arg], + #arg=arg, + #debug_str=type_hints.debug_str())) + transform_nest_level = self.label.count("/") + split_producer_label = pvalueish.producer.full_label.split("/") + producer_label = "/".join( + split_producer_label[:transform_nest_level + 1]) raise TypeCheckError( - 'Type hint violation for \'{label}\': requires {hint} but got ' - '{actual_type} for {arg}\nFull type hint:\n{debug_str}'.format( - label=self.label, - hint=hint, - actual_type=bindings[arg], - arg=arg, - debug_str=type_hints.debug_str())) + f"The transform '{self.label}' only accepts PCollections of type '{hint}' " + f"but was applied to a PCollection of type '{bindings[arg]}' (produced by the transform '{producer_label}'). " + "Please ensure the input PCollection contains elements of the correct type." + ) def _process_argspec_fn(self): """Returns an argspec of the function actually consuming the data. From 997327012cf707ce589fddc3c347340e70073717 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 25 Oct 2024 14:12:25 -0400 Subject: [PATCH 02/13] update a few typed_pipeline_test unit tests --- .../typehints/typed_pipeline_test.py | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 9cb3fcdbb91d..48139cd7d42e 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -88,11 +88,11 @@ def process(self, element): self.assertEqual(['1', '2', '3'], sorted(result)) with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*int.*got.*str'): + r'accepts.*int.*applied.*str'): ['a', 'b', 'c'] | beam.ParDo(MyDoFn()) with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*int.*got.*str'): + r'accepts.*int.*applied.*str'): [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn())) def test_typed_dofn_method(self): @@ -104,11 +104,11 @@ def process(self, element: int) -> typehints.Tuple[str]: self.assertEqual(['1', '2', '3'], sorted(result)) with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*int.*got.*str'): + r'accepts.*int.*applied.*str'): _ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn()) with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*int.*got.*str'): + r'accepts.*int.*applied.*str'): _ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn())) def test_typed_dofn_method_with_class_decorators(self): @@ -124,12 +124,12 @@ def process(self, element: int) -> typehints.Tuple[str]: with self.assertRaisesRegex( typehints.TypeCheckError, - r'requires.*Tuple\[, \].*got.*str'): + r'accepts.*Tuple\[, \].*applied.*str'): _ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn()) with self.assertRaisesRegex( typehints.TypeCheckError, - r'requires.*Tuple\[, \].*got.*int'): + r'accepts.*Tuple\[, \].*applied.*int'): _ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn())) def test_typed_callable_iterable_output(self): @@ -156,11 +156,11 @@ def process(self, element: typehints.Tuple[int, int]) -> \ self.assertEqual(['1', '2', '3'], sorted(result)) with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*int.*got.*str'): + r'accepts.*int.*applied.*str'): _ = ['a', 'b', 'c'] | beam.ParDo(my_do_fn) with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*int.*got.*str'): + r'accepts.*int.*applied.*str'): _ = [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn)) def test_typed_callable_instance(self): @@ -177,11 +177,11 @@ def do_fn(element: typehints.Tuple[int, int]) -> typehints.Generator[str]: self.assertEqual(['1', '2', '3'], sorted(result)) with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*int.*got.*str'): + r'accepts.*int.*applied.*str'): _ = ['a', 'b', 'c'] | pardo with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*int.*got.*str'): + r'accepts.*int.*applied.*str'): _ = [1, 2, 3] | (pardo | 'again' >> pardo) def test_filter_type_hint(self): @@ -379,7 +379,7 @@ def process(self, element: int, *, side_input: str) -> \ self.assertEqual(['1', '2', '3'], sorted(result)) with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*str.*got.*int.*side_input'): + r'accepts.*str.*applied.*int.*side_input'): _ = [1, 2, 3] | beam.ParDo(my_do_fn, side_input=1) def test_typed_dofn_var_kwargs(self): @@ -394,7 +394,7 @@ def process(self, element: int, **side_inputs: typehints.Dict[str, str]) \ self.assertEqual(['1', '2', '3'], sorted(result)) with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*str.*got.*int.*side_inputs'): + r'accepts.*str.*applied.*int.*side_inputs'): _ = [1, 2, 3] | beam.ParDo(my_do_fn, a=1) def test_typed_callable_string_literals(self): @@ -415,7 +415,7 @@ def fn(element: int): return pcoll | beam.ParDo(fn) self.assertListEqual([1, 2, 3], [1, 2, 3] | MyMap()) - with self.assertRaisesRegex(typehints.TypeCheckError, r'int.*got.*str'): + with self.assertRaisesRegex(typehints.TypeCheckError, r'int.*applied.*str'): _ = ['a'] | MyMap() def test_typed_ptransform_fn_conflicting_hints(self): @@ -430,10 +430,10 @@ def fn(element: float): return pcoll | beam.ParDo(fn) with self.assertRaisesRegex(typehints.TypeCheckError, - r'ParDo.*requires.*float.*got.*int'): + r'ParDo.*accepts.*float.*applied.*int'): _ = [1, 2, 3] | MyMap() with self.assertRaisesRegex(typehints.TypeCheckError, - r'MyMap.*expected.*int.*got.*str'): + r'MyMap.*expected.*int.*applied.*str'): _ = ['a'] | MyMap() def test_typed_dofn_string_literals(self): @@ -632,14 +632,14 @@ def produces_unkown(e): return e @typehints.with_input_types(int) - def requires_int(e): + def accepts_int(e): return e class MyPTransform(beam.PTransform): def expand(self, pcoll): unknowns = pcoll | beam.Map(produces_unkown) ints = pcoll | beam.Map(int) - return (unknowns, ints) | beam.Flatten() | beam.Map(requires_int) + return (unknowns, ints) | beam.Flatten() | beam.Map(accepts_int) _ = [1, 2, 3] | MyPTransform() @@ -743,7 +743,7 @@ def repeat(s, *times): with self.assertRaisesRegex( typehints.TypeCheckError, - (r'requires Tuple\[, ...\] but got ' + (r'accepts Tuple\[, ...\] but got ' r'Tuple\[, ...\]')): ['a', 'bb', 'c'] | beam.Map(repeat, 'z') @@ -761,7 +761,7 @@ def test_var_positional_only_side_input_hint(self): with self.assertRaisesRegex( typehints.TypeCheckError, - r'requires Tuple\[Union\[, \], ...\] but ' + r'accepts Tuple\[Union\[, \], ...\] but ' r'got Tuple\[Union\[, \], ...\]'): _ = [1.2] | beam.Map(lambda *_: 'a', 5).with_input_types(int, str) @@ -782,7 +782,7 @@ def test_var_keyword_side_input_hint(self): with self.assertRaisesRegex( typehints.TypeCheckError, - r'requires Dict\[, \] but got ' + r'accepts Dict\[, \] but got ' r'Dict\[, \]'): _ = (['a', 'b', 'c'] | beam.Map(lambda e, **_: 'a', kw=5).with_input_types( From afd0014af5d837e0e38d3578c6740e9b3472e652 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Mon, 28 Oct 2024 22:25:08 -0400 Subject: [PATCH 03/13] Move new logic to only apply to main element --- .../apache_beam/transforms/ptransform.py | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 065b1416712c..8e5db5ad95a0 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -939,30 +939,37 @@ def element_type(side_input): bindings = getcallargs_forhints(argspec_fn, *arg_types, **kwargs_types) hints = getcallargs_forhints( argspec_fn, *input_types[0], **input_types[1]) - for arg, hint in hints.items(): + arg_hints = iter(hints.items()) + element_arg, element_hint = next(arg_hints) + if not typehints.is_consistent_with( + bindings.get(element_arg, typehints.Any), element_hint): + transform_nest_level = self.label.count("/") + split_producer_label = pvalueish.producer.full_label.split("/") + producer_label = "/".join( + split_producer_label[:transform_nest_level + 1]) + raise TypeCheckError( + f"The transform '{self.label}' only accepts " + f"PCollections of type '{element_hint}' " + f"but was applied to a PCollection of type" + f" '{bindings[element_arg]}' " + f"(produced by the transform '{producer_label}'). " + "Please ensure the input PCollection contains " + "elements of the correct type.") + for arg, hint in arg_hints: if arg.startswith('__unknown__'): continue if hint is None: continue if not typehints.is_consistent_with(bindings.get(arg, typehints.Any), hint): - #raise TypeCheckError( - #'Type hint violation for \'{label}\': requires {hint} but got ' - #'{actual_type} for {arg}\nFull type hint:\n{debug_str}'.format( - #label=self.label, - #hint=hint, - #actual_type=bindings[arg], - #arg=arg, - #debug_str=type_hints.debug_str())) - transform_nest_level = self.label.count("/") - split_producer_label = pvalueish.producer.full_label.split("/") - producer_label = "/".join( - split_producer_label[:transform_nest_level + 1]) raise TypeCheckError( - f"The transform '{self.label}' only accepts PCollections of type '{hint}' " - f"but was applied to a PCollection of type '{bindings[arg]}' (produced by the transform '{producer_label}'). " - "Please ensure the input PCollection contains elements of the correct type." - ) + 'Type hint violation for \'{label}\': requires {hint} but got ' + '{actual_type} for {arg}\nFull type hint:\n{debug_str}'.format( + label=self.label, + hint=hint, + actual_type=bindings[arg], + arg=arg, + debug_str=type_hints.debug_str())) def _process_argspec_fn(self): """Returns an argspec of the function actually consuming the data. From d762186b85a96137bb6f026859f4aeb35442b673 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Mon, 28 Oct 2024 22:47:34 -0400 Subject: [PATCH 04/13] fix tests --- .../apache_beam/transforms/ptransform.py | 2 +- .../typehints/typed_pipeline_test.py | 38 +++++++++---------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 8e5db5ad95a0..3ea83d07222c 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -948,7 +948,7 @@ def element_type(side_input): producer_label = "/".join( split_producer_label[:transform_nest_level + 1]) raise TypeCheckError( - f"The transform '{self.label}' only accepts " + f"The transform '{self.label}' requires " f"PCollections of type '{element_hint}' " f"but was applied to a PCollection of type" f" '{bindings[element_arg]}' " diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 48139cd7d42e..51e1108dd581 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -88,11 +88,11 @@ def process(self, element): self.assertEqual(['1', '2', '3'], sorted(result)) with self.assertRaisesRegex(typehints.TypeCheckError, - r'accepts.*int.*applied.*str'): + r'requires.*int.*applied.*str'): ['a', 'b', 'c'] | beam.ParDo(MyDoFn()) with self.assertRaisesRegex(typehints.TypeCheckError, - r'accepts.*int.*applied.*str'): + r'requires.*int.*applied.*str'): [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn())) def test_typed_dofn_method(self): @@ -104,11 +104,11 @@ def process(self, element: int) -> typehints.Tuple[str]: self.assertEqual(['1', '2', '3'], sorted(result)) with self.assertRaisesRegex(typehints.TypeCheckError, - r'accepts.*int.*applied.*str'): + r'requires.*int.*applied.*str'): _ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn()) with self.assertRaisesRegex(typehints.TypeCheckError, - r'accepts.*int.*applied.*str'): + r'requires.*int.*applied.*str'): _ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn())) def test_typed_dofn_method_with_class_decorators(self): @@ -124,12 +124,12 @@ def process(self, element: int) -> typehints.Tuple[str]: with self.assertRaisesRegex( typehints.TypeCheckError, - r'accepts.*Tuple\[, \].*applied.*str'): + r'requires.*Tuple\[, \].*applied.*str'): _ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn()) with self.assertRaisesRegex( typehints.TypeCheckError, - r'accepts.*Tuple\[, \].*applied.*int'): + r'requires.*Tuple\[, \].*applied.*int'): _ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn())) def test_typed_callable_iterable_output(self): @@ -156,11 +156,11 @@ def process(self, element: typehints.Tuple[int, int]) -> \ self.assertEqual(['1', '2', '3'], sorted(result)) with self.assertRaisesRegex(typehints.TypeCheckError, - r'accepts.*int.*applied.*str'): + r'requires.*int.*applied.*str'): _ = ['a', 'b', 'c'] | beam.ParDo(my_do_fn) with self.assertRaisesRegex(typehints.TypeCheckError, - r'accepts.*int.*applied.*str'): + r'requires.*int.*applied.*str'): _ = [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn)) def test_typed_callable_instance(self): @@ -177,11 +177,11 @@ def do_fn(element: typehints.Tuple[int, int]) -> typehints.Generator[str]: self.assertEqual(['1', '2', '3'], sorted(result)) with self.assertRaisesRegex(typehints.TypeCheckError, - r'accepts.*int.*applied.*str'): + r'requires.*int.*applied.*str'): _ = ['a', 'b', 'c'] | pardo with self.assertRaisesRegex(typehints.TypeCheckError, - r'accepts.*int.*applied.*str'): + r'requires.*int.*applied.*str'): _ = [1, 2, 3] | (pardo | 'again' >> pardo) def test_filter_type_hint(self): @@ -379,7 +379,7 @@ def process(self, element: int, *, side_input: str) -> \ self.assertEqual(['1', '2', '3'], sorted(result)) with self.assertRaisesRegex(typehints.TypeCheckError, - r'accepts.*str.*applied.*int.*side_input'): + r'requires.*str.*got.*int.*side_input'): _ = [1, 2, 3] | beam.ParDo(my_do_fn, side_input=1) def test_typed_dofn_var_kwargs(self): @@ -394,7 +394,7 @@ def process(self, element: int, **side_inputs: typehints.Dict[str, str]) \ self.assertEqual(['1', '2', '3'], sorted(result)) with self.assertRaisesRegex(typehints.TypeCheckError, - r'accepts.*str.*applied.*int.*side_inputs'): + r'requires.*str.*got.*int.*side_inputs'): _ = [1, 2, 3] | beam.ParDo(my_do_fn, a=1) def test_typed_callable_string_literals(self): @@ -415,7 +415,7 @@ def fn(element: int): return pcoll | beam.ParDo(fn) self.assertListEqual([1, 2, 3], [1, 2, 3] | MyMap()) - with self.assertRaisesRegex(typehints.TypeCheckError, r'int.*applied.*str'): + with self.assertRaisesRegex(typehints.TypeCheckError, r'int.*got.*str'): _ = ['a'] | MyMap() def test_typed_ptransform_fn_conflicting_hints(self): @@ -430,10 +430,10 @@ def fn(element: float): return pcoll | beam.ParDo(fn) with self.assertRaisesRegex(typehints.TypeCheckError, - r'ParDo.*accepts.*float.*applied.*int'): + r'ParDo.*requires.*float.*applied.*int'): _ = [1, 2, 3] | MyMap() with self.assertRaisesRegex(typehints.TypeCheckError, - r'MyMap.*expected.*int.*applied.*str'): + r'MyMap.*expected.*int.*got.*str'): _ = ['a'] | MyMap() def test_typed_dofn_string_literals(self): @@ -743,7 +743,7 @@ def repeat(s, *times): with self.assertRaisesRegex( typehints.TypeCheckError, - (r'accepts Tuple\[, ...\] but got ' + (r'requires Tuple\[, ...\] but got ' r'Tuple\[, ...\]')): ['a', 'bb', 'c'] | beam.Map(repeat, 'z') @@ -761,8 +761,8 @@ def test_var_positional_only_side_input_hint(self): with self.assertRaisesRegex( typehints.TypeCheckError, - r'accepts Tuple\[Union\[, \], ...\] but ' - r'got Tuple\[Union\[, \], ...\]'): + r'requires.*Tuple\[Union\[, \], ...\].*' + r'applied.*Tuple\[Union\[, \], ...\]'): _ = [1.2] | beam.Map(lambda *_: 'a', 5).with_input_types(int, str) def test_var_keyword_side_input_hint(self): @@ -782,7 +782,7 @@ def test_var_keyword_side_input_hint(self): with self.assertRaisesRegex( typehints.TypeCheckError, - r'accepts Dict\[, \] but got ' + r'requires Dict\[, \] but got ' r'Dict\[, \]'): _ = (['a', 'b', 'c'] | beam.Map(lambda e, **_: 'a', kw=5).with_input_types( From 934bd8cb4d3d0fa6ebcc63969c1d87ada48506e0 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Mon, 28 Oct 2024 22:53:02 -0400 Subject: [PATCH 05/13] remove please comment --- sdks/python/apache_beam/transforms/ptransform.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 3ea83d07222c..f22e7da00e19 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -952,9 +952,7 @@ def element_type(side_input): f"PCollections of type '{element_hint}' " f"but was applied to a PCollection of type" f" '{bindings[element_arg]}' " - f"(produced by the transform '{producer_label}'). " - "Please ensure the input PCollection contains " - "elements of the correct type.") + f"(produced by the transform '{producer_label}'). ") for arg, hint in arg_hints: if arg.startswith('__unknown__'): continue From 5f2529b127ed26bede72a569d87a050e9adedc96 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Tue, 29 Oct 2024 07:44:44 -0400 Subject: [PATCH 06/13] update tests again --- sdks/python/apache_beam/typehints/typed_pipeline_test.py | 2 +- sdks/python/pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index b69cf015ae8f..57e7f44f6922 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -430,7 +430,7 @@ def fn(element: float): return pcoll | beam.ParDo(fn) with self.assertRaisesRegex(typehints.TypeCheckError, - r'ParDo.*requires.*float.*got.*str'): + r'ParDo.*requires.*float.*applied.*str'): _ = ['1', '2', '3'] | MyMap() with self.assertRaisesRegex(typehints.TypeCheckError, r'MyMap.*expected.*str.*got.*bytes'): diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index 4eb827297019..6e07d8bc6f42 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -28,7 +28,7 @@ requires = [ # Numpy headers "numpy>=1.14.3,<2.2.0", # Update setup.py as well. # having cython here will create wheels that are platform dependent. - "cython>=3.0,<4", + #"cython>=3.0,<4", ## deps for generating external transform wrappers: # also update PyYaml bounds in sdks:python:generateExternalTransformsConfig 'pyyaml>=3.12,<7.0.0', From 9ae70cec4493751b6c6aebad116fe7603d281c2b Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Tue, 29 Oct 2024 07:45:39 -0400 Subject: [PATCH 07/13] remove debug str --- sdks/python/apache_beam/transforms/ptransform.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index f22e7da00e19..41486f798576 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -497,13 +497,12 @@ def type_check_inputs_or_outputs(self, pvalueish, input_or_output): at_context = ' %s %s' % (input_or_output, context) if context else '' raise TypeCheckError( '{type} type hint violation at {label}{context}: expected {hint}, ' - 'got {actual_type}\nFull type hint:\n{debug_str}'.format( + 'got {actual_type}'.format( type=input_or_output.title(), label=self.label, context=at_context, hint=hint, - actual_type=pvalue_.element_type, - debug_str=type_hints.debug_str())) + actual_type=pvalue_.element_type)) def _infer_output_coder(self, input_type=None, input_coder=None): # type: (...) -> Optional[coders.Coder] From 3ec5bda26d43cbf149de42ca4586fd2af3a8a3ac Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Tue, 29 Oct 2024 08:09:01 -0400 Subject: [PATCH 08/13] fix more tests --- sdks/python/apache_beam/typehints/decorators_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/typehints/decorators_test.py b/sdks/python/apache_beam/typehints/decorators_test.py index 3baf9fa8322f..71edc75f31a6 100644 --- a/sdks/python/apache_beam/typehints/decorators_test.py +++ b/sdks/python/apache_beam/typehints/decorators_test.py @@ -409,7 +409,7 @@ def fn(a: int) -> int: return a with self.assertRaisesRegex(TypeCheckError, - r'requires .*int.* but got .*str'): + r'requires .*int.* but was applied .*str'): _ = ['a', 'b', 'c'] | Map(fn) # Same pipeline doesn't raise without annotations on fn. @@ -423,7 +423,7 @@ def fn(a: int) -> int: _ = [1, 2, 3] | Map(fn) # Doesn't raise - correct types. with self.assertRaisesRegex(TypeCheckError, - r'requires .*int.* but got .*str'): + r'requires .*int.* but was applied .*str'): _ = ['a', 'b', 'c'] | Map(fn) @decorators.no_annotations From 797b8eb692bcefb46575fdada53818a3e3467b97 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Tue, 29 Oct 2024 08:25:44 -0400 Subject: [PATCH 09/13] fix pubsub test --- sdks/python/apache_beam/io/gcp/pubsub_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 2e3e9b301618..73ba8d6abdb6 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -901,7 +901,8 @@ def test_write_messages_with_attributes_error(self, mock_pubsub): options = PipelineOptions([]) options.view_as(StandardOptions).streaming = True - with self.assertRaisesRegex(Exception, r'Type hint violation'): + with self.assertRaisesRegex(Exception, + r'requires.*PubsubMessage.*applied.*str'): with TestPipeline(options=options) as p: _ = ( p From 0cee1f3fe9a59ba93b1251d82ef8a366007277c6 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Tue, 29 Oct 2024 09:51:34 -0400 Subject: [PATCH 10/13] revert accidental pyproject change --- sdks/python/pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index 6e07d8bc6f42..037e5a8aed6b 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -26,9 +26,9 @@ requires = [ # Avoid https://github.com/pypa/virtualenv/issues/2006 "distlib==0.3.7", # Numpy headers - "numpy>=1.14.3,<2.2.0", # Update setup.py as well. + "numpy>=1.14.3,<1.27", # Update setup.py as well. # having cython here will create wheels that are platform dependent. - #"cython>=3.0,<4", + "cython>=3.0,<4", ## deps for generating external transform wrappers: # also update PyYaml bounds in sdks:python:generateExternalTransformsConfig 'pyyaml>=3.12,<7.0.0', From e46b224828263c37a1cad0bad1a61c90b96fdfc0 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 31 Oct 2024 18:27:37 -0400 Subject: [PATCH 11/13] revert pyrpoject change --- sdks/python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index 037e5a8aed6b..4eb827297019 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -26,7 +26,7 @@ requires = [ # Avoid https://github.com/pypa/virtualenv/issues/2006 "distlib==0.3.7", # Numpy headers - "numpy>=1.14.3,<1.27", # Update setup.py as well. + "numpy>=1.14.3,<2.2.0", # Update setup.py as well. # having cython here will create wheels that are platform dependent. "cython>=3.0,<4", ## deps for generating external transform wrappers: From 28ab5c8d5134e11a134159fe8c2dd2ac69795033 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 31 Oct 2024 19:14:04 -0400 Subject: [PATCH 12/13] fix ptransform_test tests --- .../apache_beam/transforms/ptransform_test.py | 90 ++++++------------- 1 file changed, 26 insertions(+), 64 deletions(-) diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 0acea547ccdc..7db017a59158 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -1298,17 +1298,13 @@ class ToUpperCaseWithPrefix(beam.DoFn): def process(self, element, prefix): return [prefix + element.upper()] - with self.assertRaises(typehints.TypeCheckError) as e: + with self.assertRaisesRegex(typehints.TypeCheckError, + r'Upper.*requires.*str.*applied.*int'): ( self.p | 'T' >> beam.Create([1, 2, 3]).with_output_types(int) | 'Upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello')) - self.assertStartswith( - e.exception.args[0], - "Type hint violation for 'Upper': " - "requires {} but got {} for element".format(str, int)) - def test_do_fn_pipeline_runtime_type_check_satisfied(self): self.p._options.view_as(TypeOptions).runtime_type_check = True @@ -1335,18 +1331,14 @@ class AddWithNum(beam.DoFn): def process(self, element, num): return [element + num] - with self.assertRaises(typehints.TypeCheckError) as e: + with self.assertRaisesRegex(typehints.TypeCheckError, + r'Add.*requires.*int.*applied.*str'): ( self.p | 'T' >> beam.Create(['1', '2', '3']).with_output_types(str) | 'Add' >> beam.ParDo(AddWithNum(), 5)) self.p.run() - self.assertStartswith( - e.exception.args[0], - "Type hint violation for 'Add': " - "requires {} but got {} for element".format(int, str)) - def test_pardo_does_not_type_check_using_type_hint_decorators(self): @with_input_types(a=int) @with_output_types(typing.List[str]) @@ -1355,17 +1347,13 @@ def int_to_str(a): # The function above is expecting an int for its only parameter. However, it # will receive a str instead, which should result in a raised exception. - with self.assertRaises(typehints.TypeCheckError) as e: + with self.assertRaisesRegex(typehints.TypeCheckError, + r'ToStr.*requires.*int.*applied.*str'): ( self.p | 'S' >> beam.Create(['b', 'a', 'r']).with_output_types(str) | 'ToStr' >> beam.FlatMap(int_to_str)) - self.assertStartswith( - e.exception.args[0], - "Type hint violation for 'ToStr': " - "requires {} but got {} for a".format(int, str)) - def test_pardo_properly_type_checks_using_type_hint_decorators(self): @with_input_types(a=str) @with_output_types(typing.List[str]) @@ -1387,7 +1375,8 @@ def to_all_upper_case(a): def test_pardo_does_not_type_check_using_type_hint_methods(self): # The first ParDo outputs pcoll's of type int, however the second ParDo is # expecting pcoll's of type str instead. - with self.assertRaises(typehints.TypeCheckError) as e: + with self.assertRaisesRegex(typehints.TypeCheckError, + r'Upper.*requires.*str.*applied.*int'): ( self.p | 'S' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) @@ -1398,11 +1387,6 @@ def test_pardo_does_not_type_check_using_type_hint_methods(self): 'Upper' >> beam.FlatMap(lambda x: [x.upper()]).with_input_types( str).with_output_types(str))) - self.assertStartswith( - e.exception.args[0], - "Type hint violation for 'Upper': " - "requires {} but got {} for x".format(str, int)) - def test_pardo_properly_type_checks_using_type_hint_methods(self): # Pipeline should be created successfully without an error d = ( @@ -1419,18 +1403,14 @@ def test_pardo_properly_type_checks_using_type_hint_methods(self): def test_map_does_not_type_check_using_type_hints_methods(self): # The transform before 'Map' has indicated that it outputs PCollections with # int's, while Map is expecting one of str. - with self.assertRaises(typehints.TypeCheckError) as e: + with self.assertRaisesRegex(typehints.TypeCheckError, + r'Upper.*requires.*str.*applied.*int'): ( self.p | 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int) | 'Upper' >> beam.Map(lambda x: x.upper()).with_input_types( str).with_output_types(str)) - self.assertStartswith( - e.exception.args[0], - "Type hint violation for 'Upper': " - "requires {} but got {} for x".format(str, int)) - def test_map_properly_type_checks_using_type_hints_methods(self): # No error should be raised if this type-checks properly. d = ( @@ -1449,17 +1429,13 @@ def upper(s): # Hinted function above expects a str at pipeline construction. # However, 'Map' should detect that Create has hinted an int instead. - with self.assertRaises(typehints.TypeCheckError) as e: + with self.assertRaisesRegex(typehints.TypeCheckError, + r'Upper.*requires.*str.*applied.*int'): ( self.p | 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int) | 'Upper' >> beam.Map(upper)) - self.assertStartswith( - e.exception.args[0], - "Type hint violation for 'Upper': " - "requires {} but got {} for s".format(str, int)) - def test_map_properly_type_checks_using_type_hints_decorator(self): @with_input_types(a=bool) @with_output_types(int) @@ -1477,7 +1453,8 @@ def bool_to_int(a): def test_filter_does_not_type_check_using_type_hints_method(self): # Filter is expecting an int but instead looks to the 'left' and sees a str # incoming. - with self.assertRaises(typehints.TypeCheckError) as e: + with self.assertRaisesRegex(typehints.TypeCheckError, + r'Below 3.*requires.*int.*applied.*str'): ( self.p | 'Strs' >> beam.Create(['1', '2', '3', '4', '5' @@ -1486,11 +1463,6 @@ def test_filter_does_not_type_check_using_type_hints_method(self): str).with_output_types(str) | 'Below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int)) - self.assertStartswith( - e.exception.args[0], - "Type hint violation for 'Below 3': " - "requires {} but got {} for x".format(int, str)) - def test_filter_type_checks_using_type_hints_method(self): # No error should be raised if this type-checks properly. d = ( @@ -1508,17 +1480,13 @@ def more_than_half(a): return a > 0.50 # Func above was hinted to only take a float, yet a str will be passed. - with self.assertRaises(typehints.TypeCheckError) as e: + with self.assertRaisesRegex(typehints.TypeCheckError, + r'Half.*requires.*float.*applied.*str'): ( self.p | 'Ints' >> beam.Create(['1', '2', '3', '4']).with_output_types(str) | 'Half' >> beam.Filter(more_than_half)) - self.assertStartswith( - e.exception.args[0], - "Type hint violation for 'Half': " - "requires {} but got {} for a".format(float, str)) - def test_filter_type_checks_using_type_hints_decorator(self): @with_input_types(b=int) def half(b): @@ -2128,14 +2096,10 @@ def test_mean_globally_pipeline_checking_violated(self): self.p | 'C' >> beam.Create(['test']).with_output_types(str) | 'Mean' >> combine.Mean.Globally()) - - expected_msg = \ - "Type hint violation for 'CombinePerKey': " \ - "requires Tuple[TypeVariable[K], Union[, , " \ - ", ]] " \ - "but got Tuple[None, ] for element" - - self.assertStartswith(e.exception.args[0], expected_msg) + err_msg = e.exception.args[0] + assert "CombinePerKey" in err_msg + assert "Tuple[TypeVariable[K]" in err_msg + assert "Tuple[None, " in err_msg def test_mean_globally_runtime_checking_satisfied(self): self.p._options.view_as(TypeOptions).runtime_type_check = True @@ -2195,14 +2159,12 @@ def test_mean_per_key_pipeline_checking_violated(self): typing.Tuple[str, str])) | 'EvenMean' >> combine.Mean.PerKey()) self.p.run() - - expected_msg = \ - "Type hint violation for 'CombinePerKey(MeanCombineFn)': " \ - "requires Tuple[TypeVariable[K], Union[, , " \ - ", ]] " \ - "but got Tuple[, ] for element" - - self.assertStartswith(e.exception.args[0], expected_msg) + err_msg = e.exception.args[0] + assert "CombinePerKey(MeanCombineFn)" in err_msg + assert "requires" in err_msg + assert "Tuple[TypeVariable[K]" in err_msg + assert "applied" in err_msg + assert "Tuple[, ]" in err_msg def test_mean_per_key_runtime_checking_satisfied(self): self.p._options.view_as(TypeOptions).runtime_type_check = True From 6ad169f6004a9c1b7a64523687a5c0402b9d1fa5 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 31 Oct 2024 19:15:49 -0400 Subject: [PATCH 13/13] add explanatory comments for similar looking code --- sdks/python/apache_beam/transforms/ptransform.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 41486f798576..16a43867c261 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -938,6 +938,8 @@ def element_type(side_input): bindings = getcallargs_forhints(argspec_fn, *arg_types, **kwargs_types) hints = getcallargs_forhints( argspec_fn, *input_types[0], **input_types[1]) + + # First check the main input. arg_hints = iter(hints.items()) element_arg, element_hint = next(arg_hints) if not typehints.is_consistent_with( @@ -952,6 +954,8 @@ def element_type(side_input): f"but was applied to a PCollection of type" f" '{bindings[element_arg]}' " f"(produced by the transform '{producer_label}'). ") + + # Now check the side inputs. for arg, hint in arg_hints: if arg.startswith('__unknown__'): continue