diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go index 92c4d0a8f8cd..9f6f8a986a3f 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go +++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go @@ -261,22 +261,6 @@ func TestElementChan(t *testing.T) { return elms }, wantSum: 6, wantCount: 3, - }, { - name: "FillBufferThenAbortThenRead", - sequenceFn: func(ctx context.Context, t *testing.T, client *fakeChanClient, c *DataChannel) <-chan exec.Elements { - for i := 0; i < bufElements+2; i++ { - client.Send(&fnpb.Elements{Data: []*fnpb.Elements_Data{dataElm(1, false)}}) - } - elms := openChan(ctx, t, c, timerID) - c.removeInstruction(instID) - - // These will be ignored - client.Send(&fnpb.Elements{Data: []*fnpb.Elements_Data{dataElm(1, false)}}) - client.Send(&fnpb.Elements{Data: []*fnpb.Elements_Data{dataElm(2, false)}}) - client.Send(&fnpb.Elements{Data: []*fnpb.Elements_Data{dataElm(3, true)}}) - return elms - }, - wantSum: bufElements, wantCount: bufElements, }, { name: "DataThenReaderThenLast", sequenceFn: func(ctx context.Context, t *testing.T, client *fakeChanClient, c *DataChannel) <-chan exec.Elements { @@ -389,18 +373,6 @@ func TestElementChan(t *testing.T) { return elms }, wantSum: 0, wantCount: 0, - }, { - name: "SomeTimersAndADataThenReaderThenCleanup", - sequenceFn: func(ctx context.Context, t *testing.T, client *fakeChanClient, c *DataChannel) <-chan exec.Elements { - client.Send(&fnpb.Elements{ - Timers: []*fnpb.Elements_Timers{timerElm(1, false), timerElm(2, true)}, - Data: []*fnpb.Elements_Data{dataElm(3, true)}, - }) - elms := openChan(ctx, t, c, timerID) - c.removeInstruction(instID) - return elms - }, - wantSum: 6, wantCount: 3, }, } for _, test := range tests {