Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: abort watch correctly for protobuf client #494

Merged
merged 1 commit into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/sync v0.8.0
golang.org/x/time v0.6.0
google.golang.org/grpc v1.66.0
google.golang.org/grpc v1.66.2
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240827150818-7e3bb234dfed h1:
google.golang.org/genproto/googleapis/api v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:OCdP9MfskevB/rbYvHTsXTtKC+3bHWajPdoKgjcYkfo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed h1:J6izYgfBXAI3xTKLgxzTmUltdYaLsuBxFCgDHWJ/eXg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c=
google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo=
google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
27 changes: 17 additions & 10 deletions pkg/state/protobuf/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,21 +403,28 @@ func (adapter *Adapter) WatchKindAggregated(ctx context.Context, resourceKind re
//nolint:gocognit,gocyclo,cyclop
func watchAdapter(ctx context.Context, cli v1alpha1.State_WatchClient, singleCh chan<- state.Event, aggregatedCh chan<- []state.Event, skipProtobufUnmarshal bool) {
sendError := func(err error) {
channel.SendWithContext(ctx, singleCh,
state.Event{
Type: state.Errored,
Error: err,
switch {
case singleCh != nil:
channel.SendWithContext(ctx, singleCh,
state.Event{
Type: state.Errored,
Error: err,
},
)
case aggregatedCh != nil:
channel.SendWithContext(ctx, aggregatedCh, []state.Event{
{
Type: state.Errored,
Error: err,
},
},
)
)
}
}

for {
msg, err := cli.Recv()

switch {
case errors.Is(err, io.EOF):
return
case err != nil:
if err != nil {
sendError(err)

return
Expand Down
59 changes: 53 additions & 6 deletions pkg/state/protobuf/protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
package protobuf_test

import (
"context"
"errors"
"io/fs"
"net"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/goleak"
Expand All @@ -29,15 +32,17 @@ import (
"github.com/cosi-project/runtime/pkg/state/protobuf/server"
)

func TestProtobufConformance(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
func ProtobufSetup(t *testing.T) (grpc.ClientConnInterface, *grpc.Server) {
t.Helper()

t.Cleanup(func() { goleak.VerifyNone(t, goleak.IgnoreCurrent()) })

sock, err := os.CreateTemp("", "api*.sock")
require.NoError(t, err)

require.NoError(t, os.Remove(sock.Name()))

defer noError(t, os.Remove, sock.Name(), fs.ErrNotExist)
t.Cleanup(func() { noError(t, os.Remove, sock.Name(), fs.ErrNotExist) })

l, err := net.Listen("unix", sock.Name())
require.NoError(t, err)
Expand All @@ -55,13 +60,19 @@ func TestProtobufConformance(t *testing.T) {
return struct{}{}
})

defer func() { <-ch }() // ensure that gorotuine is stopped
defer grpcServer.Stop()
t.Cleanup(func() { <-ch }) // ensure that goroutine is stopped
t.Cleanup(grpcServer.Stop)

grpcConn, err := grpc.NewClient("unix://"+sock.Name(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)

defer noError(t, (*grpc.ClientConn).Close, grpcConn, fs.ErrNotExist)
t.Cleanup(func() { noError(t, (*grpc.ClientConn).Close, grpcConn, fs.ErrNotExist) })

return grpcConn, grpcServer
}

func TestProtobufConformance(t *testing.T) {
grpcConn, _ := ProtobufSetup(t)

stateClient := v1alpha1.NewStateClient(grpcConn)

Expand All @@ -73,6 +84,42 @@ func TestProtobufConformance(t *testing.T) {
})
}

func TestProtobufWatchAbort(t *testing.T) {
grpcConn, grpcServer := ProtobufSetup(t)

stateClient := v1alpha1.NewStateClient(grpcConn)

st := state.WrapCore(client.NewAdapter(stateClient))

ch := make(chan []state.Event)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)

require.NoError(t, st.WatchKindAggregated(ctx, conformance.NewPathResource("test", "/foo").Metadata(), ch, state.WithBootstrapContents(true)))

select {
case <-ctx.Done():
t.Fatal("timeout")
case ev := <-ch:
require.Len(t, ev, 1)

assert.Equal(t, state.Bootstrapped, ev[0].Type)
}

// abort the server, watch should return an error
grpcServer.Stop()

select {
case <-ctx.Done():
t.Fatal("timeout")
case ev := <-ch:
require.Len(t, ev, 1)

assert.Equal(t, state.Errored, ev[0].Type)
}
}

func noError[T any](t *testing.T, fn func(T) error, v T, ignored ...error) {
t.Helper()

Expand Down
Loading