diff --git a/cmd/start.go b/cmd/start.go index 53b587d0e..c8b0ff046 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -45,7 +45,7 @@ func init() { "a", nil, "Sync provider arguments as key values separated by =") flags.StringSliceP( uriFlagName, "f", []string{}, "Set a sync provider uri to read data from, this can be a filepath,"+ - "url or FeatureFlagConfiguration. Using multiple providers is supported however if"+ + "url (http and grpc) or FeatureFlagConfiguration. Using multiple providers is supported however if"+ " flag keys are duplicated across multiple sources it may lead to unexpected behavior. "+ "Please note that if you are using filepath, flagd only supports files with `.yaml/.yml/.json` extension.", ) diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index 378a6db6d..8a7158eaa 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -6,13 +6,13 @@ import ( "regexp" "time" - "github.com/open-feature/flagd/pkg/sync/file" - httpSync "github.com/open-feature/flagd/pkg/sync/http" - "github.com/open-feature/flagd/pkg/eval" "github.com/open-feature/flagd/pkg/logger" "github.com/open-feature/flagd/pkg/service" "github.com/open-feature/flagd/pkg/sync" + "github.com/open-feature/flagd/pkg/sync/file" + "github.com/open-feature/flagd/pkg/sync/grpc" + httpSync "github.com/open-feature/flagd/pkg/sync/http" "github.com/open-feature/flagd/pkg/sync/kubernetes" "github.com/robfig/cron" "go.uber.org/zap" @@ -21,12 +21,14 @@ import ( var ( regCrd *regexp.Regexp regURL *regexp.Regexp + regGRPC *regexp.Regexp regFile *regexp.Regexp ) func init() { regCrd = regexp.MustCompile("^core.openfeature.dev/") regURL = regexp.MustCompile("^https?://") + regGRPC = regexp.MustCompile("^" + grpc.Prefix) regFile = regexp.MustCompile("^file:") } @@ -99,6 +101,15 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { Cron: cron.New(), }) rtLogger.Debug(fmt.Sprintf("Using remote sync-provider for %q", uri)) + case regGRPC.Match(uriB): + + r.SyncImpl = append(r.SyncImpl, &grpc.Sync{ + Target: grpc.URLToGRPCTarget(uri), + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "grpc"), + ), + }) default: return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', or 'core.openfeature.dev'", uri) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 49c2b6d25..17c3f6ca5 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -8,9 +8,6 @@ import ( msync "sync" "syscall" - "github.com/open-feature/flagd/pkg/sync/grpc" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" "github.com/open-feature/flagd/pkg/eval" @@ -38,6 +35,7 @@ type Config struct { ProviderArgs sync.ProviderArgs SyncURI []string + RemoteSyncType string SyncBearerToken string CORS []string @@ -72,16 +70,6 @@ func (r *Runtime) Start() error { } }) - // todo - get this from configurations - r.SyncImpl = append(r.SyncImpl, &grpc.Sync{ - URI: "localhost:8090", - Key: "local", - Logger: r.Logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "grpc"), - ), - }) - // Start sync providers for _, s := range r.SyncImpl { p := s diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index a3c6a7a3b..1f6dcd3bf 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "strings" "buf.build/gen/go/kavindudodan/flagd/grpc/go/sync/v1/servicev1grpc" v1 "buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go/sync/v1" @@ -15,15 +16,18 @@ import ( "google.golang.org/grpc/credentials/insecure" ) +// Prefix for GRPC URL inputs. GRPC does not define a prefix through standard. This prefix helps to differentiate +// remote URLs for REST APIs (i.e - HTTP) from GRPC endpoints. +const Prefix = "grpc://" + type Sync struct { - URI string + Target string Key string Logger *logger.Logger } func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { - // todo - Add certificates and/or tokens - dial, err := grpc.Dial("localhost:8090", grpc.WithTransportCredentials(insecure.NewCredentials())) + dial, err := grpc.Dial(g.Target, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { g.Logger.Error(fmt.Sprintf("Error establishing connection: %s", err.Error())) return err @@ -47,7 +51,6 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { err = group.Wait() if err == io.EOF { - // todo - we can retry connection if this happens g.Logger.Info("Stream closed by the server. Exiting without retry attempts.") return err } @@ -67,7 +70,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d case v1.SyncState_SYNC_STATE_ALL: dataSync <- sync.DataSync{ FlagData: data.Flags, - Source: g.URI, + Source: g.Target, Type: sync.ALL, } @@ -76,7 +79,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d case v1.SyncState_SYNC_STATE_ADD: dataSync <- sync.DataSync{ FlagData: data.Flags, - Source: g.URI, + Source: g.Target, Type: sync.ADD, } @@ -85,7 +88,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d case v1.SyncState_SYNC_STATE_UPDATE: dataSync <- sync.DataSync{ FlagData: data.Flags, - Source: g.URI, + Source: g.Target, Type: sync.UPDATE, } @@ -94,7 +97,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d case v1.SyncState_SYNC_STATE_DELETE: dataSync <- sync.DataSync{ FlagData: data.Flags, - Source: g.URI, + Source: g.Target, Type: sync.DELETE, } @@ -107,3 +110,15 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d } } } + +// URLToGRPCTarget is a helper to derive GRPC target from a provided URL +// For example, function returns the target localhost:9090 for the input grpc://localhost:9090 +func URLToGRPCTarget(url string) string { + index := strings.Split(url, Prefix) + + if len(index) == 2 { + return index[1] + } + + return index[0] +} diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index 678d60d72..b51125c8e 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -3,20 +3,31 @@ package grpc import "testing" func TestUrlToGRPCTarget(t *testing.T) { - type args struct { - url string - } tests := []struct { name string - args args + url string want string }{ - // TODO: Add test cases. + { + name: "With Prefix", + url: "grpc://test.com/endpoint", + want: "test.com/endpoint", + }, + { + name: "Without Prefix", + url: "test.com/endpoint", + want: "test.com/endpoint", + }, + { + name: "Empty is empty", + url: "", + want: "", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := UrlToGRPCTarget(tt.args.url); got != tt.want { - t.Errorf("UrlToGRPCTarget() = %v, want %v", got, tt.want) + if got := URLToGRPCTarget(tt.url); got != tt.want { + t.Errorf("URLToGRPCTarget() = %v, want %v", got, tt.want) } }) }