Skip to content

Commit

Permalink
Add a test for getting state with MultimapSideInput StateKey (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mls3odp authored and reeba212 committed Dec 4, 2024
1 parent 907c4a4 commit 1c5c50f
Showing 1 changed file with 80 additions and 1 deletion.
81 changes: 80 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ package worker
import (
"bytes"
"context"
"github.com/google/go-cmp/cmp"
"net"
"sort"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
Expand Down Expand Up @@ -386,3 +387,81 @@ func TestWorker_State_MultimapKeysSideInput(t *testing.T) {
})
}
}

func TestWorker_State_MultimapSideInput(t *testing.T) {
for _, tt := range []struct {
name string
w typex.Window
}{
{
name: "global window",
w: window.GlobalWindow{},
},
{
name: "interval window",
w: window.IntervalWindow{
Start: 1000,
End: 2000,
},
},
} {
t.Run(tt.name, func(t *testing.T) {
var encW []byte
if !tt.w.Equals(window.GlobalWindow{}) {
buf := bytes.Buffer{}
if err := exec.MakeWindowEncoder(coder.NewIntervalWindow()).EncodeSingle(tt.w, &buf); err != nil {
t.Fatalf("error encoding window: %v, err: %v", tt.w, err)
}
encW = buf.Bytes()
}
wk, stateStream, done := serveTestWorkerStateStream(t)
defer done()
instID := wk.NextInst()
wk.activeInstructions[instID] = &B{
MultiMapSideInputData: map[SideInputKey]map[typex.Window]map[string][][]byte{
SideInputKey{
TransformID: "transformID",
Local: "i1",
}: {
tt.w: map[string][][]byte{"a": {{5}}, "b": {{12}}},
},
},
}
var testKey = []string{"a", "b", "x"}
expectedResult := map[string][]int{
"a": {5},
"b": {12},
}
for _, key := range testKey {
stateStream.Send(&fnpb.StateRequest{
Id: "first",
InstructionId: instID,
Request: &fnpb.StateRequest_Get{
Get: &fnpb.StateGetRequest{},
},
StateKey: &fnpb.StateKey{Type: &fnpb.StateKey_MultimapSideInput_{
MultimapSideInput: &fnpb.StateKey_MultimapSideInput{
TransformId: "transformID",
SideInputId: "i1",
Window: encW,
Key: []byte(key),
},
}},
})

resp, err := stateStream.Recv()
if err != nil {
t.Fatal("Couldn't receive state response:", err)
}

var got []int
for _, b := range resp.GetGet().GetData() {
got = append(got, int(b))
}
if !cmp.Equal(got, expectedResult[key]) {
t.Errorf("For test key: %v, didn't receive expected state response data: got %v, want %v", key, got, expectedResult[key])
}
}
})
}
}

0 comments on commit 1c5c50f

Please sign in to comment.