Skip to content

Commit

Permalink
Add missing to_type_hint to WindowedValueCoder
Browse files Browse the repository at this point in the history
  • Loading branch information
shunping committed Dec 17, 2024
1 parent 8e1e124 commit d077e74
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 0 deletions.
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/coders/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -1446,6 +1446,9 @@ def from_type_hint(cls, typehint, registry):
# pickle coders.
return cls(registry.get_coder(typehint.inner_type))

def to_type_hint(self):
return typehints.WindowedValue[self.wrapped_value_coder.to_type_hint()]


Coder.register_structured_urn(
common_urns.coders.WINDOWED_VALUE.urn, WindowedValueCoder)
Expand Down
6 changes: 6 additions & 0 deletions sdks/python/apache_beam/coders/coders_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ def test_numpy_int(self):
_ = indata | "CombinePerKey" >> beam.CombinePerKey(sum)


class WindowedValueCoderTest(unittest.TestCase):
def test_to_type_hint(self):
coder = coders.WindowedValueCoder(coders.VarIntCoder())
self.assertEqual(coder.to_type_hint(), typehints.WindowedValue[int])


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()

0 comments on commit d077e74

Please sign in to comment.