From 765f431762f5d4f6823351a58f3922647d7c2a22 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Fri, 3 Feb 2023 11:13:30 -0800 Subject: [PATCH] refactor and testing grpc contract Signed-off-by: Kavindu Dodanduwa --- go.mod | 4 +- go.sum | 4 + pkg/sync/grpc/grpc_sync.go | 31 ++-- pkg/sync/grpc/grpc_sync_test.go | 293 +++++++++++++++++++++++++++++++- 4 files changed, 314 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index e26b98411..327794f83 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/open-feature/flagd go 1.19 require ( - buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230201231859-0ba584642f5c.4 - buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230201231859-0ba584642f5c.4 + buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230201234317-013221f6d37a.4 + buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230201234317-013221f6d37a.4 buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1 buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4 buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20221226184428-0dc62ff103b8.4 diff --git a/go.sum b/go.sum index 1fb2c0991..f2c14e1c9 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,12 @@ buf.build/gen/go/grpc-ecosystem/grpc-gateway/grpc/go v1.2.0-20220906183531-bc28b buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.28.1-20220906183531-bc28b723cd77.4/go.mod h1:92ejKVTiuvnKoAtRlpJpIxKfloI935DDqhs0NCRx+KM= buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230201231859-0ba584642f5c.4 h1:v19nAzB4Q4cSQq7PhmgeUPHQJYouc68caoBI/HdGw+k= buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230201231859-0ba584642f5c.4/go.mod h1:GZ8j43uPGP/wa1L/z3q4hr5ur6hyG+WkXTtVEZqFBzs= +buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230201234317-013221f6d37a.4 h1:0pM53BFVy3Xsy9IkD3pQjtIIZ6s8gvipBmfZbB2LjBM= +buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230201234317-013221f6d37a.4/go.mod h1:1Di60p+uyAMgPXbZ9OseWUr5IrzyKsJAlur4kNkdOr0= buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230201231859-0ba584642f5c.4 h1:TtrLf/K+yWLXk9ello83xaPD/WIZr0usxzAQdkHLyLo= buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230201231859-0ba584642f5c.4/go.mod h1:jpjQMBqnFcgHMK99ylFu+PhUhA1KD6AC1QSz27yC/1U= +buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230201234317-013221f6d37a.4 h1:Z8IagBs/ox6bgyJCM21bqOxEnopxS16onmP1HLmfo1g= +buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230201234317-013221f6d37a.4/go.mod h1:jpjQMBqnFcgHMK99ylFu+PhUhA1KD6AC1QSz27yC/1U= buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1 h1:KoSPqmHyi3x27tPFLQ994CJjG4qc59v+0gbxY9+VXso= buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1/go.mod h1:68WGv4z/jXuTS3G7FEFQTEw4wiMmulBSX6BlSFX2Xc8= buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4 h1:9ioWUVmnURL+TX4qVjyzzE3o9Z2ACDGidqtu9dkjmdY= diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 3ed80ff0a..00da7f5a9 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -34,32 +34,37 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { return err } - client := syncv1grpc.NewFlagServiceClient(dial) - - stream, err := client.SyncFlags(context.Background(), &v1.SyncFlagsRequest{Key: g.Key}) - if err != nil { - g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error())) - return err - } + return g.streamListener(ctx, dial, dataSync) +} +// streamListener performs the grpc listening on provided client connection and push updates through dataSync channel +func (g *Sync) streamListener(ctx context.Context, dial *grpc.ClientConn, dataSync chan<- sync.DataSync) error { group, localContext := errgroup.WithContext(ctx) group.Go(func() error { - return g.streamHandler(stream, dataSync) + serviceClient := syncv1grpc.NewFlagServiceClient(dial) + + syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{Key: g.Key}) + if err != nil { + g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error())) + return err + } + + return g.handleFlagSync(syncClient, dataSync) }) <-localContext.Done() - err = group.Wait() + err := group.Wait() if err == io.EOF { - g.Logger.Info("Stream closed by the server. Exiting without retry attempts.") + g.Logger.Info("Stream closed by the server") return err } return err } -func (g *Sync) streamHandler(stream syncv1grpc.FlagService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { +func (g *Sync) handleFlagSync(stream syncv1grpc.FlagService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { for { data, err := stream.Recv() if err != nil { @@ -76,7 +81,6 @@ func (g *Sync) streamHandler(stream syncv1grpc.FlagService_SyncFlagsClient, data } g.Logger.Debug("received full configuration payload") - continue case v1.SyncState_SYNC_STATE_ADD: dataSync <- sync.DataSync{ FlagData: data.Flags, @@ -85,7 +89,6 @@ func (g *Sync) streamHandler(stream syncv1grpc.FlagService_SyncFlagsClient, data } g.Logger.Debug("received an add payload") - continue case v1.SyncState_SYNC_STATE_UPDATE: dataSync <- sync.DataSync{ FlagData: data.Flags, @@ -94,7 +97,6 @@ func (g *Sync) streamHandler(stream syncv1grpc.FlagService_SyncFlagsClient, data } g.Logger.Debug("received an update payload") - continue case v1.SyncState_SYNC_STATE_DELETE: dataSync <- sync.DataSync{ FlagData: data.Flags, @@ -103,7 +105,6 @@ func (g *Sync) streamHandler(stream syncv1grpc.FlagService_SyncFlagsClient, data } g.Logger.Debug("received a delete payload") - continue case v1.SyncState_SYNC_STATE_PING: g.Logger.Debug("received server ping") default: diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index b51125c8e..4d80921ac 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -1,6 +1,23 @@ package grpc -import "testing" +import ( + "context" + "fmt" + "io" + "log" + "net" + "testing" + + // todo - from schema push + "buf.build/gen/go/kavindudodan/flagd/grpc/go/sync/v1/syncv1grpc" + v1 "buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go/sync/v1" + + "github.com/open-feature/flagd/pkg/logger" + "github.com/open-feature/flagd/pkg/sync" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" +) func TestUrlToGRPCTarget(t *testing.T) { tests := []struct { @@ -32,3 +49,277 @@ func TestUrlToGRPCTarget(t *testing.T) { }) } } + +func TestSync_BasicFlagSyncStates(t *testing.T) { + grpcSyncImpl := Sync{ + Target: "grpc://test", + Key: "", + Logger: logger.NewLogger(nil, false), + } + + tests := []struct { + name string + stream syncv1grpc.FlagService_SyncFlagsClient + want sync.Type + }{ + { + name: "State All maps to Sync All", + stream: &SimpleRecvMock{ + mockResponse: v1.SyncFlagsResponse{ + Flags: "{}", + State: v1.SyncState_SYNC_STATE_ALL, + }, + }, + want: sync.ALL, + }, + { + name: "State Add maps to Sync Add", + stream: &SimpleRecvMock{ + mockResponse: v1.SyncFlagsResponse{ + Flags: "{}", + State: v1.SyncState_SYNC_STATE_ADD, + }, + }, + want: sync.ADD, + }, + { + name: "State Update maps to Sync Update", + stream: &SimpleRecvMock{ + mockResponse: v1.SyncFlagsResponse{ + Flags: "{}", + State: v1.SyncState_SYNC_STATE_UPDATE, + }, + }, + want: sync.UPDATE, + }, + { + name: "State Delete maps to Sync Delete", + stream: &SimpleRecvMock{ + mockResponse: v1.SyncFlagsResponse{ + Flags: "{}", + State: v1.SyncState_SYNC_STATE_DELETE, + }, + }, + want: sync.DELETE, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + syncChan := make(chan sync.DataSync) + + go func() { + err := grpcSyncImpl.handleFlagSync(test.stream, syncChan) + if err != nil { + t.Errorf("Error handling flag sync: %s", err.Error()) + } + }() + data := <-syncChan + + if data.Type != test.want { + t.Errorf("Reuturned data sync state = %v, wanted %v", data.Type, test.want) + } + }) + } +} + +func Test_StreamListener(t *testing.T) { + const target = "localBufCon" + + tests := []struct { + name string + input []serverPayload + output []sync.DataSync + }{ + { + name: "Single send", + input: []serverPayload{ + { + flags: "{\"flags\": {}}", + state: v1.SyncState_SYNC_STATE_ALL, + }, + }, + output: []sync.DataSync{ + { + FlagData: "{\"flags\": {}}", + Type: sync.ALL, + }, + }, + }, + { + name: "Multiple send", + input: []serverPayload{ + { + flags: "{}", + state: v1.SyncState_SYNC_STATE_ALL, + }, + { + flags: "{\"flags\": {}}", + state: v1.SyncState_SYNC_STATE_DELETE, + }, + }, + output: []sync.DataSync{ + { + FlagData: "{}", + Type: sync.ALL, + }, + { + FlagData: "{\"flags\": {}}", + Type: sync.DELETE, + }, + }, + }, + { + name: "Pings are ignored & not written to channel", + input: []serverPayload{ + { + flags: "", + state: v1.SyncState_SYNC_STATE_PING, + }, + { + flags: "", + state: v1.SyncState_SYNC_STATE_PING, + }, + { + flags: "{\"flags\": {}}", + state: v1.SyncState_SYNC_STATE_DELETE, + }, + }, + output: []sync.DataSync{ + { + FlagData: "{\"flags\": {}}", + Type: sync.DELETE, + }, + }, + }, + { + name: "Unknown states are & not written to channel", + input: []serverPayload{ + { + flags: "", + state: 42, + }, + { + flags: "", + state: -1, + }, + { + flags: "{\"flags\": {}}", + state: v1.SyncState_SYNC_STATE_ALL, + }, + }, + output: []sync.DataSync{ + { + FlagData: "{\"flags\": {}}", + Type: sync.ALL, + }, + }, + }, + } + + for _, test := range tests { + bufCon := bufconn.Listen(5) + + bufServer := bufferedServer{ + listener: bufCon, + mockResponses: test.input, + } + + // start server + go serve(&bufServer) + + // initialize client + dial, err := grpc.Dial(target, + grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return bufCon.Dial() + }), + grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Errorf("Error setting up client connection: %s", err.Error()) + } + + grpcSync := Sync{ + Target: target, + Key: "", + Logger: logger.NewLogger(nil, false), + } + + syncChan := make(chan sync.DataSync, 1) + + // listen to stream + go func() { + err := grpcSync.streamListener(context.Background(), dial, syncChan) + if err != nil { + // must ignore EOF as this is returned for stream end + if err != io.EOF { + t.Errorf("Error from stream listener: %s", err.Error()) + } + } + }() + + for _, expected := range test.output { + out := <-syncChan + + if expected.Type != out.Type { + t.Errorf("Reuturned sync type = %v, wanted %v", out.Type, expected.Type) + } + + if expected.FlagData != out.FlagData { + t.Errorf("Reuturned sync data = %v, wanted %v", out.FlagData, expected.FlagData) + } + } + + // channel must be empty + if len(syncChan) != 0 { + t.Errorf("Data sync channel must be empty after all test syncs. But received non empty: %d", len(syncChan)) + } + } +} + +// Mock implementations + +type SimpleRecvMock struct { + grpc.ClientStream + mockResponse v1.SyncFlagsResponse +} + +func (s *SimpleRecvMock) Recv() (*v1.SyncFlagsResponse, error) { + return &s.mockResponse, nil +} + +// serve serves a bufferedServer +func serve(bServer *bufferedServer) { + server := grpc.NewServer() + + syncv1grpc.RegisterFlagServiceServer(server, bServer) + + if err := server.Serve(bServer.listener); err != nil { + log.Fatalf("Server exited with error: %v", err) + } +} + +type serverPayload struct { + flags string + state v1.SyncState +} + +// bufferedServer - a mock grpc service backed by buffered connection +type bufferedServer struct { + listener *bufconn.Listener + mockResponses []serverPayload +} + +func (b *bufferedServer) SyncFlags(req *v1.SyncFlagsRequest, stream syncv1grpc.FlagService_SyncFlagsServer) error { + for _, response := range b.mockResponses { + err := stream.Send(&v1.SyncFlagsResponse{ + Flags: response.flags, + State: response.state, + }) + if err != nil { + fmt.Printf("Error with stream: %s", err.Error()) + return err + } + } + + return nil +}