Skip to content

Commit

Permalink
[Cherry Pick #28083][prism] support single external env pipelines. (#…
Browse files Browse the repository at this point in the history
…28111)

* support single external env pipelines.

* provide clear error message

---------

Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Aug 22, 2023
1 parent c42daee commit 8293bbd
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ func RunPipeline(j *jobservices.Job) {
// environments, and start up docker containers, but
// here, we only want and need the go one, operating
// in loopback mode.
env := "go"
envs := j.Pipeline.GetComponents().GetEnvironments()
if len(envs) != 1 {
j.Failed(fmt.Errorf("unable to execute multi-environment pipelines;\npipeline has environments: %+v", envs))
return
}
env, _ := getOnlyPair(envs)
wk := worker.New(env) // Cheating by having the worker id match the environment id.
go wk.Serve()

Expand Down Expand Up @@ -302,12 +307,17 @@ func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, cod
return makeWindowCoders(coders[wcID])
}

func getOnlyValue[K comparable, V any](in map[K]V) V {
func getOnlyPair[K comparable, V any](in map[K]V) (K, V) {
if len(in) != 1 {
panic(fmt.Sprintf("expected single value map, had %v - %v", len(in), in))
}
for _, v := range in {
return v
for k, v := range in {
return k, v
}
panic("unreachable")
}

func getOnlyValue[K comparable, V any](in map[K]V) V {
_, v := getOnlyPair(in)
return v
}

0 comments on commit 8293bbd

Please sign in to comment.