diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 4b8e708bfc5c..44bf2398a6dd 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -185,6 +185,14 @@ def __init__(self, identifier, **kwargs): self._identifier = identifier self._kwargs = kwargs + def identifier(self): + """ + The URN referencing this SchemaTransform + + :return: str + """ + return self._identifier + def build(self): schema_proto, payload = self._get_schema_proto_and_payload(**self._kwargs) payload = external_transforms_pb2.SchemaTransformPayload( @@ -194,7 +202,7 @@ def build(self): return payload -class ExplicitSchemaTransformPayloadBuilder(PayloadBuilder): +class ExplicitSchemaTransformPayloadBuilder(SchemaTransformPayloadBuilder): def __init__(self, identifier, schema_proto, **kwargs): self._identifier = identifier self._schema_proto = schema_proto @@ -414,7 +422,7 @@ def __init__( def expand(self, pcolls): # Expand the transform using the expansion service. - return pcolls | ExternalTransform( + return pcolls | self._payload_builder.identifier() >> ExternalTransform( common_urns.schematransform_based_expand.urn, self._payload_builder, self._expansion_service)