Skip to content

Commit

Permalink
remove unused props
Browse files Browse the repository at this point in the history
  • Loading branch information
damondouglas committed Dec 23, 2024
1 parent 90f2f95 commit ca338bc
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 5 deletions.
7 changes: 2 additions & 5 deletions sdks/go/pkg/beam/runners/prism/internal/worker/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func NewMultiplexW(opts ...grpc.ServerOption) *grpc.Server {
g := grpc.NewServer(opts...)
wk := &MultiplexW{
logger: slog.Default(),
pool: make(map[string]*W),
}

fnpb.RegisterBeamFnControlServer(g, wk)
Expand All @@ -99,9 +98,7 @@ type MultiplexW struct {
fnpb.UnimplementedProvisionServiceServer
healthpb.UnimplementedHealthServer

endpoint string
logger *slog.Logger
pool map[string]*W
logger *slog.Logger
}

func (mw *MultiplexW) Check(_ context.Context, _ *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
Expand Down Expand Up @@ -159,7 +156,7 @@ func (mw *MultiplexW) State(state fnpb.BeamFnState_StateServer) error {
func (mw *MultiplexW) MonitoringMetadata(ctx context.Context, unknownIDs []string) *fnpb.MonitoringInfosMetadataResponse {
w, err := Pool.workerFromMetadataCtx(ctx)
if err != nil {
slog.Error(err.Error())
mw.logger.Error(err.Error())
return nil
}
return w.MonitoringMetadata(ctx, unknownIDs)
Expand Down
50 changes: 50 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,3 +453,53 @@ func TestWorker_State_MultimapSideInput(t *testing.T) {
})
}
}

func TestMapW_workerFromMetadataCtx(t *testing.T) {
tests := []struct {
name string
ctx context.Context
m MapW
want *W
wantErr string
}{
{
name: "missing metadata",
m: make(MapW),
wantErr: "failed to read metadata from context",
},
{
name: "mismatched ctx metadata",
ctx: metadata.NewIncomingContext(context.Background(), metadata.Pairs("worker_id", "wk1")),
m: map[string]*W{
"wk2": {ID: "wk2"},
},
wantErr: "worker id: 'wk1' read from ctx but not registered in worker pool",
},
{
name: "matching ctx metadata",
ctx: metadata.NewIncomingContext(context.Background(), metadata.Pairs("worker_id", "wk1")),
m: map[string]*W{
"wk1": {ID: "wk1"},
},
want: &W{ID: "wk1"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.ctx == nil {
tt.ctx = context.Background()
}
got, err := tt.m.workerFromMetadataCtx(tt.ctx)
if err != nil && err.Error() != tt.wantErr {
t.Errorf("workerFromMetadataCtx() error = %v, wantErr %v", err, tt.wantErr)
return
}
if tt.wantErr != "" {
return
}
if got.ID != tt.want.ID {
t.Errorf("workerFromMetadataCtx() got = %v, want %v", got, tt.want)
}
})
}
}

0 comments on commit ca338bc

Please sign in to comment.