Prism does not currently support stateful Splittable DoFns correctly, that is, SDFs that use State and Timers. This issue tracks whether that changes, and outlines the problem so that an error message can link here.
The recommended workaround is to split the SDF into two: an SDF and a Stateful DoFn. Ideally the KV key into the SDF is different from the key into the Stateful DoFn, so that the fusion break implied by the Stateful DoFn does not make the pipeline less efficient. (E.g. if the SDF is keyed by a partition or a filename, it shouldn't emit values with that as the key to the Stateful DoFn. That limits parallelism anyway.)
I believe the user expectation would be to base state on their Key, and not the full Element+Restriction pair, or specifically the Key+Value+Restriction triple.
So the element manager would need to be aware of this when extracting the key for an element and when ensuring that only one bundle per key is processed at a time, without "breaking" the key handling for state elsewhere.
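As a rough illustration of the "only one bundle for the key at a time" requirement, here is a minimal sketch in Go. The `keyGate` type and its methods are hypothetical names invented for this example; they are not Prism's element manager API, and the sketch ignores concurrency (a real runner would need locking).

```go
package main

import "fmt"

// keyGate is a hypothetical tracker of which keys currently have a
// bundle in flight. It is not goroutine-safe; this is only a sketch.
type keyGate struct {
	inFlight map[string]bool
}

func newKeyGate() *keyGate {
	return &keyGate{inFlight: map[string]bool{}}
}

// tryAcquire reports whether a bundle for key may start now.
// It returns false if another bundle for the same key is still running.
func (g *keyGate) tryAcquire(key string) bool {
	if g.inFlight[key] {
		return false
	}
	g.inFlight[key] = true
	return true
}

// release marks the bundle for key as finished.
func (g *keyGate) release(key string) {
	delete(g.inFlight, key)
}

func main() {
	g := newKeyGate()
	fmt.Println(g.tryAcquire("a")) // first bundle for "a" may start
	fmt.Println(g.tryAcquire("a")) // second bundle for "a" must wait
	fmt.Println(g.tryAcquire("b")) // a different key is unaffected
	g.release("a")
	fmt.Println(g.tryAcquire("a")) // "a" may be scheduled again
}
```

The point of the sketch is that the gate is only as good as the key it is given: if the runner keys it on the wrong part of the element, state for one user key could be touched by two bundles at once.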
In discussion with Kenn, we concluded that while the two features are orthogonal from a user affordance perspective, and thus look compatible, they conflict in practice. As Kenn noted: "But in some sense they are, in fact, diametrical opposites. State is all about non-parallelism, while splitting is all about parallelism." I can't help but agree.
I now have a task to document the contraindication in the programming guide and protocol buffers.
reeba212 pushed a commit to reeba212/beam that referenced this issue on Dec 4, 2024.
The difficulty is that what we execute for an SDF is not based on the user's key, due to the rewriting an SDF goes through. The executed KV for a ProcessSizedElementAndRestriction is a KV(KV(element, restriction), float64), where element may itself be a user KV containing the user's key. (See https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go#L126)
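To make the nesting concrete, the following Go sketch models the KV(KV(element, restriction), float64) shape with plain structs and digs the user key out of it. The `KV` and `Restriction` types and the `userKey` helper are hypothetical, made up for this illustration; they are not taken from Prism, and a real runner would most likely work on encoded bytes with coders rather than decoded values.

```go
package main

import "fmt"

// KV is a hypothetical stand-in for a decoded key-value pair.
type KV struct {
	Key   any
	Value any
}

// Restriction is a hypothetical offset-range restriction.
type Restriction struct {
	Start, End int64
}

// userKey extracts the user's key from a value shaped like
// KV(KV(element, restriction), size). It returns ok=false when the
// outer key is not an element/restriction pair, or when the element
// is not itself a user KV.
func userKey(sized KV) (key any, ok bool) {
	pair, isPair := sized.Key.(KV)
	if !isPair {
		return nil, false
	}
	elem, isKV := pair.Key.(KV)
	if !isKV {
		return nil, false
	}
	return elem.Key, true
}

func main() {
	// The user's element: keyed by "user-1", with a filename as the value.
	elem := KV{Key: "user-1", Value: "file.txt"}
	// What the SDF rewrite would hand to execution in this model.
	sized := KV{
		Key:   KV{Key: elem, Value: Restriction{Start: 0, End: 100}},
		Value: float64(100),
	}
	k, ok := userKey(sized)
	fmt.Println(k, ok) // prints "user-1 true"
}
```

Keying state on `sized.Key` directly would scope it to the whole element and restriction pair, so two restrictions of the same user element would see different state cells.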