Skip to content

Commit

Permalink
fix: abort watch correctly for protobuf client
Browse files Browse the repository at this point in the history
There were two bugs:

* `io.EOF` was ignored for no good reason
* the aggregated/non-aggregated was not properly handled for errors

This should fix COSI controller runtime not crashing on connection close
via gRPC.

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira committed Sep 17, 2024
1 parent c0a68e9 commit 5e072dd
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 18 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/sync v0.8.0
golang.org/x/time v0.6.0
google.golang.org/grpc v1.66.0
google.golang.org/grpc v1.66.2
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240827150818-7e3bb234dfed h1:
google.golang.org/genproto/googleapis/api v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:OCdP9MfskevB/rbYvHTsXTtKC+3bHWajPdoKgjcYkfo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed h1:J6izYgfBXAI3xTKLgxzTmUltdYaLsuBxFCgDHWJ/eXg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c=
google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo=
google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
28 changes: 19 additions & 9 deletions pkg/state/protobuf/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"errors"
"io"
"log"

"github.com/siderolabs/gen/channel"
"github.com/siderolabs/go-pointer"
Expand Down Expand Up @@ -403,21 +404,30 @@ func (adapter *Adapter) WatchKindAggregated(ctx context.Context, resourceKind re
//nolint:gocognit,gocyclo,cyclop
func watchAdapter(ctx context.Context, cli v1alpha1.State_WatchClient, singleCh chan<- state.Event, aggregatedCh chan<- []state.Event, skipProtobufUnmarshal bool) {
sendError := func(err error) {
channel.SendWithContext(ctx, singleCh,
state.Event{
Type: state.Errored,
Error: err,
switch {
case singleCh != nil:
channel.SendWithContext(ctx, singleCh,
state.Event{
Type: state.Errored,
Error: err,
},
)
case aggregatedCh != nil:
channel.SendWithContext(ctx, aggregatedCh, []state.Event{
{
Type: state.Errored,
Error: err,
},
},
)
)
}
}

for {
msg, err := cli.Recv()
if err != nil {
log.Printf("watchAdapter: %v", err)

switch {
case errors.Is(err, io.EOF):
return
case err != nil:
sendError(err)

return
Expand Down
59 changes: 53 additions & 6 deletions pkg/state/protobuf/protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
package protobuf_test

import (
"context"
"errors"
"io/fs"
"net"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/goleak"
Expand All @@ -29,15 +32,17 @@ import (
"github.com/cosi-project/runtime/pkg/state/protobuf/server"
)

func TestProtobufConformance(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
func ProtobufSetup(t *testing.T) (grpc.ClientConnInterface, *grpc.Server) {
t.Helper()

t.Cleanup(func() { goleak.VerifyNone(t, goleak.IgnoreCurrent()) })

sock, err := os.CreateTemp("", "api*.sock")
require.NoError(t, err)

require.NoError(t, os.Remove(sock.Name()))

defer noError(t, os.Remove, sock.Name(), fs.ErrNotExist)
t.Cleanup(func() { noError(t, os.Remove, sock.Name(), fs.ErrNotExist) })

l, err := net.Listen("unix", sock.Name())
require.NoError(t, err)
Expand All @@ -55,13 +60,19 @@ func TestProtobufConformance(t *testing.T) {
return struct{}{}
})

defer func() { <-ch }() // ensure that gorotuine is stopped
defer grpcServer.Stop()
t.Cleanup(func() { <-ch }) // ensure that goroutine is stopped
t.Cleanup(grpcServer.Stop)

grpcConn, err := grpc.NewClient("unix://"+sock.Name(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)

defer noError(t, (*grpc.ClientConn).Close, grpcConn, fs.ErrNotExist)
t.Cleanup(func() { noError(t, (*grpc.ClientConn).Close, grpcConn, fs.ErrNotExist) })

return grpcConn, grpcServer
}

func TestProtobufConformance(t *testing.T) {
grpcConn, _ := ProtobufSetup(t)

stateClient := v1alpha1.NewStateClient(grpcConn)

Expand All @@ -73,6 +84,42 @@ func TestProtobufConformance(t *testing.T) {
})
}

func TestProtobufWatchAbort(t *testing.T) {
grpcConn, grpcServer := ProtobufSetup(t)

stateClient := v1alpha1.NewStateClient(grpcConn)

st := state.WrapCore(client.NewAdapter(stateClient))

ch := make(chan []state.Event)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)

require.NoError(t, st.WatchKindAggregated(ctx, conformance.NewPathResource("test", "/foo").Metadata(), ch, state.WithBootstrapContents(true)))

select {
case <-ctx.Done():
t.Fatal("timeout")
case ev := <-ch:
require.Len(t, ev, 1)

assert.Equal(t, state.Bootstrapped, ev[0].Type)
}

// abort the server, watch should return an error
grpcServer.Stop()

select {
case <-ctx.Done():
t.Fatal("timeout")
case ev := <-ch:
require.Len(t, ev, 1)

assert.Equal(t, state.Errored, ev[0].Type)
}
}

func noError[T any](t *testing.T, fn func(T) error, v T, ignored ...error) {
t.Helper()

Expand Down

0 comments on commit 5e072dd

Please sign in to comment.