From 2931626344ea24677a218f503b7a98a637ad12cd Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Tue, 7 Feb 2023 15:50:43 -0800 Subject: [PATCH] add connection retry logic Signed-off-by: Kavindu Dodanduwa --- pkg/sync/grpc/grpc_sync.go | 102 ++++++++++++++++++++++++++------ pkg/sync/grpc/grpc_sync_test.go | 16 +++-- 2 files changed, 96 insertions(+), 22 deletions(-) diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 4f928bdb3..428b2a4a6 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -4,7 +4,11 @@ import ( "context" "fmt" "io" + "math" "strings" + "time" + + "google.golang.org/grpc/credentials/insecure" "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc" v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1" @@ -13,12 +17,20 @@ import ( "github.com/open-feature/flagd/pkg/sync" "golang.org/x/sync/errgroup" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) -// Prefix for GRPC URL inputs. GRPC does not define a prefix through standard. This prefix helps to differentiate -// remote URLs for REST APIs (i.e - HTTP) from GRPC endpoints. -const Prefix = "grpc://" +const ( + // Prefix for GRPC URL inputs. GRPC does not define a prefix through standard. This prefix helps to differentiate + // remote URLs for REST APIs (i.e - HTTP) from GRPC endpoints. + Prefix = "grpc://" + + // Connection retry constants + // Backoff period is calculated with backOffBase ^ #retry-iteration. However, when backoffLimit is reached, fallback + // to constantBackoffDelay + backoffLimit = 3 + backOffBase = 4 + constantBackoffDelay = 60 +) type Sync struct { Target string @@ -27,29 +39,86 @@ type Sync struct { } func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { - dial, err := grpc.Dial(g.Target, grpc.WithTransportCredentials(insecure.NewCredentials())) + options := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + + // initial dial and connection. Failure here must result in a startup failure + dial, err := grpc.DialContext(ctx, g.Target, options...) if err != nil { - g.Logger.Error(fmt.Sprintf("Error establishing connection: %s", err.Error())) + g.Logger.Error(fmt.Sprintf("Error establishing grpc connection: %s", err.Error())) return err } - return g.streamListener(ctx, dial, dataSync) + serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) + syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) + if err != nil { + g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error())) + return err + } + + // initial stream listening + err = g.streamListener(ctx, syncClient, dataSync) + if err != nil { + g.Logger.Warn(fmt.Sprintf("Error with stream listener: %s", err.Error())) + } + + // retry connection establishment + for { + g.Logger.Warn(fmt.Sprintf("Connection re-establishment attempt in-progress for grpc target: %s", g.Target)) + + syncClient = g.connectWithRetry(ctx, options...) + err = g.streamListener(ctx, syncClient, dataSync) + if err != nil { + g.Logger.Warn(fmt.Sprintf("Error with stream listener: %s", err.Error())) + continue + } + } } -// streamListener performs the grpc listening on provided client connection and push updates through dataSync channel -func (g *Sync) streamListener(ctx context.Context, dial *grpc.ClientConn, dataSync chan<- sync.DataSync) error { - group, localContext := errgroup.WithContext(ctx) +// connectWithRetry is a helper to perform exponential backoff till provided configurations and then retry connection +// periodically till a successful connection is established +func (g *Sync) connectWithRetry( + ctx context.Context, options ...grpc.DialOption, +) syncv1grpc.FlagSyncService_SyncFlagsClient { + var iteration int - group.Go(func() error { - serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) + for { + var sleep time.Duration + if iteration >= backoffLimit { + sleep = constantBackoffDelay + } else { + iteration++ + sleep = time.Duration(math.Pow(backOffBase, float64(iteration))) + } + + time.Sleep(sleep * time.Second) + dial, err := grpc.DialContext(ctx, g.Target, options...) + if err != nil { + g.Logger.Debug(fmt.Sprintf("Error dialing target: %s", err.Error())) + continue + } + + serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) if err != nil { - g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error())) - return err + g.Logger.Debug(fmt.Sprintf("Error openning service client: %s", err.Error())) + continue } - return g.handleFlagSync(syncClient, dataSync) + g.Logger.Info(fmt.Sprintf("Connection re-established with grpc target: %s", g.Target)) + return syncClient + } +} + +// streamListener wraps the grpc listening on provided stream and push updates through dataSync channel +func (g *Sync) streamListener( + ctx context.Context, stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync, +) error { + group, localContext := errgroup.WithContext(ctx) + group.Go(func() error { + return g.handleFlagSync(stream, dataSync) }) <-localContext.Done() @@ -67,7 +136,6 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, for { data, err := stream.Recv() if err != nil { - g.Logger.Warn(fmt.Sprintf("Error with stream response: %s", err.Error())) return err } @@ -107,7 +175,7 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, case v1.SyncState_SYNC_STATE_PING: g.Logger.Debug("received server ping") default: - g.Logger.Warn(fmt.Sprintf("receivied unknown state: %s", data.State.String())) + g.Logger.Debug(fmt.Sprintf("receivied unknown state: %s", data.State.String())) } } } diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index 479f16f8a..830164745 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -227,6 +227,12 @@ func Test_StreamListener(t *testing.T) { // start server go serve(&bufServer) + grpcSync := Sync{ + Target: target, + ProviderID: "", + Logger: logger.NewLogger(nil, false), + } + // initialize client dial, err := grpc.Dial(target, grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { @@ -237,17 +243,17 @@ func Test_StreamListener(t *testing.T) { t.Errorf("Error setting up client connection: %s", err.Error()) } - grpcSync := Sync{ - Target: target, - ProviderID: "", - Logger: logger.NewLogger(nil, false), + serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) + syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{ProviderId: grpcSync.ProviderID}) + if err != nil { + t.Errorf("Error opening client stream: %s", err.Error()) } syncChan := make(chan sync.DataSync, 1) // listen to stream go func() { - err := grpcSync.streamListener(context.Background(), dial, syncChan) + err := grpcSync.streamListener(context.Background(), syncClient, syncChan) if err != nil { // must ignore EOF as this is returned for stream end if err != io.EOF {