Skip to content

Commit

Permalink
refactor and testing grpc contract
Browse files Browse the repository at this point in the history
Signed-off-by: Kavindu Dodanduwa <[email protected]>
  • Loading branch information
Kavindu-Dodan committed Feb 6, 2023
1 parent 26b6e62 commit 765f431
Show file tree
Hide file tree
Showing 4 changed files with 314 additions and 18 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module github.com/open-feature/flagd
go 1.19

require (
buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230201231859-0ba584642f5c.4
buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230201231859-0ba584642f5c.4
buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230201234317-013221f6d37a.4
buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230201234317-013221f6d37a.4
buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1
buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4
buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20221226184428-0dc62ff103b8.4
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ buf.build/gen/go/grpc-ecosystem/grpc-gateway/grpc/go v1.2.0-20220906183531-bc28b
buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.28.1-20220906183531-bc28b723cd77.4/go.mod h1:92ejKVTiuvnKoAtRlpJpIxKfloI935DDqhs0NCRx+KM=
buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230201231859-0ba584642f5c.4 h1:v19nAzB4Q4cSQq7PhmgeUPHQJYouc68caoBI/HdGw+k=
buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230201231859-0ba584642f5c.4/go.mod h1:GZ8j43uPGP/wa1L/z3q4hr5ur6hyG+WkXTtVEZqFBzs=
buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230201234317-013221f6d37a.4 h1:0pM53BFVy3Xsy9IkD3pQjtIIZ6s8gvipBmfZbB2LjBM=
buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230201234317-013221f6d37a.4/go.mod h1:1Di60p+uyAMgPXbZ9OseWUr5IrzyKsJAlur4kNkdOr0=
buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230201231859-0ba584642f5c.4 h1:TtrLf/K+yWLXk9ello83xaPD/WIZr0usxzAQdkHLyLo=
buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230201231859-0ba584642f5c.4/go.mod h1:jpjQMBqnFcgHMK99ylFu+PhUhA1KD6AC1QSz27yC/1U=
buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230201234317-013221f6d37a.4 h1:Z8IagBs/ox6bgyJCM21bqOxEnopxS16onmP1HLmfo1g=
buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230201234317-013221f6d37a.4/go.mod h1:jpjQMBqnFcgHMK99ylFu+PhUhA1KD6AC1QSz27yC/1U=
buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1 h1:KoSPqmHyi3x27tPFLQ994CJjG4qc59v+0gbxY9+VXso=
buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1/go.mod h1:68WGv4z/jXuTS3G7FEFQTEw4wiMmulBSX6BlSFX2Xc8=
buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4 h1:9ioWUVmnURL+TX4qVjyzzE3o9Z2ACDGidqtu9dkjmdY=
Expand Down
31 changes: 16 additions & 15 deletions pkg/sync/grpc/grpc_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,37 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
return err
}

client := syncv1grpc.NewFlagServiceClient(dial)

stream, err := client.SyncFlags(context.Background(), &v1.SyncFlagsRequest{Key: g.Key})
if err != nil {
g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error()))
return err
}
return g.streamListener(ctx, dial, dataSync)
}

// streamListener performs the grpc listening on provided client connection and push updates through dataSync channel
func (g *Sync) streamListener(ctx context.Context, dial *grpc.ClientConn, dataSync chan<- sync.DataSync) error {
group, localContext := errgroup.WithContext(ctx)

group.Go(func() error {
return g.streamHandler(stream, dataSync)
serviceClient := syncv1grpc.NewFlagServiceClient(dial)

syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{Key: g.Key})
if err != nil {
g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error()))
return err
}

return g.handleFlagSync(syncClient, dataSync)
})

<-localContext.Done()

err = group.Wait()
err := group.Wait()
if err == io.EOF {
g.Logger.Info("Stream closed by the server. Exiting without retry attempts.")
g.Logger.Info("Stream closed by the server")
return err
}

return err
}

func (g *Sync) streamHandler(stream syncv1grpc.FlagService_SyncFlagsClient, dataSync chan<- sync.DataSync) error {
func (g *Sync) handleFlagSync(stream syncv1grpc.FlagService_SyncFlagsClient, dataSync chan<- sync.DataSync) error {
for {
data, err := stream.Recv()
if err != nil {
Expand All @@ -76,7 +81,6 @@ func (g *Sync) streamHandler(stream syncv1grpc.FlagService_SyncFlagsClient, data
}

g.Logger.Debug("received full configuration payload")
continue
case v1.SyncState_SYNC_STATE_ADD:
dataSync <- sync.DataSync{
FlagData: data.Flags,
Expand All @@ -85,7 +89,6 @@ func (g *Sync) streamHandler(stream syncv1grpc.FlagService_SyncFlagsClient, data
}

g.Logger.Debug("received an add payload")
continue
case v1.SyncState_SYNC_STATE_UPDATE:
dataSync <- sync.DataSync{
FlagData: data.Flags,
Expand All @@ -94,7 +97,6 @@ func (g *Sync) streamHandler(stream syncv1grpc.FlagService_SyncFlagsClient, data
}

g.Logger.Debug("received an update payload")
continue
case v1.SyncState_SYNC_STATE_DELETE:
dataSync <- sync.DataSync{
FlagData: data.Flags,
Expand All @@ -103,7 +105,6 @@ func (g *Sync) streamHandler(stream syncv1grpc.FlagService_SyncFlagsClient, data
}

g.Logger.Debug("received a delete payload")
continue
case v1.SyncState_SYNC_STATE_PING:
g.Logger.Debug("received server ping")
default:
Expand Down
Loading

0 comments on commit 765f431

Please sign in to comment.