diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 57d8197a3a00..980f375c5186 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -1039,11 +1039,16 @@ def __hash__(self): @classmethod def from_type_hint(cls, typehint, unused_registry): - if issubclass(typehint, proto_utils.message_types): + # The typehint must be a strict subclass of google.protobuf.message.Message. + # Using message.Message itself prevents ProtoCoder usage, as required APIs + # are not implemented in the base class. If this occurs, an error is raised + # and the system defaults to other fallback coders. + if (issubclass(typehint, proto_utils.message_types) and + typehint != message.Message): return cls(typehint) else: raise ValueError(( - 'Expected a subclass of google.protobuf.message.Message' + 'Expected a strict subclass of google.protobuf.message.Message' ', but got a %s' % typehint)) def to_type_hint(self): diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py index dc9780e36be3..f0bd8471840f 100644 --- a/sdks/python/apache_beam/coders/coders_test.py +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -20,6 +20,7 @@ import logging import unittest +from google.protobuf import message import proto import pytest @@ -86,6 +87,23 @@ def test_proto_coder(self): self.assertEqual(ma, real_coder.decode(real_coder.encode(ma))) self.assertEqual(ma.__class__, real_coder.to_type_hint()) + def test_proto_coder_on_protobuf_message_subclasses(self): + # This replicates a scenario where users provide message.Message as the + # output typehint for a Map function, even though the actual output messages + # are subclasses of message.Message. 
+ ma = test_message.MessageA() + mb = ma.field2.add() + mb.field1 = True + ma.field1 = 'hello world' + + coder = coders_registry.get_coder(message.Message) + # For messages of google.protobuf.message.Message, the fallback coder will + # be FastPrimitivesCoder rather than ProtoCoder. + # See the comment on ProtoCoder.from_type_hint() for further details. + self.assertEqual(coder, coders.FastPrimitivesCoder()) + + self.assertEqual(ma, coder.decode(coder.encode(ma))) + class DeterministicProtoCoderTest(unittest.TestCase): def test_deterministic_proto_coder(self):