From aacbd600a71f0700c8ba65cde0de88cf64b4c45b Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 20 Dec 2024 14:28:57 -0500 Subject: [PATCH 1/5] Use other fallback coders for protobuf Message base class. --- sdks/python/apache_beam/coders/coders.py | 9 +++++++-- sdks/python/apache_beam/coders/coders_test.py | 18 ++++++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 57d8197a3a00..980f375c5186 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -1039,11 +1039,16 @@ def __hash__(self): @classmethod def from_type_hint(cls, typehint, unused_registry): - if issubclass(typehint, proto_utils.message_types): + # The typehint must be a subclass of google.protobuf.message.Message. + # Using message.Message itself prevents ProtoCoder usage, as required APIs + # are not implemented in the base class. If this occurs, an error is raised + # and the system defaults to other fallback coders. + if (issubclass(typehint, proto_utils.message_types) and + typehint != message.Message): return cls(typehint) else: raise ValueError(( - 'Expected a subclass of google.protobuf.message.Message' + 'Expected a strict subclass of google.protobuf.message.Message' ', but got a %s' % typehint)) def to_type_hint(self): diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py index dc9780e36be3..f0bd8471840f 100644 --- a/sdks/python/apache_beam/coders/coders_test.py +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -20,6 +20,7 @@ import logging import unittest +from google.protobuf import message import proto import pytest @@ -86,6 +87,23 @@ def test_proto_coder(self): self.assertEqual(ma, real_coder.decode(real_coder.encode(ma))) self.assertEqual(ma.__class__, real_coder.to_type_hint()) + def test_proto_coder_on_protobuf_message_subclasses(self): + # This replicates a scenario where users provide message.Message as the + # output typehint for a Map function, even though the actual output messages + # are subclasses of message.Message. + ma = test_message.MessageA() + mb = ma.field2.add() + mb.field1 = True + ma.field1 = 'hello world' + + coder = coders_registry.get_coder(message.Message) + # For messages of google.protobuf.message.Message, the fallback coder will + # be FastPrimitiveCoder other than ProtoCoder. + # See the comment on ProtoCoder.from_type_hint() for further details. + self.assertEqual(coder, coders.FastPrimitivesCoder()) + + self.assertEqual(ma, coder.decode(coder.encode(ma))) + class DeterministicProtoCoderTest(unittest.TestCase): def test_deterministic_proto_coder(self): From 1d92b018aa100d810156c76f8bb35e37f5cde860 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 20 Dec 2024 14:39:57 -0500 Subject: [PATCH 2/5] Minor change on comments --- sdks/python/apache_beam/coders/coders.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 980f375c5186..969353da1c6b 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -1040,8 +1040,8 @@ def __hash__(self): @classmethod def from_type_hint(cls, typehint, unused_registry): # The typehint must be a subclass of google.protobuf.message.Message. - # Using message.Message itself prevents ProtoCoder usage, as required APIs - # are not implemented in the base class. If this occurs, an error is raised + # ProtoCoder cannot work with message.Message itself, as required APIs are + # not implemented in the base class. If this occurs, an error is raised # and the system defaults to other fallback coders. if (issubclass(typehint, proto_utils.message_types) and typehint != message.Message): From 7d6dc4af96d028134e0076e674064c520429f5bd Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 20 Dec 2024 14:55:26 -0500 Subject: [PATCH 3/5] Revise the comments based on review. --- sdks/python/apache_beam/coders/coders.py | 10 ++++++---- sdks/python/apache_beam/coders/coders_test.py | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 969353da1c6b..ee076f8a5a78 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -1039,10 +1039,12 @@ def __hash__(self): @classmethod def from_type_hint(cls, typehint, unused_registry): - # The typehint must be a subclass of google.protobuf.message.Message. - # ProtoCoder cannot work with message.Message itself, as required APIs are - # not implemented in the base class. If this occurs, an error is raised - # and the system defaults to other fallback coders. + # The typehint must be a strict subclass of google.protobuf.message.Message. + # ProtoCoder cannot work with message.Message itself, as deserialization of + # a serialized proto requires knowledge of the desired concrete proto + # subclass which is not stored in the encoded bytes themselves. If this + # occurs, an error is raised and the system defaults to other fallback + # coders. if (issubclass(typehint, proto_utils.message_types) and typehint != message.Message): return cls(typehint) diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py index f0bd8471840f..bf9d94b93f0c 100644 --- a/sdks/python/apache_beam/coders/coders_test.py +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -98,7 +98,7 @@ def test_proto_coder_on_protobuf_message_subclasses(self): coder = coders_registry.get_coder(message.Message) # For messages of google.protobuf.message.Message, the fallback coder will - # be FastPrimitiveCoder other than ProtoCoder. + # be FastPrimitivesCoder rather than ProtoCoder. # See the comment on ProtoCoder.from_type_hint() for further details. self.assertEqual(coder, coders.FastPrimitivesCoder()) From 29e996af5de360f8ef52e9f9c2801c81d94f0d71 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 20 Dec 2024 16:15:54 -0500 Subject: [PATCH 4/5] Move import out of if condition. --- sdks/python/apache_beam/coders/coders.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index ee076f8a5a78..afbc144751bc 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -53,6 +53,7 @@ from typing import TypeVar from typing import overload +from google.protobuf import message import google.protobuf.wrappers_pb2 import proto @@ -65,7 +66,6 @@ from apache_beam.utils import proto_utils if TYPE_CHECKING: - from google.protobuf import message # pylint: disable=ungrouped-imports from apache_beam.coders.typecoders import CoderRegistry from apache_beam.runners.pipeline_context import PipelineContext From a067b1ec24d9a50ee9aeaa919e5dd70de0334e5a Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 20 Dec 2024 19:31:38 -0500 Subject: [PATCH 5/5] Fix lints --- sdks/python/apache_beam/coders/coders.py | 2 +- sdks/python/apache_beam/coders/coders_test.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index afbc144751bc..0f2a42686854 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -53,9 +53,9 @@ from typing import TypeVar from typing import overload -from google.protobuf import message import google.protobuf.wrappers_pb2 import proto +from google.protobuf import message from apache_beam.coders import coder_impl from apache_beam.coders.avro_record import AvroRecord diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py index bf9d94b93f0c..5e5debca36e6 100644 --- a/sdks/python/apache_beam/coders/coders_test.py +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -20,9 +20,9 @@ import logging import unittest -from google.protobuf import message import proto import pytest +from google.protobuf import message import apache_beam as beam from apache_beam import typehints