diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index 7676d958031c..811ee79100be 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -94,6 +94,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo // Queue initial state of the job. job.state.Store(jobpb.JobState_STOPPED) s.jobs[job.key] = job + fmt.Println("preparing job:", job.key) if err := isSupported(job.Pipeline.GetRequirements()); err != nil { job.Failed(err) @@ -158,17 +159,26 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo return nil, fmt.Errorf("unable to unmarshal ParDoPayload for %v - %q: %w", tid, t.GetUniqueName(), err) } + isStateful := false + // Validate all the state features for _, spec := range pardo.GetStateSpecs() { + isStateful = true check("StateSpec.Protocol.Urn", spec.GetProtocol().GetUrn(), urns.UserStateBag, urns.UserStateMultiMap) } // Validate all the timer features for _, spec := range pardo.GetTimerFamilySpecs() { + isStateful = true check("TimerFamilySpecs.TimeDomain.Urn", spec.GetTimeDomain(), pipepb.TimeDomain_EVENT_TIME, pipepb.TimeDomain_PROCESSING_TIME) } check("OnWindowExpirationTimerFamily", pardo.GetOnWindowExpirationTimerFamilySpec(), "") // Unsupported for now. + // Check for a stateful SDF and direct user to https://github.com/apache/beam/issues/32139 + if pardo.GetRestrictionCoderId() != "" && isStateful { + check("Splittable+Stateful DoFn", "See https://github.com/apache/beam/issues/32139 for information.", "") + } + case urns.TransformTestStream: var testStream pipepb.TestStreamPayload if err := proto.Unmarshal(t.GetSpec().GetPayload(), &testStream); err != nil {