diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index e52c6048a15c..22d041f34f8b 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -1446,6 +1446,9 @@ def from_type_hint(cls, typehint, registry): # pickle coders. return cls(registry.get_coder(typehint.inner_type)) + def to_type_hint(self): + return typehints.WindowedValue[self.wrapped_value_coder.to_type_hint()] + Coder.register_structured_urn( common_urns.coders.WINDOWED_VALUE.urn, WindowedValueCoder) diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py index dc9780e36be3..bddd2cb57e06 100644 --- a/sdks/python/apache_beam/coders/coders_test.py +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -258,6 +258,12 @@ def test_numpy_int(self): _ = indata | "CombinePerKey" >> beam.CombinePerKey(sum) +class WindowedValueCoderTest(unittest.TestCase): + def test_to_type_hint(self): + coder = coders.WindowedValueCoder(coders.VarIntCoder()) + self.assertEqual(coder.to_type_hint(), typehints.WindowedValue[int]) # type: ignore[misc] + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main()