From 3f2336aa00da41459e69cbbcafe4241ff5c97745 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Tue, 19 Sep 2023 16:50:08 -0400 Subject: [PATCH 1/2] label schematransform with its URN --- sdks/python/apache_beam/transforms/external.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 4b8e708bfc5c..70df6f59f962 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -414,7 +414,7 @@ def __init__( def expand(self, pcolls): # Expand the transform using the expansion service. - return pcolls | ExternalTransform( + return pcolls | self._payload_builder._identifier >> ExternalTransform( common_urns.schematransform_based_expand.urn, self._payload_builder, self._expansion_service) From 1a1a6e216cf652297ac89dfabe3ba218d586bddd Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Wed, 20 Sep 2023 10:55:18 -0400 Subject: [PATCH 2/2] create public property for identifier --- sdks/python/apache_beam/transforms/external.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 70df6f59f962..44bf2398a6dd 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -185,6 +185,14 @@ def __init__(self, identifier, **kwargs): self._identifier = identifier self._kwargs = kwargs + def identifier(self): + """ + The URN referencing this SchemaTransform + + :return: str + """ + return self._identifier + def build(self): schema_proto, payload = self._get_schema_proto_and_payload(**self._kwargs) payload = external_transforms_pb2.SchemaTransformPayload( @@ -194,7 +202,7 @@ def build(self): return payload -class ExplicitSchemaTransformPayloadBuilder(PayloadBuilder): +class ExplicitSchemaTransformPayloadBuilder(SchemaTransformPayloadBuilder): def __init__(self, identifier, schema_proto, **kwargs): self._identifier = identifier self._schema_proto = schema_proto @@ -414,7 +422,7 @@ def __init__( def expand(self, pcolls): # Expand the transform using the expansion service. - return pcolls | self._payload_builder._identifier >> ExternalTransform( + return pcolls | self._payload_builder.identifier() >> ExternalTransform( common_urns.schematransform_based_expand.urn, self._payload_builder, self._expansion_service)