You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Take a streaming job that reads from a PubSub subscription, do some transformations and sink it to a PubSub Topic.
The message has a lot of duplicate "rows", I need to expose the primary key for an element and GroupByKey() to be able to fetch the latest one. Once I expose pk and group by it, the step downstream might have keys with no values.
If I remove theearly=trigger.AfterProcessingTime(some_time)it did work as expected (keys and values are emitted together).
It does not make sense to emit to the downstream steps only keys but not values. Both are part of the same output tuple and they cannot be emitted separately.
Here's part of the code:
(
to_sync
| "AddTimestamp" >> beam.ParDo(AddTimestamp())
| "Window" >> beam.WindowInto(
windowfn=window.FixedWindows(60),
trigger=trigger.AfterWatermark(early=trigger.AfterProcessingTime(30)),
accumulation_mode=trigger.AccumulationMode.DISCARDING
)
| "GroupAndFetchLatest" >> GroupAndFetchLatest()
...
)
class GroupAndFetchLatest(beam.PTransform):
"""Expose record PK, group by it and fetch the latest record (timestamp)"""
def expand(self, pcoll):
return (
pcoll
| "ExposeKey" >> beam.Map(lambda x: (x["mykey"], x))
| "Group" >> beam.GroupByKey()
| "GetLatest" >> beam.ParDo(GetLatest())
)
class GetLatest(beam.DoFn):
def process(self, grouped):
key, values = grouped
if values: # Here, I had to add this "if" to avoid keys without values.
latest = max(values, key=lambda o: o["__ts__"])
yield MyObject(
PK=key,
row=latest
)
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
Component: Python SDK
Component: Java SDK
Component: Go SDK
Component: Typescript SDK
Component: IO connector
Component: Beam YAML
Component: Beam examples
Component: Beam playground
Component: Beam katas
Component: Website
Component: Infrastructure
Component: Spark Runner
Component: Flink Runner
Component: Samza Runner
Component: Twister2 Runner
Component: Hazelcast Jet Runner
Component: Google Cloud Dataflow Runner
The text was updated successfully, but these errors were encountered:
What happened?
Setup:
Take a streaming job that reads from a PubSub subscription, do some transformations and sink it to a PubSub Topic.
The message has a lot of duplicate "rows", I need to expose the primary key for an element and
GroupByKey()
to be able to fetch the latest one. Once I exposepk
and group by it, the step downstream might have keys with no values.If I remove the
early=trigger.AfterProcessingTime(some_time)
it did work as expected (keys and values are emitted together).It does not make sense to emit to the downstream steps only keys but not values. Both are part of the same output tuple and they cannot be emitted separately.
Here's part of the code:
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
The text was updated successfully, but these errors were encountered: