diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index c94a98c13157..f8b6b6f33ab8 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -47,7 +47,12 @@ func RunPipeline(j *jobservices.Job) { // environments, and start up docker containers, but // here, we only want and need the go one, operating // in loopback mode. - env := "go" + envs := j.Pipeline.GetComponents().GetEnvironments() + if len(envs) != 1 { + j.Failed(fmt.Errorf("unable to execute multi-environment pipelines;\npipeline has environments: %+v", envs)) + return + } + env, _ := getOnlyPair(envs) wk := worker.New(env) // Cheating by having the worker id match the environment id. go wk.Serve() @@ -302,12 +307,17 @@ func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, cod return makeWindowCoders(coders[wcID]) } -func getOnlyValue[K comparable, V any](in map[K]V) V { +func getOnlyPair[K comparable, V any](in map[K]V) (K, V) { if len(in) != 1 { panic(fmt.Sprintf("expected single value map, had %v - %v", len(in), in)) } - for _, v := range in { - return v + for k, v := range in { + return k, v } panic("unreachable") } + +func getOnlyValue[K comparable, V any](in map[K]V) V { + _, v := getOnlyPair(in) + return v +}