From 17129643d307e55670de28ec6f07ad326d666b77 Mon Sep 17 00:00:00 2001 From: Hai Joey Tran Date: Fri, 6 Dec 2024 17:03:32 -0500 Subject: [PATCH] Fix FlatMapTuple typehint bug (#33307) * create unit test * minimize to not using flatmaptuple * fix by adding a tuple conersion in flatmaptuple * add comment referring to ticket * remove extra pipeline * manually isort * retrigger builder * retrigger builder * isort? * try manually isorting again * Revert "try manually isorting again" This reverts commit a0fac321a15a07169fb27e217b61be3edc73d157. * manually fix isort --- sdks/python/apache_beam/transforms/core.py | 2 +- .../typehints/typed_pipeline_test.py | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 9c798d3ce6dc..4d1678d72a69 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -2238,7 +2238,7 @@ def FlatMapTuple(fn, *args, **kwargs): # pylint: disable=invalid-name if defaults or args or kwargs: wrapper = lambda x, *args, **kwargs: fn(*(tuple(x) + args), **kwargs) else: - wrapper = lambda x: fn(*x) + wrapper = lambda x: fn(*tuple(x)) # Proxy the type-hint information from the original function to this new # wrapped function. diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 44318fa44a8c..820f78fa9ef5 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -21,6 +21,7 @@ import typing import unittest +from typing import Tuple import apache_beam as beam from apache_beam import pvalue @@ -999,5 +1000,22 @@ def filter_fn(element: int) -> bool: self.assertEqual(th.output_types, ((int, ), {})) +class TestFlatMapTuple(unittest.TestCase): + def test_flatmaptuple(self): + # Regression test. See + # https://github.com/apache/beam/issues/33014 + + def identity(x: Tuple[str, int]) -> Tuple[str, int]: + return x + + with beam.Pipeline() as p: + # Just checking that this doesn't raise an exception. + ( + p + | "Generate input" >> beam.Create([('P1', [2])]) + | "Flat" >> beam.FlatMapTuple(lambda k, vs: [(k, v) for v in vs]) + | "Identity" >> beam.Map(identity)) + + if __name__ == '__main__': unittest.main()