Skip to content

Commit

Permalink
Merge pull request apache#28489 Propagate annotations of CombinePerKey transforms.
Browse files Browse the repository at this point in the history

Propagate annotations of CombinePerKey transforms to resulting component transforms
  • Loading branch information
robertwb authored Sep 18, 2023
2 parents 9974334 + b3a8e15 commit 881c597
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1346,6 +1346,7 @@ def lifted_stages(stage):
payload=transform.spec.payload),
inputs=transform.inputs,
outputs={'out': precombined_pcoll_id},
annotations=transform.annotations,
environment_id=transform.environment_id))

yield make_stage(
Expand All @@ -1355,6 +1356,7 @@ def lifted_stages(stage):
spec=beam_runner_api_pb2.FunctionSpec(
urn=common_urns.primitives.GROUP_BY_KEY.urn),
inputs={'in': precombined_pcoll_id},
annotations=transform.annotations,
outputs={'out': grouped_pcoll_id}))

yield make_stage(
Expand All @@ -1367,6 +1369,7 @@ def lifted_stages(stage):
payload=transform.spec.payload),
inputs={'in': grouped_pcoll_id},
outputs={'out': merged_pcoll_id},
annotations=transform.annotations,
environment_id=transform.environment_id))

yield make_stage(
Expand All @@ -1379,6 +1382,7 @@ def lifted_stages(stage):
payload=transform.spec.payload),
inputs={'in': merged_pcoll_id},
outputs=transform.outputs,
annotations=transform.annotations,
environment_id=transform.environment_id))

def unlifted_stages(stage):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,36 @@ def expand(self, pcoll):
'multiple-small-combines/min-4-globally/CombinePerKey',
optimized_stage_names)

def test_combineperkey_annotation_propagation(self):
    """
    Lifting a CombinePerKey must carry its annotations over to each of
    the component transforms it is split into (Precombine, Group, Merge
    and ExtractOutputs).
    """
    class MyCombinePerKey(beam.CombinePerKey):
        # Only the presence of the key is checked below; the value is
        # an empty bytes placeholder.
        def annotations(self):
            return {"my_annotation": b""}

    # Ids of the transforms expected after the lift_combiners phase has
    # split the annotated CombinePerKey.
    lifted_transform_ids = (
        'MyCombinePerKey(min)/Precombine',
        'MyCombinePerKey(min)/Group',
        'MyCombinePerKey(min)/Merge',
        'MyCombinePerKey(min)/ExtractOutputs',
    )

    with TestPipeline() as pipeline:
        _ = pipeline | beam.Create([(1, 2)]) | MyCombinePerKey(min)

        # NOTE(review): the nesting of the verification under the `with`
        # block is reconstructed; confirm against the original layout.
        python_env = environments.EmbeddedPythonEnvironment(
            capabilities=environments.python_sdk_capabilities())
        pipeline_proto = pipeline.to_runner_api(
            default_environment=python_env)

        # Run only the combiner-lifting phase so the resulting transform
        # ids are exactly the ones listed above.
        lifted = translations.optimize_pipeline(
            pipeline_proto,
            phases=[translations.lift_combiners],
            known_runner_urns=frozenset(),
            partial=True)
        transforms = lifted.components.transforms

        for transform_id in lifted_transform_ids:
            assert "my_annotation" in transforms[transform_id].annotations

def test_conditionally_packed_combiners(self):
class RecursiveCombine(beam.PTransform):
def __init__(self, labels):
Expand Down

0 comments on commit 881c597

Please sign in to comment.