diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go index a205c768731b..910fa0a0ca87 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -297,6 +297,10 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat outputTime = func(w typex.Window, et mtime.Time) mtime.Time { return w.MaxTimestamp() } + case pipepb.OutputTime_EARLIEST_IN_PANE, pipepb.OutputTime_LATEST_IN_PANE: + outputTime = func(w typex.Window, et mtime.Time) mtime.Time { + return et + } default: // TODO need to correct session logic if output time is different. panic(fmt.Sprintf("unsupported OutputTime behavior: %v", ws.GetOutputTime())) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index 737a1b22276a..3efe48e23119 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -22,6 +22,7 @@ import ( "sync" "sync/atomic" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" @@ -195,7 +196,8 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo // Inspect Windowing strategies for unsupported features. for wsID, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() { - check("WindowingStrategy.AllowedLateness", ws.GetAllowedLateness(), int64(0)) + check("WindowingStrategy.AllowedLateness", ws.GetAllowedLateness(), int64(0), mtime.MaxTimestamp.Milliseconds()) + // Both Closing behaviors are identical without additional trigger firings. check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY, pipepb.ClosingBehavior_EMIT_ALWAYS) check("WindowingStrategy.AccumulationMode", ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING)