Skip to content

Commit

Permalink
yapf
Browse files Browse the repository at this point in the history
  • Loading branch information
hjtran committed Sep 16, 2023
1 parent 4a5a007 commit b3a8e15
Showing 1 changed file with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,10 @@ def test_combineperkey_annotation_propagation(self):
"""
class MyCombinePerKey(beam.CombinePerKey):
def annotations(self):
return {"my_annotation":b""}
return {"my_annotation": b""}

with TestPipeline() as pipeline:
pipeline | beam.Create([(1, 2)]) | MyCombinePerKey(min)
_ = pipeline | beam.Create([(1, 2)]) | MyCombinePerKey(min)

# Verify the annotations are propagated to the split up
# CPK transforms
Expand All @@ -405,13 +406,13 @@ def annotations(self):
phases=[translations.lift_combiners],
known_runner_urns=frozenset(),
partial=True)
for transform_id in [
'MyCombinePerKey(min)/Precombine',
'MyCombinePerKey(min)/Group',
'MyCombinePerKey(min)/Merge',
'MyCombinePerKey(min)/ExtractOutputs']:
assert ("my_annotation" in
optimized.components.transforms[transform_id].annotations)
for transform_id in ['MyCombinePerKey(min)/Precombine',
'MyCombinePerKey(min)/Group',
'MyCombinePerKey(min)/Merge',
'MyCombinePerKey(min)/ExtractOutputs']:
assert (
"my_annotation" in
optimized.components.transforms[transform_id].annotations)

def test_conditionally_packed_combiners(self):
class RecursiveCombine(beam.PTransform):
Expand Down

0 comments on commit b3a8e15

Please sign in to comment.