From 9e10b8483211bfeed42692955dec266c6b8fab93 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Tue, 24 Jan 2023 13:23:24 -0800 Subject: [PATCH] wire startup to grpc Signed-off-by: Kavindu Dodanduwa --- cmd/start.go | 2 +- docs/configuration/flagd_start.md | 2 +- pkg/runtime/from_config.go | 17 ++++++++++++++--- pkg/runtime/runtime.go | 14 +------------- pkg/sync/grpc/grpc_sync.go | 31 +++++++++++++++++++++++-------- pkg/sync/grpc/grpc_sync_test.go | 25 ++++++++++++++++++------- 6 files changed, 58 insertions(+), 33 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index 94b830196..74d35cc5f 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -46,7 +46,7 @@ func init() { "a", nil, "Sync provider arguments as key values separated by =") flags.StringSliceP( uriFlagName, "f", []string{}, "Set a sync provider uri to read data from, this can be a filepath,"+ - "url or FeatureFlagConfiguration. Using multiple providers is supported however if"+ + "url (http and grpc) or FeatureFlagConfiguration. Using multiple providers is supported however if"+ " flag keys are duplicated across multiple sources it may lead to unexpected behavior. "+ "Please note that if you are using filepath, flagd only supports files with `.yaml/.yml/.json` extension.", ) diff --git a/docs/configuration/flagd_start.md b/docs/configuration/flagd_start.md index 1c14f76d2..0cc8945f3 100644 --- a/docs/configuration/flagd_start.md +++ b/docs/configuration/flagd_start.md @@ -21,7 +21,7 @@ flagd start [flags] -d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally. -y, --sync-provider string DEPRECATED: Set a sync provider e.g. filepath or remote -a, --sync-provider-args stringToString Sync provider arguments as key values separated by = (default []) - -f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath,url or FeatureFlagConfiguration. Using multiple providers is supported however if flag keys are duplicated across multiple sources it may lead to unexpected behavior. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension. + -f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath,url (http and grpc) or FeatureFlagConfiguration. Using multiple providers is supported however if flag keys are duplicated across multiple sources it may lead to unexpected behavior. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension. ``` ### Options inherited from parent commands diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index 8684fe6b9..9261ba0bd 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -6,13 +6,13 @@ import ( "regexp" "time" - "github.com/open-feature/flagd/pkg/sync/file" - httpSync "github.com/open-feature/flagd/pkg/sync/http" - "github.com/open-feature/flagd/pkg/eval" "github.com/open-feature/flagd/pkg/logger" "github.com/open-feature/flagd/pkg/service" "github.com/open-feature/flagd/pkg/sync" + "github.com/open-feature/flagd/pkg/sync/file" + "github.com/open-feature/flagd/pkg/sync/grpc" + httpSync "github.com/open-feature/flagd/pkg/sync/http" "github.com/open-feature/flagd/pkg/sync/kubernetes" "github.com/robfig/cron" "go.uber.org/zap" @@ -21,12 +21,14 @@ import ( var ( regCrd *regexp.Regexp regURL *regexp.Regexp + regGRPC *regexp.Regexp regFile *regexp.Regexp ) func init() { regCrd = regexp.MustCompile("^core.openfeature.dev/") regURL = regexp.MustCompile("^https?://") + regGRPC = regexp.MustCompile("^" + grpc.Prefix) regFile = regexp.MustCompile("^file:") } @@ -99,6 +101,15 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { Cron: cron.New(), }) rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %q", uri)) + case regGRPC.Match(uriB): + + r.SyncImpl = append(r.SyncImpl, &grpc.Sync{ + Target: grpc.URLToGRPCTarget(uri), + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "grpc"), + ), + }) default: return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', or 'core.openfeature.dev'", uri) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 7b89d56b4..e3fe624a1 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -8,9 +8,6 @@ import ( msync "sync" "syscall" - "github.com/open-feature/flagd/pkg/sync/grpc" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" "github.com/open-feature/flagd/pkg/eval" @@ -38,6 +35,7 @@ type Config struct { ProviderArgs sync.ProviderArgs SyncURI []string + RemoteSyncType string SyncBearerToken string CORS []string @@ -72,16 +70,6 @@ func (r *Runtime) Start() error { } }) - // todo - get this from configurations - r.SyncImpl = append(r.SyncImpl, &grpc.Sync{ - URI: "localhost:8090", - Key: "local", - Logger: r.Logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "grpc"), - ), - }) - // Start sync providers for _, s := range r.SyncImpl { p := s diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index a3c6a7a3b..1f6dcd3bf 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "strings" "buf.build/gen/go/kavindudodan/flagd/grpc/go/sync/v1/servicev1grpc" v1 "buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go/sync/v1" @@ -15,15 +16,18 @@ import ( "google.golang.org/grpc/credentials/insecure" ) +// Prefix for GRPC URL inputs. GRPC does not define a prefix through standard. This prefix helps to differentiate +// remote URLs for REST APIs (i.e - HTTP) from GRPC endpoints. +const Prefix = "grpc://" + type Sync struct { - URI string + Target string Key string Logger *logger.Logger } func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { - // todo - Add certificates and/or tokens - dial, err := grpc.Dial("localhost:8090", grpc.WithTransportCredentials(insecure.NewCredentials())) + dial, err := grpc.Dial(g.Target, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { g.Logger.Error(fmt.Sprintf("Error establishing connection: %s", err.Error())) return err @@ -47,7 +51,6 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { err = group.Wait() if err == io.EOF { - // todo - we can retry connection if this happens g.Logger.Info("Stream closed by the server. Exiting without retry attempts.") return err } @@ -67,7 +70,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d case v1.SyncState_SYNC_STATE_ALL: dataSync <- sync.DataSync{ FlagData: data.Flags, - Source: g.URI, + Source: g.Target, Type: sync.ALL, } @@ -76,7 +79,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d case v1.SyncState_SYNC_STATE_ADD: dataSync <- sync.DataSync{ FlagData: data.Flags, - Source: g.URI, + Source: g.Target, Type: sync.ADD, } @@ -85,7 +88,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d case v1.SyncState_SYNC_STATE_UPDATE: dataSync <- sync.DataSync{ FlagData: data.Flags, - Source: g.URI, + Source: g.Target, Type: sync.UPDATE, } @@ -94,7 +97,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d case v1.SyncState_SYNC_STATE_DELETE: dataSync <- sync.DataSync{ FlagData: data.Flags, - Source: g.URI, + Source: g.Target, Type: sync.DELETE, } @@ -107,3 +110,15 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d } } } + +// URLToGRPCTarget is a helper to derive GRPC target from a provided URL +// For example, function returns the target localhost:9090 for the input grpc://localhost:9090 +func URLToGRPCTarget(url string) string { + index := strings.Split(url, Prefix) + + if len(index) == 2 { + return index[1] + } + + return index[0] +} diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index 678d60d72..b51125c8e 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -3,20 +3,31 @@ package grpc import "testing" func TestUrlToGRPCTarget(t *testing.T) { - type args struct { - url string - } tests := []struct { name string - args args + url string want string }{ - // TODO: Add test cases. + { + name: "With Prefix", + url: "grpc://test.com/endpoint", + want: "test.com/endpoint", + }, + { + name: "Without Prefix", + url: "test.com/endpoint", + want: "test.com/endpoint", + }, + { + name: "Empty is empty", + url: "", + want: "", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := UrlToGRPCTarget(tt.args.url); got != tt.want { - t.Errorf("UrlToGRPCTarget() = %v, want %v", got, tt.want) + if got := URLToGRPCTarget(tt.url); got != tt.want { + t.Errorf("URLToGRPCTarget() = %v, want %v", got, tt.want) } }) }