Skip to content

Commit

Permalink
[apache#32139] Fail pipelines with Stateful SDFs.
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Aug 9, 2024
1 parent 18849de commit db3cdfa
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
// Queue initial state of the job.
job.state.Store(jobpb.JobState_STOPPED)
s.jobs[job.key] = job
fmt.Println("preparing job:", job.key)

if err := isSupported(job.Pipeline.GetRequirements()); err != nil {
job.Failed(err)
Expand Down Expand Up @@ -158,17 +159,26 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
return nil, fmt.Errorf("unable to unmarshal ParDoPayload for %v - %q: %w", tid, t.GetUniqueName(), err)
}

isStateful := false

// Validate all the state features
for _, spec := range pardo.GetStateSpecs() {
isStateful = true
check("StateSpec.Protocol.Urn", spec.GetProtocol().GetUrn(), urns.UserStateBag, urns.UserStateMultiMap)
}
// Validate all the timer features
for _, spec := range pardo.GetTimerFamilySpecs() {
isStateful = true
check("TimerFamilySpecs.TimeDomain.Urn", spec.GetTimeDomain(), pipepb.TimeDomain_EVENT_TIME, pipepb.TimeDomain_PROCESSING_TIME)
}

check("OnWindowExpirationTimerFamily", pardo.GetOnWindowExpirationTimerFamilySpec(), "") // Unsupported for now.

// Check for a stateful SDF and direct user to https://github.com/apache/beam/issues/32139
if pardo.GetRestrictionCoderId() != "" && isStateful {
check("Splittable+Stateful DoFn", "See https://github.com/apache/beam/issues/32139 for information.", "")
}

case urns.TransformTestStream:
var testStream pipepb.TestStreamPayload
if err := proto.Unmarshal(t.GetSpec().GetPayload(), &testStream); err != nil {
Expand Down

0 comments on commit db3cdfa

Please sign in to comment.