diff --git a/go.mod b/go.mod index ab7769a..93ec819 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( go.uber.org/zap v1.27.0 golang.org/x/sync v0.8.0 golang.org/x/time v0.6.0 - google.golang.org/grpc v1.66.0 + google.golang.org/grpc v1.66.2 google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 3404e9f..0e9b8ba 100644 --- a/go.sum +++ b/go.sum @@ -120,8 +120,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240827150818-7e3bb234dfed h1: google.golang.org/genproto/googleapis/api v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:OCdP9MfskevB/rbYvHTsXTtKC+3bHWajPdoKgjcYkfo= google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed h1:J6izYgfBXAI3xTKLgxzTmUltdYaLsuBxFCgDHWJ/eXg= google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= -google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= -google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo= +google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/state/protobuf/client/client.go b/pkg/state/protobuf/client/client.go index a9bec02..faef17a 100644 --- a/pkg/state/protobuf/client/client.go +++ b/pkg/state/protobuf/client/client.go @@ -9,6 +9,7 @@ import ( "context" "errors" "io" + "log" "github.com/siderolabs/gen/channel" "github.com/siderolabs/go-pointer" @@ -403,21 +404,30 @@ func (adapter *Adapter) WatchKindAggregated(ctx context.Context, resourceKind re //nolint:gocognit,gocyclo,cyclop func watchAdapter(ctx context.Context, cli v1alpha1.State_WatchClient, singleCh chan<- state.Event, aggregatedCh chan<- []state.Event, skipProtobufUnmarshal bool) { sendError := func(err error) { - channel.SendWithContext(ctx, singleCh, - state.Event{ - Type: state.Errored, - Error: err, + switch { + case singleCh != nil: + channel.SendWithContext(ctx, singleCh, + state.Event{ + Type: state.Errored, + Error: err, + }, + ) + case aggregatedCh != nil: + channel.SendWithContext(ctx, aggregatedCh, []state.Event{ + { + Type: state.Errored, + Error: err, + }, }, - ) + ) + } } for { msg, err := cli.Recv() + if err != nil { + log.Printf("watchAdapter: %v", err) - switch { - case errors.Is(err, io.EOF): - return - case err != nil: sendError(err) return diff --git a/pkg/state/protobuf/protobuf_test.go b/pkg/state/protobuf/protobuf_test.go index 074df03..9bce82f 100644 --- a/pkg/state/protobuf/protobuf_test.go +++ b/pkg/state/protobuf/protobuf_test.go @@ -5,12 +5,15 @@ package protobuf_test import ( + "context" "errors" "io/fs" "net" "os" "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "go.uber.org/goleak" @@ -29,15 +32,17 @@ import ( "github.com/cosi-project/runtime/pkg/state/protobuf/server" ) -func TestProtobufConformance(t *testing.T) { - defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) +func ProtobufSetup(t *testing.T) (grpc.ClientConnInterface, *grpc.Server) { + t.Helper() + + t.Cleanup(func() { goleak.VerifyNone(t, goleak.IgnoreCurrent()) }) sock, err := os.CreateTemp("", "api*.sock") require.NoError(t, err) require.NoError(t, os.Remove(sock.Name())) - defer noError(t, os.Remove, sock.Name(), fs.ErrNotExist) + t.Cleanup(func() { noError(t, os.Remove, sock.Name(), fs.ErrNotExist) }) l, err := net.Listen("unix", sock.Name()) require.NoError(t, err) @@ -55,13 +60,19 @@ func TestProtobufConformance(t *testing.T) { return struct{}{} }) - defer func() { <-ch }() // ensure that gorotuine is stopped - defer grpcServer.Stop() + t.Cleanup(func() { <-ch }) // ensure that goroutine is stopped + t.Cleanup(grpcServer.Stop) grpcConn, err := grpc.NewClient("unix://"+sock.Name(), grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) - defer noError(t, (*grpc.ClientConn).Close, grpcConn, fs.ErrNotExist) + t.Cleanup(func() { noError(t, (*grpc.ClientConn).Close, grpcConn, fs.ErrNotExist) }) + + return grpcConn, grpcServer +} + +func TestProtobufConformance(t *testing.T) { + grpcConn, _ := ProtobufSetup(t) stateClient := v1alpha1.NewStateClient(grpcConn) @@ -73,6 +84,42 @@ func TestProtobufConformance(t *testing.T) { }) } +func TestProtobufWatchAbort(t *testing.T) { + grpcConn, grpcServer := ProtobufSetup(t) + + stateClient := v1alpha1.NewStateClient(grpcConn) + + st := state.WrapCore(client.NewAdapter(stateClient)) + + ch := make(chan []state.Event) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) + + require.NoError(t, st.WatchKindAggregated(ctx, conformance.NewPathResource("test", "/foo").Metadata(), ch, state.WithBootstrapContents(true))) + + select { + case <-ctx.Done(): + t.Fatal("timeout") + case ev := <-ch: + require.Len(t, ev, 1) + + assert.Equal(t, state.Bootstrapped, ev[0].Type) + } + + // abort the server, watch should return an error + grpcServer.Stop() + + select { + case <-ctx.Done(): + t.Fatal("timeout") + case ev := <-ch: + require.Len(t, ev, 1) + + assert.Equal(t, state.Errored, ev[0].Type) + } +} + func noError[T any](t *testing.T, fn func(T) error, v T, ignored ...error) { t.Helper()