Skip to content

Commit

Permalink
[prism] support single external env pipelines. (apache#28083)
Browse files Browse the repository at this point in the history
* [prism] support single external env pipelines.

* provide clear error message

---------

Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck committed Aug 30, 2023
1 parent 2616f94 commit 338a9c7
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ func RunPipeline(j *jobservices.Job) {
// environments, and start up docker containers, but
// here, we only want and need the go one, operating
// in loopback mode.
env := "go"
envs := j.Pipeline.GetComponents().GetEnvironments()
if len(envs) != 1 {
j.Failed(fmt.Errorf("unable to execute multi-environment pipelines;\npipeline has environments: %+v", envs))
return
}
env, _ := getOnlyPair(envs)
wk := worker.New(env) // Cheating by having the worker id match the environment id.
go wk.Serve()

Expand Down Expand Up @@ -302,12 +307,17 @@ func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, cod
return makeWindowCoders(coders[wcID])
}

func getOnlyValue[K comparable, V any](in map[K]V) V {
func getOnlyPair[K comparable, V any](in map[K]V) (K, V) {
if len(in) != 1 {
panic(fmt.Sprintf("expected single value map, had %v - %v", len(in), in))
}
for _, v := range in {
return v
for k, v := range in {
return k, v
}
panic("unreachable")
}

func getOnlyValue[K comparable, V any](in map[K]V) V {
_, v := getOnlyPair(in)
return v
}

0 comments on commit 338a9c7

Please sign in to comment.