From 7fd3473af6d3275ded91b71aa8cb5d4810a08736 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Fri, 13 Jan 2023 13:53:42 -0800 Subject: [PATCH 01/25] structure sync impls in dedicated packages Signed-off-by: Kavindu Dodanduwa --- pkg/runtime/from_config.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index 8684fe6b9..c9da0058d 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -2,6 +2,8 @@ package runtime import ( "fmt" + "github.com/open-feature/flagd/pkg/sync/file" + httpSync "github.com/open-feature/flagd/pkg/sync/http" "net/http" "regexp" "time" From 4c4390cffa2dea54469979ac590ceb01eb1e69df Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Fri, 13 Jan 2023 16:04:50 -0800 Subject: [PATCH 02/25] lint fix Signed-off-by: Kavindu Dodanduwa --- pkg/runtime/from_config.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index c9da0058d..8684fe6b9 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -2,8 +2,6 @@ package runtime import ( "fmt" - "github.com/open-feature/flagd/pkg/sync/file" - httpSync "github.com/open-feature/flagd/pkg/sync/http" "net/http" "regexp" "time" From 38ecc35f9661d8936732f162c311b6fd5dfba31c Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Thu, 19 Jan 2023 11:35:32 -0800 Subject: [PATCH 03/25] Simple implementation Signed-off-by: Kavindu Dodanduwa --- go.mod | 2 + go.sum | 8 ++++ pkg/runtime/runtime.go | 12 ++++++ pkg/sync/grpc/grpc_sync.go | 79 ++++++++++++++++++++++++++++++++++++++ schema/buf.gen.yaml | 8 ++++ schema/buf.yaml | 9 +++++ schema/sync/v1/flags.proto | 54 ++++++++++++++++++++++++++ 7 files changed, 172 insertions(+) create mode 100644 pkg/sync/grpc/grpc_sync.go create mode 100644 schema/buf.gen.yaml create mode 100644 schema/buf.yaml create mode 100644 schema/sync/v1/flags.proto diff --git a/go.mod b/go.mod index 20c7b458a..e68e7053e 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,8 @@ module github.com/open-feature/flagd go 1.19 require ( + buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230119185434-46a532929237.4 + buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230119185434-46a532929237.4 buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1 buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4 buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20221226184428-0dc62ff103b8.4 diff --git a/go.sum b/go.sum index 182dd0936..945e14ed5 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,13 @@ buf.build/gen/go/grpc-ecosystem/grpc-gateway/grpc/go v1.2.0-20220906183531-bc28b723cd77.4/go.mod h1:hAKk3I2AivrJgMLXjDGrfzRx2NVWQgEPNfr4Co9DLX4= buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.28.1-20220906183531-bc28b723cd77.4/go.mod h1:92ejKVTiuvnKoAtRlpJpIxKfloI935DDqhs0NCRx+KM= +buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230119181610-da48a2cc4899.4 h1:4+OZYN1CckvE3FUhICkwKce4CqNrqSF+3lj/qg7jBcA= +buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230119181610-da48a2cc4899.4/go.mod h1:VqApiuUWBR0OgYXV2WANGNl1cziMLk/IFuJtcEcBNzQ= +buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230119185434-46a532929237.4 h1:IQNSC6162WjVcAaanl6ltVbuWEKO81Q+Rb0RvKbCJK0= +buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230119185434-46a532929237.4/go.mod h1:7KypYDNBNnwCC/OHyBLpFRnURona7bfsFlOSnj6YJso= +buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230119181610-da48a2cc4899.4 h1:KoZHvPNbsieBV2wJHv1sI6Hld9Cnx2u0KkY0cfVWj0k= +buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230119181610-da48a2cc4899.4/go.mod h1:jpjQMBqnFcgHMK99ylFu+PhUhA1KD6AC1QSz27yC/1U= +buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230119185434-46a532929237.4 h1:jO34xg8ujvGDu80W5I+U/U1jQHx4LTdcpW1OTPbdJcw= +buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230119185434-46a532929237.4/go.mod h1:jpjQMBqnFcgHMK99ylFu+PhUhA1KD6AC1QSz27yC/1U= buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1 h1:KoSPqmHyi3x27tPFLQ994CJjG4qc59v+0gbxY9+VXso= buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1/go.mod h1:68WGv4z/jXuTS3G7FEFQTEw4wiMmulBSX6BlSFX2Xc8= buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4 h1:9ioWUVmnURL+TX4qVjyzzE3o9Z2ACDGidqtu9dkjmdY= diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 3c441a0ee..b1bbbc6c0 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -3,6 +3,8 @@ package runtime import ( "context" "errors" + "github.com/open-feature/flagd/pkg/sync/grpc" + "go.uber.org/zap" "os" "os/signal" msync "sync" @@ -69,6 +71,16 @@ func (r *Runtime) Start() error { } }) + // todo - get from configurations + r.SyncImpl = append(r.SyncImpl, &grpc.Sync{ + URI: "localhost:8090", + Key: "local", + Logger: r.Logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "grpc"), + ), + }) + // Start sync providers for _, s := range r.SyncImpl { p := s diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go new file mode 100644 index 000000000..8d90412e2 --- /dev/null +++ b/pkg/sync/grpc/grpc_sync.go @@ -0,0 +1,79 @@ +package grpc + +import ( + "buf.build/gen/go/kavindudodan/flagd/grpc/go/sync/v1/servicev1grpc" + v1 "buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go/sync/v1" + "context" + "fmt" + "github.com/open-feature/flagd/pkg/logger" + "github.com/open-feature/flagd/pkg/sync" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "io" +) + +type Sync struct { + URI string + Key string + Logger *logger.Logger +} + +func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { + + // todo - Add certificates and/or tokens + dial, err := grpc.Dial("localhost:8090", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + g.Logger.Error(fmt.Sprintf("Error establishing connection: %s", err.Error())) + return err + } + + client := servicev1grpc.NewFlagServiceClient(dial) + + stream, err := client.SyncFlags(context.Background(), &v1.SyncFlagsRequest{ + // todo - key from configurations + Key: "test", + }) + + if err != nil { + g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error())) + return err + } + + for { + data, err := stream.Recv() + + if err == io.EOF { + g.Logger.Error("Server streaming ended") + // todo - attempt reconnection rather than returning error + return err + } + + if err != nil { + g.Logger.Warn(fmt.Sprintf("Error with stream response: %s. Continuing receiver", err.Error())) + continue + } + + switch data.State { + case v1.SyncState_SYNC_STATE_ALL: + // todo - feed data sync to store + continue + case v1.SyncState_SYNC_STATE_ADD: + // todo - feed data sync to store + continue + case v1.SyncState_SYNC_STATE_UPDATE: + // todo - feed data sync to store + continue + case v1.SyncState_SYNC_STATE_DELETE: + // todo - feed data sync to store + continue + case v1.SyncState_SYNC_STATE_CLEAN: + // todo - feed data sync to store + continue + case v1.SyncState_SYNC_STATE_PING: + g.Logger.Debug("Received server ping") + case v1.SyncState_SYNC_STATE_UNSPECIFIED: + default: + g.Logger.Debug(fmt.Sprintf("Receivied unknown state: %d", data.State)) + } + } +} diff --git a/schema/buf.gen.yaml b/schema/buf.gen.yaml new file mode 100644 index 000000000..1c73de6af --- /dev/null +++ b/schema/buf.gen.yaml @@ -0,0 +1,8 @@ +version: v1 +plugins: + - plugin: go + out: gen/proto/go + opt: paths=source_relative + - plugin: go-grpc + out: gen/proto/go + opt: paths=source_relative \ No newline at end of file diff --git a/schema/buf.yaml b/schema/buf.yaml new file mode 100644 index 000000000..7050cc463 --- /dev/null +++ b/schema/buf.yaml @@ -0,0 +1,9 @@ +version: v1 +# todo - packaging to flagd +name: buf.build/kavindudodan/flagd +breaking: + use: + - FILE +lint: + use: + - DEFAULT \ No newline at end of file diff --git a/schema/sync/v1/flags.proto b/schema/sync/v1/flags.proto new file mode 100644 index 000000000..7f60cc321 --- /dev/null +++ b/schema/sync/v1/flags.proto @@ -0,0 +1,54 @@ +syntax = "proto3"; + +package sync.service.v1; + +// todo - packaging to flagd +option go_package = "sync/service/v1"; + + +/* +SyncFlagsRequest convey a key that identifies the flagd instance, enabling Sync provider to filter +specific flags for a specific flagd deployment. +*/ +message SyncFlagsRequest { + string key = 1; +} + +//SyncState convey the state of the payload +enum SyncState{ + // Value is ignored by the listening flagd + SYNC_STATE_UNSPECIFIED = 0; + + // All the flags matching the request. This is the default response and other states can be ignored + // by the implementation. Flagd internally replace all existing flags for this response state. + SYNC_STATE_ALL = 1; + + // Convey an addition of a flag. Flagd internally handles this by combining new flags with existing ones + SYNC_STATE_ADD = 2; + + // Convey an update of a flag. Flagd internally try to update if the updated flag already exist OR if it is not, + // then it will get added + SYNC_STATE_UPDATE = 3; + + // Convey a deletion of a flag. Flagd internally removes the flag + SYNC_STATE_DELETE = 4; + + // Convey a clean of all flags. Flagd internally removes all flags for this sync provider + SYNC_STATE_CLEAN = 5; + + // Optional server ping to check client connectivity. Handling is ignored by flagd and is to merely support live check + SYNC_STATE_PING = 6; +} + +// SyncFlagsResponse is the response contains flags and state +message SyncFlagsResponse { + // flagd feature flag configuration + string flags = 1; + // State conveying the operation to be performed by flagd. See descriptions of state for an explanation of values + SyncState state = 2; +} + +// FlagService implements a server streaming to provide realtime flag configurations +service FlagService { + rpc SyncFlags(SyncFlagsRequest) returns (stream SyncFlagsResponse) {}; +} \ No newline at end of file From 4aa5396806084bc488120813247eec9653d16fae Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Thu, 19 Jan 2023 14:12:00 -0800 Subject: [PATCH 04/25] POC of grpc Signed-off-by: Kavindu Dodanduwa --- pkg/runtime/runtime.go | 7 ++-- pkg/sync/grpc/grpc_sync.go | 68 +++++++++++++++++++------------------- 2 files changed, 38 insertions(+), 37 deletions(-) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index b1bbbc6c0..5ca157eb5 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -3,13 +3,14 @@ package runtime import ( "context" "errors" - "github.com/open-feature/flagd/pkg/sync/grpc" - "go.uber.org/zap" "os" "os/signal" msync "sync" "syscall" + "github.com/open-feature/flagd/pkg/sync/grpc" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/open-feature/flagd/pkg/eval" @@ -71,7 +72,7 @@ func (r *Runtime) Start() error { } }) - // todo - get from configurations + // todo - get this from configurations r.SyncImpl = append(r.SyncImpl, &grpc.Sync{ URI: "localhost:8090", Key: "local", diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 8d90412e2..64bde8267 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -1,15 +1,18 @@ package grpc import ( - "buf.build/gen/go/kavindudodan/flagd/grpc/go/sync/v1/servicev1grpc" - v1 "buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go/sync/v1" "context" "fmt" + "io" + + "buf.build/gen/go/kavindudodan/flagd/grpc/go/sync/v1/servicev1grpc" + v1 "buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go/sync/v1" + "github.com/open-feature/flagd/pkg/logger" "github.com/open-feature/flagd/pkg/sync" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "io" ) type Sync struct { @@ -19,7 +22,6 @@ type Sync struct { } func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { - // todo - Add certificates and/or tokens dial, err := grpc.Dial("localhost:8090", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { @@ -29,51 +31,49 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { client := servicev1grpc.NewFlagServiceClient(dial) - stream, err := client.SyncFlags(context.Background(), &v1.SyncFlagsRequest{ - // todo - key from configurations - Key: "test", - }) - + stream, err := client.SyncFlags(context.Background(), &v1.SyncFlagsRequest{Key: g.Key}) if err != nil { g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error())) return err } - for { - data, err := stream.Recv() + group, localContext := errgroup.WithContext(ctx) - if err == io.EOF { - g.Logger.Error("Server streaming ended") - // todo - attempt reconnection rather than returning error - return err - } + group.Go(func() error { + return g.streamHandler(stream, dataSync) + }) + + <-localContext.Done() + + err = group.Wait() + if err == io.EOF { + // todo - we can retry connection if this happens + g.Logger.Info("Stream closed by the server. Exiting without retry attempts.") + return err + } + + return err +} +func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { + for { + data, err := stream.Recv() if err != nil { - g.Logger.Warn(fmt.Sprintf("Error with stream response: %s. Continuing receiver", err.Error())) - continue + g.Logger.Warn(fmt.Sprintf("Error with stream response: %s", err.Error())) + return err } switch data.State { case v1.SyncState_SYNC_STATE_ALL: - // todo - feed data sync to store - continue - case v1.SyncState_SYNC_STATE_ADD: - // todo - feed data sync to store - continue - case v1.SyncState_SYNC_STATE_UPDATE: - // todo - feed data sync to store - continue - case v1.SyncState_SYNC_STATE_DELETE: - // todo - feed data sync to store - continue - case v1.SyncState_SYNC_STATE_CLEAN: - // todo - feed data sync to store + dataSync <- sync.DataSync{ + FlagData: data.Flags, + Source: g.URI, + } continue case v1.SyncState_SYNC_STATE_PING: - g.Logger.Debug("Received server ping") - case v1.SyncState_SYNC_STATE_UNSPECIFIED: + g.Logger.Info("Received server ping") default: - g.Logger.Debug(fmt.Sprintf("Receivied unknown state: %d", data.State)) + g.Logger.Info(fmt.Sprintf("Receivied unknown state: %s", data.State.String())) } } } From 827607994bc9dd59279bef0ca17e5ebdd60208a4 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Mon, 23 Jan 2023 15:43:59 -0800 Subject: [PATCH 05/25] bind grpc sync states to flagd sync states Signed-off-by: Kavindu Dodanduwa --- go.mod | 4 ++-- go.sum | 12 ++++-------- pkg/sync/grpc/grpc_sync.go | 34 ++++++++++++++++++++++++++++++++-- schema/sync/v1/flags.proto | 5 +---- 4 files changed, 39 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index e68e7053e..6df6530d5 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/open-feature/flagd go 1.19 require ( - buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230119185434-46a532929237.4 - buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230119185434-46a532929237.4 + buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230123231905-11466466f72d.4 + buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230123231905-11466466f72d.4 buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1 buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4 buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20221226184428-0dc62ff103b8.4 diff --git a/go.sum b/go.sum index 945e14ed5..71e920a99 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,9 @@ buf.build/gen/go/grpc-ecosystem/grpc-gateway/grpc/go v1.2.0-20220906183531-bc28b723cd77.4/go.mod h1:hAKk3I2AivrJgMLXjDGrfzRx2NVWQgEPNfr4Co9DLX4= buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.28.1-20220906183531-bc28b723cd77.4/go.mod h1:92ejKVTiuvnKoAtRlpJpIxKfloI935DDqhs0NCRx+KM= -buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230119181610-da48a2cc4899.4 h1:4+OZYN1CckvE3FUhICkwKce4CqNrqSF+3lj/qg7jBcA= -buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230119181610-da48a2cc4899.4/go.mod h1:VqApiuUWBR0OgYXV2WANGNl1cziMLk/IFuJtcEcBNzQ= -buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230119185434-46a532929237.4 h1:IQNSC6162WjVcAaanl6ltVbuWEKO81Q+Rb0RvKbCJK0= -buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230119185434-46a532929237.4/go.mod h1:7KypYDNBNnwCC/OHyBLpFRnURona7bfsFlOSnj6YJso= -buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230119181610-da48a2cc4899.4 h1:KoZHvPNbsieBV2wJHv1sI6Hld9Cnx2u0KkY0cfVWj0k= -buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230119181610-da48a2cc4899.4/go.mod h1:jpjQMBqnFcgHMK99ylFu+PhUhA1KD6AC1QSz27yC/1U= -buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230119185434-46a532929237.4 h1:jO34xg8ujvGDu80W5I+U/U1jQHx4LTdcpW1OTPbdJcw= -buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230119185434-46a532929237.4/go.mod h1:jpjQMBqnFcgHMK99ylFu+PhUhA1KD6AC1QSz27yC/1U= +buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230123231905-11466466f72d.4 h1:1fQv2ozb/dr74S3CPIGZunSndVzHAPeY5niWnEr0xDg= +buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230123231905-11466466f72d.4/go.mod h1:HhDGIP35zh2M82Dx4oIzit5ZEtoOaOyy35dAr/e+Uuo= +buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230123231905-11466466f72d.4 h1:uZNmxfZMZVrfd31Iz4YQsRa6zl4kifKUwRWjtIMz8yI= +buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230123231905-11466466f72d.4/go.mod h1:jpjQMBqnFcgHMK99ylFu+PhUhA1KD6AC1QSz27yC/1U= buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1 h1:KoSPqmHyi3x27tPFLQ994CJjG4qc59v+0gbxY9+VXso= buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1/go.mod h1:68WGv4z/jXuTS3G7FEFQTEw4wiMmulBSX6BlSFX2Xc8= buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4 h1:9ioWUVmnURL+TX4qVjyzzE3o9Z2ACDGidqtu9dkjmdY= diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 64bde8267..a3c6a7a3b 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -68,12 +68,42 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d dataSync <- sync.DataSync{ FlagData: data.Flags, Source: g.URI, + Type: sync.ALL, } + + g.Logger.Debug("received full configuration payload") + continue + case v1.SyncState_SYNC_STATE_ADD: + dataSync <- sync.DataSync{ + FlagData: data.Flags, + Source: g.URI, + Type: sync.ADD, + } + + g.Logger.Debug("received an add payload") + continue + case v1.SyncState_SYNC_STATE_UPDATE: + dataSync <- sync.DataSync{ + FlagData: data.Flags, + Source: g.URI, + Type: sync.UPDATE, + } + + g.Logger.Debug("received an update payload") + continue + case v1.SyncState_SYNC_STATE_DELETE: + dataSync <- sync.DataSync{ + FlagData: data.Flags, + Source: g.URI, + Type: sync.DELETE, + } + + g.Logger.Debug("received a delete payload") continue case v1.SyncState_SYNC_STATE_PING: - g.Logger.Info("Received server ping") + g.Logger.Debug("received server ping") default: - g.Logger.Info(fmt.Sprintf("Receivied unknown state: %s", data.State.String())) + g.Logger.Warn(fmt.Sprintf("receivied unknown state: %s", data.State.String())) } } } diff --git a/schema/sync/v1/flags.proto b/schema/sync/v1/flags.proto index 7f60cc321..da64cc735 100644 --- a/schema/sync/v1/flags.proto +++ b/schema/sync/v1/flags.proto @@ -33,11 +33,8 @@ enum SyncState{ // Convey a deletion of a flag. Flagd internally removes the flag SYNC_STATE_DELETE = 4; - // Convey a clean of all flags. Flagd internally removes all flags for this sync provider - SYNC_STATE_CLEAN = 5; - // Optional server ping to check client connectivity. Handling is ignored by flagd and is to merely support live check - SYNC_STATE_PING = 6; + SYNC_STATE_PING = 5; } // SyncFlagsResponse is the response contains flags and state From 90435db9698ee8e7f6ef71e3178cd41e17f98859 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Tue, 24 Jan 2023 13:19:34 -0800 Subject: [PATCH 06/25] wire startup to grpc sync Signed-off-by: Kavindu Dodanduwa --- pkg/sync/grpc/grpc_sync_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 pkg/sync/grpc/grpc_sync_test.go diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go new file mode 100644 index 000000000..678d60d72 --- /dev/null +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -0,0 +1,23 @@ +package grpc + +import "testing" + +func TestUrlToGRPCTarget(t *testing.T) { + type args struct { + url string + } + tests := []struct { + name string + args args + want string + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := UrlToGRPCTarget(tt.args.url); got != tt.want { + t.Errorf("UrlToGRPCTarget() = %v, want %v", got, tt.want) + } + }) + } +} From 00387ce55d63b96536dc42dc38353b905326237a Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Tue, 24 Jan 2023 13:23:24 -0800 Subject: [PATCH 07/25] wire startup to grpc Signed-off-by: Kavindu Dodanduwa --- cmd/start.go | 2 +- docs/configuration/flagd_start.md | 2 +- pkg/runtime/from_config.go | 17 ++++++++++++++--- pkg/runtime/runtime.go | 14 +------------- pkg/sync/grpc/grpc_sync.go | 31 +++++++++++++++++++++++-------- pkg/sync/grpc/grpc_sync_test.go | 25 ++++++++++++++++++------- 6 files changed, 58 insertions(+), 33 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index 94b830196..74d35cc5f 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -46,7 +46,7 @@ func init() { "a", nil, "Sync provider arguments as key values separated by =") flags.StringSliceP( uriFlagName, "f", []string{}, "Set a sync provider uri to read data from, this can be a filepath,"+ - "url or FeatureFlagConfiguration. Using multiple providers is supported however if"+ + "url (http and grpc) or FeatureFlagConfiguration. Using multiple providers is supported however if"+ " flag keys are duplicated across multiple sources it may lead to unexpected behavior. "+ "Please note that if you are using filepath, flagd only supports files with `.yaml/.yml/.json` extension.", ) diff --git a/docs/configuration/flagd_start.md b/docs/configuration/flagd_start.md index 1c14f76d2..0cc8945f3 100644 --- a/docs/configuration/flagd_start.md +++ b/docs/configuration/flagd_start.md @@ -21,7 +21,7 @@ flagd start [flags] -d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally. -y, --sync-provider string DEPRECATED: Set a sync provider e.g. filepath or remote -a, --sync-provider-args stringToString Sync provider arguments as key values separated by = (default []) - -f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath,url or FeatureFlagConfiguration. Using multiple providers is supported however if flag keys are duplicated across multiple sources it may lead to unexpected behavior. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension. + -f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath,url (http and grpc) or FeatureFlagConfiguration. Using multiple providers is supported however if flag keys are duplicated across multiple sources it may lead to unexpected behavior. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension. ``` ### Options inherited from parent commands diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index 8684fe6b9..9261ba0bd 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -6,13 +6,13 @@ import ( "regexp" "time" - "github.com/open-feature/flagd/pkg/sync/file" - httpSync "github.com/open-feature/flagd/pkg/sync/http" - "github.com/open-feature/flagd/pkg/eval" "github.com/open-feature/flagd/pkg/logger" "github.com/open-feature/flagd/pkg/service" "github.com/open-feature/flagd/pkg/sync" + "github.com/open-feature/flagd/pkg/sync/file" + "github.com/open-feature/flagd/pkg/sync/grpc" + httpSync "github.com/open-feature/flagd/pkg/sync/http" "github.com/open-feature/flagd/pkg/sync/kubernetes" "github.com/robfig/cron" "go.uber.org/zap" @@ -21,12 +21,14 @@ import ( var ( regCrd *regexp.Regexp regURL *regexp.Regexp + regGRPC *regexp.Regexp regFile *regexp.Regexp ) func init() { regCrd = regexp.MustCompile("^core.openfeature.dev/") regURL = regexp.MustCompile("^https?://") + regGRPC = regexp.MustCompile("^" + grpc.Prefix) regFile = regexp.MustCompile("^file:") } @@ -99,6 +101,15 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { Cron: cron.New(), }) rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %q", uri)) + case regGRPC.Match(uriB): + + r.SyncImpl = append(r.SyncImpl, &grpc.Sync{ + Target: grpc.URLToGRPCTarget(uri), + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "grpc"), + ), + }) default: return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', or 'core.openfeature.dev'", uri) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 5ca157eb5..0b9ffefbb 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -8,9 +8,6 @@ import ( msync "sync" "syscall" - "github.com/open-feature/flagd/pkg/sync/grpc" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" "github.com/open-feature/flagd/pkg/eval" @@ -38,6 +35,7 @@ type Config struct { ProviderArgs sync.ProviderArgs SyncURI []string + RemoteSyncType string SyncBearerToken string CORS []string @@ -72,16 +70,6 @@ func (r *Runtime) Start() error { } }) - // todo - get this from configurations - r.SyncImpl = append(r.SyncImpl, &grpc.Sync{ - URI: "localhost:8090", - Key: "local", - Logger: r.Logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "grpc"), - ), - }) - // Start sync providers for _, s := range r.SyncImpl { p := s diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index a3c6a7a3b..1f6dcd3bf 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "strings" "buf.build/gen/go/kavindudodan/flagd/grpc/go/sync/v1/servicev1grpc" v1 "buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go/sync/v1" @@ -15,15 +16,18 @@ import ( "google.golang.org/grpc/credentials/insecure" ) +// Prefix for GRPC URL inputs. GRPC does not define a prefix through standard. This prefix helps to differentiate +// remote URLs for REST APIs (i.e - HTTP) from GRPC endpoints. +const Prefix = "grpc://" + type Sync struct { - URI string + Target string Key string Logger *logger.Logger } func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { - // todo - Add certificates and/or tokens - dial, err := grpc.Dial("localhost:8090", grpc.WithTransportCredentials(insecure.NewCredentials())) + dial, err := grpc.Dial(g.Target, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { g.Logger.Error(fmt.Sprintf("Error establishing connection: %s", err.Error())) return err @@ -47,7 +51,6 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { err = group.Wait() if err == io.EOF { - // todo - we can retry connection if this happens g.Logger.Info("Stream closed by the server. Exiting without retry attempts.") return err } @@ -67,7 +70,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d case v1.SyncState_SYNC_STATE_ALL: dataSync <- sync.DataSync{ FlagData: data.Flags, - Source: g.URI, + Source: g.Target, Type: sync.ALL, } @@ -76,7 +79,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d case v1.SyncState_SYNC_STATE_ADD: dataSync <- sync.DataSync{ FlagData: data.Flags, - Source: g.URI, + Source: g.Target, Type: sync.ADD, } @@ -85,7 +88,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d case v1.SyncState_SYNC_STATE_UPDATE: dataSync <- sync.DataSync{ FlagData: data.Flags, - Source: g.URI, + Source: g.Target, Type: sync.UPDATE, } @@ -94,7 +97,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d case v1.SyncState_SYNC_STATE_DELETE: dataSync <- sync.DataSync{ FlagData: data.Flags, - Source: g.URI, + Source: g.Target, Type: sync.DELETE, } @@ -107,3 +110,15 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d } } } + +// URLToGRPCTarget is a helper to derive GRPC target from a provided URL +// For example, function returns the target localhost:9090 for the input grpc://localhost:9090 +func URLToGRPCTarget(url string) string { + index := strings.Split(url, Prefix) + + if len(index) == 2 { + return index[1] + } + + return index[0] +} diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index 678d60d72..b51125c8e 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -3,20 +3,31 @@ package grpc import "testing" func TestUrlToGRPCTarget(t *testing.T) { - type args struct { - url string - } tests := []struct { name string - args args + url string want string }{ - // TODO: Add test cases. + { + name: "With Prefix", + url: "grpc://test.com/endpoint", + want: "test.com/endpoint", + }, + { + name: "Without Prefix", + url: "test.com/endpoint", + want: "test.com/endpoint", + }, + { + name: "Empty is empty", + url: "", + want: "", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := UrlToGRPCTarget(tt.args.url); got != tt.want { - t.Errorf("UrlToGRPCTarget() = %v, want %v", got, tt.want) + if got := URLToGRPCTarget(tt.url); got != tt.want { + t.Errorf("URLToGRPCTarget() = %v, want %v", got, tt.want) } }) } From 0cbb5b385558fb209ab5cba34803843689728f6c Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Wed, 1 Feb 2023 15:47:55 -0800 Subject: [PATCH 08/25] cleanup schema from local Signed-off-by: Kavindu Dodanduwa --- go.mod | 4 +-- go.sum | 16 ++++++++++-- pkg/sync/grpc/grpc_sync.go | 7 +++--- schema/buf.gen.yaml | 8 ------ schema/buf.yaml | 9 ------- schema/sync/v1/flags.proto | 51 -------------------------------------- 6 files changed, 20 insertions(+), 75 deletions(-) delete mode 100644 schema/buf.gen.yaml delete mode 100644 schema/buf.yaml delete mode 100644 schema/sync/v1/flags.proto diff --git a/go.mod b/go.mod index 6df6530d5..3bd7f6727 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/open-feature/flagd go 1.19 require ( - buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230123231905-11466466f72d.4 - buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230123231905-11466466f72d.4 + buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230201231859-0ba584642f5c.4 + buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230201231859-0ba584642f5c.4 buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1 buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4 buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20221226184428-0dc62ff103b8.4 diff --git a/go.sum b/go.sum index 71e920a99..3452a0e8b 100644 --- a/go.sum +++ b/go.sum @@ -61,6 +61,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bufbuild/connect-go v1.5.0 h1:IfbgbzzaaZvF+OM3SfxO2EjtvNJarNAz2DIRuuNjAgc= +github.com/bufbuild/connect-go v1.5.0/go.mod h1:9iNvh/NOsfhNBUH5CtvXeVUskQO1xsrEviH7ZArwZ3I= github.com/bufbuild/connect-go v1.5.1 h1:ORhrSiu63hWxtuMmC/V1mKySSRhEySsW5RkHJcyJXBk= github.com/bufbuild/connect-go v1.5.1/go.mod h1:9iNvh/NOsfhNBUH5CtvXeVUskQO1xsrEviH7ZArwZ3I= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -285,6 +287,14 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.6.0 h1:9t9b9vRUbFq3C4qKFCGkVuq/fIHji802N1nrtkh1mNc= github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E= +github.com/open-feature/open-feature-operator v0.2.24 h1:6UwfHO7pa2WDDpdyL+hzYwukbooAA2IZFgyj5xga2vw= +github.com/open-feature/open-feature-operator v0.2.24/go.mod h1:6zsu3m2sa8b4qJlHIAp1Kuc80mCAOAkBCkvDTTyv9ZY= +github.com/open-feature/open-feature-operator v0.2.25 h1:6X1dn7YTTCxRj7Sq6NR3ThDvXYt+4VPPC1GP7D5GD+Q= +github.com/open-feature/open-feature-operator v0.2.25/go.mod h1:8OFtVXXdVpZTSx1vHravbTYup4iyeb+PLmiKbRL11TA= +github.com/open-feature/open-feature-operator v0.2.26 h1:nv3Bln6Zvkc0fXz1/XpQR5TtiXn8KZ/9r85y/jWGNE0= +github.com/open-feature/open-feature-operator v0.2.26/go.mod h1:bQncVK7hvhj5QStPwexxQ1aArPwox2Y1vWrVei/qIFg= +github.com/open-feature/open-feature-operator v0.2.27 h1:OIPEVrEOK39mLeImKrcLnd1AVClj7VrEMOtnZjHLXxY= +github.com/open-feature/open-feature-operator v0.2.27/go.mod h1:bQncVK7hvhj5QStPwexxQ1aArPwox2Y1vWrVei/qIFg= github.com/open-feature/open-feature-operator v0.2.28 h1:qzzVq8v9G7aXO7luocO/wQCGnTJjtcQh75mDOqjnFxo= github.com/open-feature/open-feature-operator v0.2.28/go.mod h1:bQncVK7hvhj5QStPwexxQ1aArPwox2Y1vWrVei/qIFg= github.com/open-feature/schemas v0.2.8 h1:oA75hJXpOd9SFgmNI2IAxWZkwzQPUDm7Jyyh3q489wM= @@ -560,8 +570,6 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.4.0 h1:O7UWfv5+A2qiuulQk30kVinPoMtoIPeVaKLEgLpVkvg= @@ -798,6 +806,10 @@ k8s.io/utils v0.0.0-20221128185143-99ec85e7a448/go.mod h1:OLgZIPagt7ERELqWJFomSt rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= +sigs.k8s.io/controller-runtime v0.14.2 h1:P6IwDhbsRWsBClt/8/h8Zy36bCuGuW5Op7MHpFrN/60= +sigs.k8s.io/controller-runtime v0.14.2/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0= +sigs.k8s.io/controller-runtime v0.14.3 h1:F1JutCoGfSDRiayjAaWcB8SC4BwIt6qkZ/TwiVY8ZRI= +sigs.k8s.io/controller-runtime v0.14.3/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0= sigs.k8s.io/controller-runtime v0.14.4 h1:Kd/Qgx5pd2XUL08eOV2vwIq3L9GhIbJ5Nxengbd4/0M= sigs.k8s.io/controller-runtime v0.14.4/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k= diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 1f6dcd3bf..3ed80ff0a 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -6,7 +6,8 @@ import ( "io" "strings" - "buf.build/gen/go/kavindudodan/flagd/grpc/go/sync/v1/servicev1grpc" + // todo - from schema push + "buf.build/gen/go/kavindudodan/flagd/grpc/go/sync/v1/syncv1grpc" v1 "buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go/sync/v1" "github.com/open-feature/flagd/pkg/logger" @@ -33,7 +34,7 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { return err } - client := servicev1grpc.NewFlagServiceClient(dial) + client := syncv1grpc.NewFlagServiceClient(dial) stream, err := client.SyncFlags(context.Background(), &v1.SyncFlagsRequest{Key: g.Key}) if err != nil { @@ -58,7 +59,7 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { return err } -func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { +func (g *Sync) streamHandler(stream syncv1grpc.FlagService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { for { data, err := stream.Recv() if err != nil { diff --git a/schema/buf.gen.yaml b/schema/buf.gen.yaml deleted file mode 100644 index 1c73de6af..000000000 --- a/schema/buf.gen.yaml +++ /dev/null @@ -1,8 +0,0 @@ -version: v1 -plugins: - - plugin: go - out: gen/proto/go - opt: paths=source_relative - - plugin: go-grpc - out: gen/proto/go - opt: paths=source_relative \ No newline at end of file diff --git a/schema/buf.yaml b/schema/buf.yaml deleted file mode 100644 index 7050cc463..000000000 --- a/schema/buf.yaml +++ /dev/null @@ -1,9 +0,0 @@ -version: v1 -# todo - packaging to flagd -name: buf.build/kavindudodan/flagd -breaking: - use: - - FILE -lint: - use: - - DEFAULT \ No newline at end of file diff --git a/schema/sync/v1/flags.proto b/schema/sync/v1/flags.proto deleted file mode 100644 index da64cc735..000000000 --- a/schema/sync/v1/flags.proto +++ /dev/null @@ -1,51 +0,0 @@ -syntax = "proto3"; - -package sync.service.v1; - -// todo - packaging to flagd -option go_package = "sync/service/v1"; - - -/* -SyncFlagsRequest convey a key that identifies the flagd instance, enabling Sync provider to filter -specific flags for a specific flagd deployment. -*/ -message SyncFlagsRequest { - string key = 1; -} - -//SyncState convey the state of the payload -enum SyncState{ - // Value is ignored by the listening flagd - SYNC_STATE_UNSPECIFIED = 0; - - // All the flags matching the request. This is the default response and other states can be ignored - // by the implementation. Flagd internally replace all existing flags for this response state. - SYNC_STATE_ALL = 1; - - // Convey an addition of a flag. Flagd internally handles this by combining new flags with existing ones - SYNC_STATE_ADD = 2; - - // Convey an update of a flag. Flagd internally try to update if the updated flag already exist OR if it is not, - // then it will get added - SYNC_STATE_UPDATE = 3; - - // Convey a deletion of a flag. Flagd internally removes the flag - SYNC_STATE_DELETE = 4; - - // Optional server ping to check client connectivity. Handling is ignored by flagd and is to merely support live check - SYNC_STATE_PING = 5; -} - -// SyncFlagsResponse is the response contains flags and state -message SyncFlagsResponse { - // flagd feature flag configuration - string flags = 1; - // State conveying the operation to be performed by flagd. See descriptions of state for an explanation of values - SyncState state = 2; -} - -// FlagService implements a server streaming to provide realtime flag configurations -service FlagService { - rpc SyncFlags(SyncFlagsRequest) returns (stream SyncFlagsResponse) {}; -} \ No newline at end of file From aa2219214315dadb59c8ce949af69418d9555528 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Fri, 3 Feb 2023 11:13:30 -0800 Subject: [PATCH 09/25] refactor and testing grpc contract Signed-off-by: Kavindu Dodanduwa --- go.mod | 4 +- pkg/sync/grpc/grpc_sync.go | 31 ++-- pkg/sync/grpc/grpc_sync_test.go | 293 +++++++++++++++++++++++++++++++- 3 files changed, 310 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index 3bd7f6727..5304c7b70 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/open-feature/flagd go 1.19 require ( - buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230201231859-0ba584642f5c.4 - buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230201231859-0ba584642f5c.4 + buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230201234317-013221f6d37a.4 + buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230201234317-013221f6d37a.4 buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1 buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4 buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20221226184428-0dc62ff103b8.4 diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 3ed80ff0a..00da7f5a9 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -34,32 +34,37 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { return err } - client := syncv1grpc.NewFlagServiceClient(dial) - - stream, err := client.SyncFlags(context.Background(), &v1.SyncFlagsRequest{Key: g.Key}) - if err != nil { - g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error())) - return err - } + return g.streamListener(ctx, dial, dataSync) +} +// streamListener performs the grpc listening on provided client connection and push updates through dataSync channel +func (g *Sync) streamListener(ctx context.Context, dial *grpc.ClientConn, dataSync chan<- sync.DataSync) error { group, localContext := errgroup.WithContext(ctx) group.Go(func() error { - return g.streamHandler(stream, dataSync) + serviceClient := syncv1grpc.NewFlagServiceClient(dial) + + syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{Key: g.Key}) + if err != nil { + g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error())) + return err + } + + return g.handleFlagSync(syncClient, dataSync) }) <-localContext.Done() - err = group.Wait() + err := group.Wait() if err == io.EOF { - g.Logger.Info("Stream closed by the server. Exiting without retry attempts.") + g.Logger.Info("Stream closed by the server") return err } return err } -func (g *Sync) streamHandler(stream syncv1grpc.FlagService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { +func (g *Sync) handleFlagSync(stream syncv1grpc.FlagService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { for { data, err := stream.Recv() if err != nil { @@ -76,7 +81,6 @@ func (g *Sync) streamHandler(stream syncv1grpc.FlagService_SyncFlagsClient, data } g.Logger.Debug("received full configuration payload") - continue case v1.SyncState_SYNC_STATE_ADD: dataSync <- sync.DataSync{ FlagData: data.Flags, @@ -85,7 +89,6 @@ func (g *Sync) streamHandler(stream syncv1grpc.FlagService_SyncFlagsClient, data } g.Logger.Debug("received an add payload") - continue case v1.SyncState_SYNC_STATE_UPDATE: dataSync <- sync.DataSync{ FlagData: data.Flags, @@ -94,7 +97,6 @@ func (g *Sync) streamHandler(stream syncv1grpc.FlagService_SyncFlagsClient, data } g.Logger.Debug("received an update payload") - continue case v1.SyncState_SYNC_STATE_DELETE: dataSync <- sync.DataSync{ FlagData: data.Flags, @@ -103,7 +105,6 @@ func (g *Sync) streamHandler(stream syncv1grpc.FlagService_SyncFlagsClient, data } g.Logger.Debug("received a delete payload") - continue case v1.SyncState_SYNC_STATE_PING: g.Logger.Debug("received server ping") default: diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index b51125c8e..4d80921ac 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -1,6 +1,23 @@ package grpc -import "testing" +import ( + "context" + "fmt" + "io" + "log" + "net" + "testing" + + // todo - from schema push + "buf.build/gen/go/kavindudodan/flagd/grpc/go/sync/v1/syncv1grpc" + v1 "buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go/sync/v1" + + "github.com/open-feature/flagd/pkg/logger" + "github.com/open-feature/flagd/pkg/sync" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" +) func TestUrlToGRPCTarget(t *testing.T) { tests := []struct { @@ -32,3 +49,277 @@ func TestUrlToGRPCTarget(t *testing.T) { }) } } + +func TestSync_BasicFlagSyncStates(t *testing.T) { + grpcSyncImpl := Sync{ + Target: "grpc://test", + Key: "", + Logger: logger.NewLogger(nil, false), + } + + tests := []struct { + name string + stream syncv1grpc.FlagService_SyncFlagsClient + want sync.Type + }{ + { + name: "State All maps to Sync All", + stream: &SimpleRecvMock{ + mockResponse: v1.SyncFlagsResponse{ + Flags: "{}", + State: v1.SyncState_SYNC_STATE_ALL, + }, + }, + want: sync.ALL, + }, + { + name: "State Add maps to Sync Add", + stream: &SimpleRecvMock{ + mockResponse: v1.SyncFlagsResponse{ + Flags: "{}", + State: v1.SyncState_SYNC_STATE_ADD, + }, + }, + want: sync.ADD, + }, + { + name: "State Update maps to Sync Update", + stream: &SimpleRecvMock{ + mockResponse: v1.SyncFlagsResponse{ + Flags: "{}", + State: v1.SyncState_SYNC_STATE_UPDATE, + }, + }, + want: sync.UPDATE, + }, + { + name: "State Delete maps to Sync Delete", + stream: &SimpleRecvMock{ + mockResponse: v1.SyncFlagsResponse{ + Flags: "{}", + State: v1.SyncState_SYNC_STATE_DELETE, + }, + }, + want: sync.DELETE, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + syncChan := make(chan sync.DataSync) + + go func() { + err := grpcSyncImpl.handleFlagSync(test.stream, syncChan) + if err != nil { + t.Errorf("Error handling flag sync: %s", err.Error()) + } + }() + data := <-syncChan + + if data.Type != test.want { + t.Errorf("Reuturned data sync state = %v, wanted %v", data.Type, test.want) + } + }) + } +} + +func Test_StreamListener(t *testing.T) { + const target = "localBufCon" + + tests := []struct { + name string + input []serverPayload + output []sync.DataSync + }{ + { + name: "Single send", + input: []serverPayload{ + { + flags: "{\"flags\": {}}", + state: v1.SyncState_SYNC_STATE_ALL, + }, + }, + output: []sync.DataSync{ + { + FlagData: "{\"flags\": {}}", + Type: sync.ALL, + }, + }, + }, + { + name: "Multiple send", + input: []serverPayload{ + { + flags: "{}", + state: v1.SyncState_SYNC_STATE_ALL, + }, + { + flags: "{\"flags\": {}}", + state: v1.SyncState_SYNC_STATE_DELETE, + }, + }, + output: []sync.DataSync{ + { + FlagData: "{}", + Type: sync.ALL, + }, + { + FlagData: "{\"flags\": {}}", + Type: sync.DELETE, + }, + }, + }, + { + name: "Pings are ignored & not written to channel", + input: []serverPayload{ + { + flags: "", + state: v1.SyncState_SYNC_STATE_PING, + }, + { + flags: "", + state: v1.SyncState_SYNC_STATE_PING, + }, + { + flags: "{\"flags\": {}}", + state: v1.SyncState_SYNC_STATE_DELETE, + }, + }, + output: []sync.DataSync{ + { + FlagData: "{\"flags\": {}}", + Type: sync.DELETE, + }, + }, + }, + { + name: "Unknown states are & not written to channel", + input: []serverPayload{ + { + flags: "", + state: 42, + }, + { + flags: "", + state: -1, + }, + { + flags: "{\"flags\": {}}", + state: v1.SyncState_SYNC_STATE_ALL, + }, + }, + output: []sync.DataSync{ + { + FlagData: "{\"flags\": {}}", + Type: sync.ALL, + }, + }, + }, + } + + for _, test := range tests { + bufCon := bufconn.Listen(5) + + bufServer := bufferedServer{ + listener: bufCon, + mockResponses: test.input, + } + + // start server + go serve(&bufServer) + + // initialize client + dial, err := grpc.Dial(target, + grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return bufCon.Dial() + }), + grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Errorf("Error setting up client connection: %s", err.Error()) + } + + grpcSync := Sync{ + Target: target, + Key: "", + Logger: logger.NewLogger(nil, false), + } + + syncChan := make(chan sync.DataSync, 1) + + // listen to stream + go func() { + err := grpcSync.streamListener(context.Background(), dial, syncChan) + if err != nil { + // must ignore EOF as this is returned for stream end + if err != io.EOF { + t.Errorf("Error from stream listener: %s", err.Error()) + } + } + }() + + for _, expected := range test.output { + out := <-syncChan + + if expected.Type != out.Type { + t.Errorf("Reuturned sync type = %v, wanted %v", out.Type, expected.Type) + } + + if expected.FlagData != out.FlagData { + t.Errorf("Reuturned sync data = %v, wanted %v", out.FlagData, expected.FlagData) + } + } + + // channel must be empty + if len(syncChan) != 0 { + t.Errorf("Data sync channel must be empty after all test syncs. But received non empty: %d", len(syncChan)) + } + } +} + +// Mock implementations + +type SimpleRecvMock struct { + grpc.ClientStream + mockResponse v1.SyncFlagsResponse +} + +func (s *SimpleRecvMock) Recv() (*v1.SyncFlagsResponse, error) { + return &s.mockResponse, nil +} + +// serve serves a bufferedServer +func serve(bServer *bufferedServer) { + server := grpc.NewServer() + + syncv1grpc.RegisterFlagServiceServer(server, bServer) + + if err := server.Serve(bServer.listener); err != nil { + log.Fatalf("Server exited with error: %v", err) + } +} + +type serverPayload struct { + flags string + state v1.SyncState +} + +// bufferedServer - a mock grpc service backed by buffered connection +type bufferedServer struct { + listener *bufconn.Listener + mockResponses []serverPayload +} + +func (b *bufferedServer) SyncFlags(req *v1.SyncFlagsRequest, stream syncv1grpc.FlagService_SyncFlagsServer) error { + for _, response := range b.mockResponses { + err := stream.Send(&v1.SyncFlagsResponse{ + Flags: response.flags, + State: response.state, + }) + if err != nil { + fmt.Printf("Error with stream: %s", err.Error()) + return err + } + } + + return nil +} From ecff902ae749509f236bb8e7374ef74fad337a84 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Mon, 6 Feb 2023 14:12:14 -0800 Subject: [PATCH 10/25] provider id config and finalize Signed-off-by: Kavindu Dodanduwa --- cmd/start.go | 50 ++++++++++++++++++--------------- go.mod | 4 +-- pkg/runtime/from_config.go | 3 +- pkg/runtime/runtime.go | 9 +++--- pkg/sync/grpc/grpc_sync.go | 20 ++++++------- pkg/sync/grpc/grpc_sync_test.go | 38 ++++++++++++------------- 6 files changed, 65 insertions(+), 59 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index 74d35cc5f..4da87b855 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -13,18 +13,19 @@ import ( ) const ( - portFlagName = "port" + bearerTokenFlagName = "bearer-token" + corsFlagName = "cors-origin" + evaluatorFlagName = "evaluator" + logFormatFlagName = "log-format" metricsPortFlagName = "metrics-port" - socketPathFlagName = "socket-path" + portFlagName = "port" providerArgsFlagName = "sync-provider-args" - evaluatorFlagName = "evaluator" + providerIdentifier = "provider-id" serverCertPathFlagName = "server-cert-path" serverKeyPathFlagName = "server-key-path" - uriFlagName = "uri" - bearerTokenFlagName = "bearer-token" - corsFlagName = "cors-origin" + socketPathFlagName = "socket-path" syncProviderFlagName = "sync-provider" - logFormatFlagName = "log-format" + uriFlagName = "uri" ) func init() { @@ -57,19 +58,21 @@ func init() { syncProviderFlagName, "y", "", "DEPRECATED: Set a sync provider e.g. filepath or remote", ) flags.StringP(logFormatFlagName, "z", "console", "Set the logging format, e.g. console or json ") + flags.StringP(providerIdentifier, "i", "", "Set the identifier of this flagd runtime") - _ = viper.BindPFlag(portFlagName, flags.Lookup(portFlagName)) + _ = viper.BindPFlag(bearerTokenFlagName, flags.Lookup(bearerTokenFlagName)) + _ = viper.BindPFlag(corsFlagName, flags.Lookup(corsFlagName)) + _ = viper.BindPFlag(evaluatorFlagName, flags.Lookup(evaluatorFlagName)) + _ = viper.BindPFlag(logFormatFlagName, flags.Lookup(logFormatFlagName)) _ = viper.BindPFlag(metricsPortFlagName, flags.Lookup(metricsPortFlagName)) - _ = viper.BindPFlag(socketPathFlagName, flags.Lookup(socketPathFlagName)) + _ = viper.BindPFlag(portFlagName, flags.Lookup(portFlagName)) _ = viper.BindPFlag(providerArgsFlagName, flags.Lookup(providerArgsFlagName)) - _ = viper.BindPFlag(evaluatorFlagName, flags.Lookup(evaluatorFlagName)) + _ = viper.BindPFlag(providerIdentifier, flags.Lookup(providerIdentifier)) _ = viper.BindPFlag(serverCertPathFlagName, flags.Lookup(serverCertPathFlagName)) _ = viper.BindPFlag(serverKeyPathFlagName, flags.Lookup(serverKeyPathFlagName)) - _ = viper.BindPFlag(uriFlagName, flags.Lookup(uriFlagName)) - _ = viper.BindPFlag(bearerTokenFlagName, flags.Lookup(bearerTokenFlagName)) - _ = viper.BindPFlag(corsFlagName, flags.Lookup(corsFlagName)) + _ = viper.BindPFlag(socketPathFlagName, flags.Lookup(socketPathFlagName)) _ = viper.BindPFlag(syncProviderFlagName, flags.Lookup(syncProviderFlagName)) - _ = viper.BindPFlag(logFormatFlagName, flags.Lookup(logFormatFlagName)) + _ = viper.BindPFlag(uriFlagName, flags.Lookup(uriFlagName)) } // startCmd represents the start command @@ -104,15 +107,16 @@ var startCmd = &cobra.Command{ } // Build Runtime ----------------------------------------------------------- rt, err := runtime.FromConfig(logger, runtime.Config{ - ServicePort: viper.GetInt32(portFlagName), - MetricsPort: viper.GetInt32(metricsPortFlagName), - ServiceSocketPath: viper.GetString(socketPathFlagName), - ServiceCertPath: viper.GetString(serverCertPathFlagName), - ServiceKeyPath: viper.GetString(serverKeyPathFlagName), - ProviderArgs: viper.GetStringMapString(providerArgsFlagName), - SyncURI: viper.GetStringSlice(uriFlagName), - SyncBearerToken: viper.GetString(bearerTokenFlagName), - CORS: viper.GetStringSlice(corsFlagName), + CORS: viper.GetStringSlice(corsFlagName), + MetricsPort: viper.GetInt32(metricsPortFlagName), + ProviderArgs: viper.GetStringMapString(providerArgsFlagName), + ProviderIdentifier: viper.GetString(providerIdentifier), + ServiceCertPath: viper.GetString(serverCertPathFlagName), + ServiceKeyPath: viper.GetString(serverKeyPathFlagName), + ServicePort: viper.GetInt32(portFlagName), + ServiceSocketPath: viper.GetString(socketPathFlagName), + SyncBearerToken: viper.GetString(bearerTokenFlagName), + SyncURI: viper.GetStringSlice(uriFlagName), }) if err != nil { rtLogger.Fatal(err.Error()) diff --git a/go.mod b/go.mod index 5304c7b70..74b147ebd 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/open-feature/flagd go 1.19 require ( - buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230201234317-013221f6d37a.4 - buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230201234317-013221f6d37a.4 + buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230206214800-4b74922b8ec9.4 + buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230206214800-4b74922b8ec9.4 buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1 buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4 buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20221226184428-0dc62ff103b8.4 diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index 9261ba0bd..c2f7741c9 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -104,7 +104,8 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { case regGRPC.Match(uriB): r.SyncImpl = append(r.SyncImpl, &grpc.Sync{ - Target: grpc.URLToGRPCTarget(uri), + Target: grpc.URLToGRPCTarget(uri), + ProviderID: r.config.ProviderIdentifier, Logger: logger.WithFields( zap.String("component", "sync"), zap.String("sync", "grpc"), diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 0b9ffefbb..7b558ba08 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -33,10 +33,11 @@ type Config struct { ServiceCertPath string ServiceKeyPath string - ProviderArgs sync.ProviderArgs - SyncURI []string - RemoteSyncType string - SyncBearerToken string + ProviderArgs sync.ProviderArgs + ProviderIdentifier string + SyncURI []string + RemoteSyncType string + SyncBearerToken string CORS []string } diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 00da7f5a9..22e93649f 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -22,9 +22,9 @@ import ( const Prefix = "grpc://" type Sync struct { - Target string - Key string - Logger *logger.Logger + Target string + ProviderID string + Logger *logger.Logger } func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { @@ -42,9 +42,9 @@ func (g *Sync) streamListener(ctx context.Context, dial *grpc.ClientConn, dataSy group, localContext := errgroup.WithContext(ctx) group.Go(func() error { - serviceClient := syncv1grpc.NewFlagServiceClient(dial) + serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) - syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{Key: g.Key}) + syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) if err != nil { g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error())) return err @@ -64,7 +64,7 @@ func (g *Sync) streamListener(ctx context.Context, dial *grpc.ClientConn, dataSy return err } -func (g *Sync) handleFlagSync(stream syncv1grpc.FlagService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { +func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { for { data, err := stream.Recv() if err != nil { @@ -75,7 +75,7 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagService_SyncFlagsClient, dat switch data.State { case v1.SyncState_SYNC_STATE_ALL: dataSync <- sync.DataSync{ - FlagData: data.Flags, + FlagData: data.FlagConfiguration, Source: g.Target, Type: sync.ALL, } @@ -83,7 +83,7 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagService_SyncFlagsClient, dat g.Logger.Debug("received full configuration payload") case v1.SyncState_SYNC_STATE_ADD: dataSync <- sync.DataSync{ - FlagData: data.Flags, + FlagData: data.FlagConfiguration, Source: g.Target, Type: sync.ADD, } @@ -91,7 +91,7 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagService_SyncFlagsClient, dat g.Logger.Debug("received an add payload") case v1.SyncState_SYNC_STATE_UPDATE: dataSync <- sync.DataSync{ - FlagData: data.Flags, + FlagData: data.FlagConfiguration, Source: g.Target, Type: sync.UPDATE, } @@ -99,7 +99,7 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagService_SyncFlagsClient, dat g.Logger.Debug("received an update payload") case v1.SyncState_SYNC_STATE_DELETE: dataSync <- sync.DataSync{ - FlagData: data.Flags, + FlagData: data.FlagConfiguration, Source: g.Target, Type: sync.DELETE, } diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index 4d80921ac..649e72df8 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -52,22 +52,22 @@ func TestUrlToGRPCTarget(t *testing.T) { func TestSync_BasicFlagSyncStates(t *testing.T) { grpcSyncImpl := Sync{ - Target: "grpc://test", - Key: "", - Logger: logger.NewLogger(nil, false), + Target: "grpc://test", + ProviderID: "", + Logger: logger.NewLogger(nil, false), } tests := []struct { name string - stream syncv1grpc.FlagService_SyncFlagsClient + stream syncv1grpc.FlagSyncService_SyncFlagsClient want sync.Type }{ { name: "State All maps to Sync All", stream: &SimpleRecvMock{ mockResponse: v1.SyncFlagsResponse{ - Flags: "{}", - State: v1.SyncState_SYNC_STATE_ALL, + FlagConfiguration: "{}", + State: v1.SyncState_SYNC_STATE_ALL, }, }, want: sync.ALL, @@ -76,8 +76,8 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { name: "State Add maps to Sync Add", stream: &SimpleRecvMock{ mockResponse: v1.SyncFlagsResponse{ - Flags: "{}", - State: v1.SyncState_SYNC_STATE_ADD, + FlagConfiguration: "{}", + State: v1.SyncState_SYNC_STATE_ADD, }, }, want: sync.ADD, @@ -86,8 +86,8 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { name: "State Update maps to Sync Update", stream: &SimpleRecvMock{ mockResponse: v1.SyncFlagsResponse{ - Flags: "{}", - State: v1.SyncState_SYNC_STATE_UPDATE, + FlagConfiguration: "{}", + State: v1.SyncState_SYNC_STATE_UPDATE, }, }, want: sync.UPDATE, @@ -96,8 +96,8 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { name: "State Delete maps to Sync Delete", stream: &SimpleRecvMock{ mockResponse: v1.SyncFlagsResponse{ - Flags: "{}", - State: v1.SyncState_SYNC_STATE_DELETE, + FlagConfiguration: "{}", + State: v1.SyncState_SYNC_STATE_DELETE, }, }, want: sync.DELETE, @@ -239,9 +239,9 @@ func Test_StreamListener(t *testing.T) { } grpcSync := Sync{ - Target: target, - Key: "", - Logger: logger.NewLogger(nil, false), + Target: target, + ProviderID: "", + Logger: logger.NewLogger(nil, false), } syncChan := make(chan sync.DataSync, 1) @@ -291,7 +291,7 @@ func (s *SimpleRecvMock) Recv() (*v1.SyncFlagsResponse, error) { func serve(bServer *bufferedServer) { server := grpc.NewServer() - syncv1grpc.RegisterFlagServiceServer(server, bServer) + syncv1grpc.RegisterFlagSyncServiceServer(server, bServer) if err := server.Serve(bServer.listener); err != nil { log.Fatalf("Server exited with error: %v", err) @@ -309,11 +309,11 @@ type bufferedServer struct { mockResponses []serverPayload } -func (b *bufferedServer) SyncFlags(req *v1.SyncFlagsRequest, stream syncv1grpc.FlagService_SyncFlagsServer) error { +func (b *bufferedServer) SyncFlags(req *v1.SyncFlagsRequest, stream syncv1grpc.FlagSyncService_SyncFlagsServer) error { for _, response := range b.mockResponses { err := stream.Send(&v1.SyncFlagsResponse{ - Flags: response.flags, - State: response.state, + FlagConfiguration: response.flags, + State: response.state, }) if err != nil { fmt.Printf("Error with stream: %s", err.Error()) From 1ee2f6f4ec6d63128bdf0e77299c740dc2426ece Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Mon, 6 Feb 2023 14:16:07 -0800 Subject: [PATCH 11/25] add missing docs Signed-off-by: Kavindu Dodanduwa --- docs/configuration/flagd_start.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/configuration/flagd_start.md b/docs/configuration/flagd_start.md index 0cc8945f3..d21783142 100644 --- a/docs/configuration/flagd_start.md +++ b/docs/configuration/flagd_start.md @@ -16,6 +16,7 @@ flagd start [flags] -z, --log-format string Set the logging format, e.g. console or json (default "console") -m, --metrics-port int32 Port to serve metrics on (default 8014) -p, --port int32 Port to listen on (default 8013) + -i, --provider-id string Set the identifier of this flagd runtime -c, --server-cert-path string Server side tls certificate path -k, --server-key-path string Server side tls key path -d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally. From 14d923700d06d634471e80b683ec4b4b6f2475ff Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Tue, 7 Feb 2023 10:56:14 -0800 Subject: [PATCH 12/25] update schema to openfeature Signed-off-by: Kavindu Dodanduwa --- go.mod | 6 ++---- pkg/sync/grpc/grpc_sync.go | 5 ++--- pkg/sync/grpc/grpc_sync_test.go | 5 ++--- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index 74b147ebd..fee6844d4 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,9 @@ module github.com/open-feature/flagd go 1.19 require ( - buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230206214800-4b74922b8ec9.4 - buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230206214800-4b74922b8ec9.4 buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1 - buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4 - buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20221226184428-0dc62ff103b8.4 + buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20230207182158-c211472558c3.4 + buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20230207182158-c211472558c3.4 github.com/bufbuild/connect-go v1.5.1 github.com/diegoholiveira/jsonlogic/v3 v3.2.7 github.com/dimiro1/banner v1.1.0 diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 22e93649f..4f928bdb3 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -6,9 +6,8 @@ import ( "io" "strings" - // todo - from schema push - "buf.build/gen/go/kavindudodan/flagd/grpc/go/sync/v1/syncv1grpc" - v1 "buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go/sync/v1" + "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc" + v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1" "github.com/open-feature/flagd/pkg/logger" "github.com/open-feature/flagd/pkg/sync" diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index 649e72df8..479f16f8a 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -8,9 +8,8 @@ import ( "net" "testing" - // todo - from schema push - "buf.build/gen/go/kavindudodan/flagd/grpc/go/sync/v1/syncv1grpc" - v1 "buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go/sync/v1" + "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc" + v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1" "github.com/open-feature/flagd/pkg/logger" "github.com/open-feature/flagd/pkg/sync" From 8e052a3abfb5d4f801ffc556f3b4e968046bf1d0 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Tue, 7 Feb 2023 15:50:43 -0800 Subject: [PATCH 13/25] add connection retry logic Signed-off-by: Kavindu Dodanduwa --- pkg/sync/grpc/grpc_sync.go | 102 ++++++++++++++++++++++++++------ pkg/sync/grpc/grpc_sync_test.go | 16 +++-- 2 files changed, 96 insertions(+), 22 deletions(-) diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 4f928bdb3..428b2a4a6 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -4,7 +4,11 @@ import ( "context" "fmt" "io" + "math" "strings" + "time" + + "google.golang.org/grpc/credentials/insecure" "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc" v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1" @@ -13,12 +17,20 @@ import ( "github.com/open-feature/flagd/pkg/sync" "golang.org/x/sync/errgroup" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) -// Prefix for GRPC URL inputs. GRPC does not define a prefix through standard. This prefix helps to differentiate -// remote URLs for REST APIs (i.e - HTTP) from GRPC endpoints. -const Prefix = "grpc://" +const ( + // Prefix for GRPC URL inputs. GRPC does not define a prefix through standard. This prefix helps to differentiate + // remote URLs for REST APIs (i.e - HTTP) from GRPC endpoints. + Prefix = "grpc://" + + // Connection retry constants + // Backoff period is calculated with backOffBase ^ #retry-iteration. However, when backoffLimit is reached, fallback + // to constantBackoffDelay + backoffLimit = 3 + backOffBase = 4 + constantBackoffDelay = 60 +) type Sync struct { Target string @@ -27,29 +39,86 @@ type Sync struct { } func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { - dial, err := grpc.Dial(g.Target, grpc.WithTransportCredentials(insecure.NewCredentials())) + options := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + + // initial dial and connection. Failure here must result in a startup failure + dial, err := grpc.DialContext(ctx, g.Target, options...) if err != nil { - g.Logger.Error(fmt.Sprintf("Error establishing connection: %s", err.Error())) + g.Logger.Error(fmt.Sprintf("Error establishing grpc connection: %s", err.Error())) return err } - return g.streamListener(ctx, dial, dataSync) + serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) + syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) + if err != nil { + g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error())) + return err + } + + // initial stream listening + err = g.streamListener(ctx, syncClient, dataSync) + if err != nil { + g.Logger.Warn(fmt.Sprintf("Error with stream listener: %s", err.Error())) + } + + // retry connection establishment + for { + g.Logger.Warn(fmt.Sprintf("Connection re-establishment attempt in-progress for grpc target: %s", g.Target)) + + syncClient = g.connectWithRetry(ctx, options...) + err = g.streamListener(ctx, syncClient, dataSync) + if err != nil { + g.Logger.Warn(fmt.Sprintf("Error with stream listener: %s", err.Error())) + continue + } + } } -// streamListener performs the grpc listening on provided client connection and push updates through dataSync channel -func (g *Sync) streamListener(ctx context.Context, dial *grpc.ClientConn, dataSync chan<- sync.DataSync) error { - group, localContext := errgroup.WithContext(ctx) +// connectWithRetry is a helper to perform exponential backoff till provided configurations and then retry connection +// periodically till a successful connection is established +func (g *Sync) connectWithRetry( + ctx context.Context, options ...grpc.DialOption, +) syncv1grpc.FlagSyncService_SyncFlagsClient { + var iteration int - group.Go(func() error { - serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) + for { + var sleep time.Duration + if iteration >= backoffLimit { + sleep = constantBackoffDelay + } else { + iteration++ + sleep = time.Duration(math.Pow(backOffBase, float64(iteration))) + } + + time.Sleep(sleep * time.Second) + dial, err := grpc.DialContext(ctx, g.Target, options...) + if err != nil { + g.Logger.Debug(fmt.Sprintf("Error dialing target: %s", err.Error())) + continue + } + + serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) if err != nil { - g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error())) - return err + g.Logger.Debug(fmt.Sprintf("Error openning service client: %s", err.Error())) + continue } - return g.handleFlagSync(syncClient, dataSync) + g.Logger.Info(fmt.Sprintf("Connection re-established with grpc target: %s", g.Target)) + return syncClient + } +} + +// streamListener wraps the grpc listening on provided stream and push updates through dataSync channel +func (g *Sync) streamListener( + ctx context.Context, stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync, +) error { + group, localContext := errgroup.WithContext(ctx) + group.Go(func() error { + return g.handleFlagSync(stream, dataSync) }) <-localContext.Done() @@ -67,7 +136,6 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, for { data, err := stream.Recv() if err != nil { - g.Logger.Warn(fmt.Sprintf("Error with stream response: %s", err.Error())) return err } @@ -107,7 +175,7 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, case v1.SyncState_SYNC_STATE_PING: g.Logger.Debug("received server ping") default: - g.Logger.Warn(fmt.Sprintf("receivied unknown state: %s", data.State.String())) + g.Logger.Debug(fmt.Sprintf("receivied unknown state: %s", data.State.String())) } } } diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index 479f16f8a..830164745 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -227,6 +227,12 @@ func Test_StreamListener(t *testing.T) { // start server go serve(&bufServer) + grpcSync := Sync{ + Target: target, + ProviderID: "", + Logger: logger.NewLogger(nil, false), + } + // initialize client dial, err := grpc.Dial(target, grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { @@ -237,17 +243,17 @@ func Test_StreamListener(t *testing.T) { t.Errorf("Error setting up client connection: %s", err.Error()) } - grpcSync := Sync{ - Target: target, - ProviderID: "", - Logger: logger.NewLogger(nil, false), + serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) + syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{ProviderId: grpcSync.ProviderID}) + if err != nil { + t.Errorf("Error opening client stream: %s", err.Error()) } syncChan := make(chan sync.DataSync, 1) // listen to stream go func() { - err := grpcSync.streamListener(context.Background(), dial, syncChan) + err := grpcSync.streamListener(context.Background(), syncClient, syncChan) if err != nil { // must ignore EOF as this is returned for stream end if err != io.EOF { From 7d0259a3bfa24c1c034d3fd8cae4de44be8415e9 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Wed, 8 Feb 2023 09:50:49 -0800 Subject: [PATCH 14/25] tidy up deps Signed-off-by: Kavindu Dodanduwa --- go.sum | 7 ------- 1 file changed, 7 deletions(-) diff --git a/go.sum b/go.sum index 3452a0e8b..576f3912b 100644 --- a/go.sum +++ b/go.sum @@ -67,7 +67,6 @@ github.com/bufbuild/connect-go v1.5.1 h1:ORhrSiu63hWxtuMmC/V1mKySSRhEySsW5RkHJcy github.com/bufbuild/connect-go v1.5.1/go.mod h1:9iNvh/NOsfhNBUH5CtvXeVUskQO1xsrEviH7ZArwZ3I= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -500,8 +499,6 @@ golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= -golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 h1:nt+Q6cXKz4MosCSpnbMtqiQ8Oz0pxTef2B4Vca2lvfk= -golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg= golang.org/x/oauth2 v0.4.0 h1:NF0gk8LVPg1Ml7SSbGyySuoxdsXitj7TvgvuRxIMc/M= golang.org/x/oauth2 v0.4.0/go.mod h1:RznEsdpjGAINPTOF0UH/t+xJ75L18YO3Ho6Pyn+uRec= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -714,8 +711,6 @@ google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= -google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef h1:uQ2vjV/sHTsWSqdKeLqmwitzgvjMl7o4IdtHwUDXSJY= -google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w= google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -737,8 +732,6 @@ google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA5 google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.43.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= -google.golang.org/grpc v1.52.3 h1:pf7sOysg4LdgBqduXveGKrcEwbStiK2rtfghdzlUYDQ= -google.golang.org/grpc v1.52.3/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY= google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc= google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= From e59f69228c85310cf961cbd7271868a7ee95c4fb Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Thu, 9 Feb 2023 08:46:53 -0800 Subject: [PATCH 15/25] Update pkg/sync/grpc/grpc_sync_test.go Co-authored-by: James Milligan <75740990+james-milligan@users.noreply.github.com> Signed-off-by: Kavindu Dodanduwa --- pkg/sync/grpc/grpc_sync_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index 830164745..56ea96b5a 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -116,7 +116,7 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { data := <-syncChan if data.Type != test.want { - t.Errorf("Reuturned data sync state = %v, wanted %v", data.Type, test.want) + t.Errorf("Returned data sync state = %v, wanted %v", data.Type, test.want) } }) } From c3541d4cc6890fb9341a0af248c545754ec8c2b1 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Thu, 9 Feb 2023 08:52:27 -0800 Subject: [PATCH 16/25] Update pkg/sync/grpc/grpc_sync.go Co-authored-by: James Milligan <75740990+james-milligan@users.noreply.github.com> Signed-off-by: Kavindu Dodanduwa --- pkg/sync/grpc/grpc_sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 428b2a4a6..82219fcd9 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -175,7 +175,7 @@ func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, case v1.SyncState_SYNC_STATE_PING: g.Logger.Debug("received server ping") default: - g.Logger.Debug(fmt.Sprintf("receivied unknown state: %s", data.State.String())) + g.Logger.Debug(fmt.Sprintf("received unknown state: %s", data.State.String())) } } } From c129d79d1024a5beed12d6b772e5aa078b325b3b Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Thu, 9 Feb 2023 09:50:09 -0800 Subject: [PATCH 17/25] review changes - improve context handling Signed-off-by: Kavindu Dodanduwa --- pkg/sync/grpc/grpc_sync.go | 71 +++++++++++++++------------------ pkg/sync/grpc/grpc_sync_test.go | 2 +- 2 files changed, 33 insertions(+), 40 deletions(-) diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 82219fcd9..1793c38c2 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -3,7 +3,6 @@ package grpc import ( "context" "fmt" - "io" "math" "strings" "time" @@ -15,7 +14,6 @@ import ( "github.com/open-feature/flagd/pkg/logger" "github.com/open-feature/flagd/pkg/sync" - "golang.org/x/sync/errgroup" "google.golang.org/grpc" ) @@ -25,11 +23,11 @@ const ( Prefix = "grpc://" // Connection retry constants - // Backoff period is calculated with backOffBase ^ #retry-iteration. However, when backoffLimit is reached, fallback - // to constantBackoffDelay - backoffLimit = 3 + // Back off period is calculated with backOffBase ^ #retry-iteration. However, when #retry-iteration count reach + // backOffLimit, retry delay fallback to constantBackOffDelay + backOffLimit = 3 backOffBase = 4 - constantBackoffDelay = 60 + constantBackOffDelay = 60 ) type Sync struct { @@ -51,24 +49,27 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { } serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) - syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) + syncClient, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) if err != nil { g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error())) return err } // initial stream listening - err = g.streamListener(ctx, syncClient, dataSync) + err = g.handleFlagSync(syncClient, dataSync) if err != nil { g.Logger.Warn(fmt.Sprintf("Error with stream listener: %s", err.Error())) } // retry connection establishment for { - g.Logger.Warn(fmt.Sprintf("Connection re-establishment attempt in-progress for grpc target: %s", g.Target)) + syncClient, ok := g.connectWithRetry(ctx, options...) + if !ok { + // We shall exit + return nil + } - syncClient = g.connectWithRetry(ctx, options...) - err = g.streamListener(ctx, syncClient, dataSync) + err = g.handleFlagSync(syncClient, dataSync) if err != nil { g.Logger.Warn(fmt.Sprintf("Error with stream listener: %s", err.Error())) continue @@ -76,23 +77,34 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { } } -// connectWithRetry is a helper to perform exponential backoff till provided configurations and then retry connection -// periodically till a successful connection is established +// connectWithRetry is a helper to perform exponential back off till provided configurations and then retry connection +// periodically till a successful connection is established. Caller must not expect an error. Hence, errors are handled, +// logged internally. However, if the provided context is done, method exit with a non-ok state which must be verified +// by the caller func (g *Sync) connectWithRetry( ctx context.Context, options ...grpc.DialOption, -) syncv1grpc.FlagSyncService_SyncFlagsClient { +) (syncv1grpc.FlagSyncService_SyncFlagsClient, bool) { var iteration int for { var sleep time.Duration - if iteration >= backoffLimit { - sleep = constantBackoffDelay + if iteration >= backOffLimit { + sleep = constantBackOffDelay } else { iteration++ sleep = time.Duration(math.Pow(backOffBase, float64(iteration))) } - time.Sleep(sleep * time.Second) + // Block the next connection attempt and check the context + select { + case <-time.After(sleep * time.Second): + break + case <-ctx.Done(): + // context done means we shall exit + return nil, false + } + + g.Logger.Warn(fmt.Sprintf("Connection re-establishment attempt in-progress for grpc target: %s", g.Target)) dial, err := grpc.DialContext(ctx, g.Target, options...) if err != nil { @@ -101,37 +113,18 @@ func (g *Sync) connectWithRetry( } serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) - syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) + syncClient, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) if err != nil { g.Logger.Debug(fmt.Sprintf("Error openning service client: %s", err.Error())) continue } g.Logger.Info(fmt.Sprintf("Connection re-established with grpc target: %s", g.Target)) - return syncClient - } -} - -// streamListener wraps the grpc listening on provided stream and push updates through dataSync channel -func (g *Sync) streamListener( - ctx context.Context, stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync, -) error { - group, localContext := errgroup.WithContext(ctx) - group.Go(func() error { - return g.handleFlagSync(stream, dataSync) - }) - - <-localContext.Done() - - err := group.Wait() - if err == io.EOF { - g.Logger.Info("Stream closed by the server") - return err + return syncClient, true } - - return err } +// handleFlagSync wraps the stream listening and push updates through dataSync channel func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { for { data, err := stream.Recv() diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index 56ea96b5a..081a14a68 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -253,7 +253,7 @@ func Test_StreamListener(t *testing.T) { // listen to stream go func() { - err := grpcSync.streamListener(context.Background(), syncClient, syncChan) + err := grpcSync.handleFlagSync(syncClient, syncChan) if err != nil { // must ignore EOF as this is returned for stream end if err != io.EOF { From adb489ff290532a18768e0be26f6e81d40f236d2 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Thu, 9 Feb 2023 09:53:00 -0800 Subject: [PATCH 18/25] Update pkg/sync/grpc/grpc_sync_test.go Co-authored-by: James Milligan <75740990+james-milligan@users.noreply.github.com> Signed-off-by: Kavindu Dodanduwa --- pkg/sync/grpc/grpc_sync_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index 081a14a68..ae8e51e33 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -266,7 +266,7 @@ func Test_StreamListener(t *testing.T) { out := <-syncChan if expected.Type != out.Type { - t.Errorf("Reuturned sync type = %v, wanted %v", out.Type, expected.Type) + t.Errorf("Returned sync type = %v, wanted %v", out.Type, expected.Type) } if expected.FlagData != out.FlagData { From d878c6a2aff642e6d96c7065f82f9ab6273721b9 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Thu, 9 Feb 2023 09:53:23 -0800 Subject: [PATCH 19/25] Update pkg/sync/grpc/grpc_sync_test.go Co-authored-by: James Milligan <75740990+james-milligan@users.noreply.github.com> Signed-off-by: Kavindu Dodanduwa --- pkg/sync/grpc/grpc_sync_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index ae8e51e33..d521372c8 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -270,7 +270,7 @@ func Test_StreamListener(t *testing.T) { } if expected.FlagData != out.FlagData { - t.Errorf("Reuturned sync data = %v, wanted %v", out.FlagData, expected.FlagData) + t.Errorf("Returned sync data = %v, wanted %v", out.FlagData, expected.FlagData) } } From 1113554df074bc65aa010615367288176e652409 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Thu, 9 Feb 2023 09:57:06 -0800 Subject: [PATCH 20/25] resolve go sum conflict Signed-off-by: Kavindu Dodanduwa --- go.sum | 36 ++++++------------------------------ 1 file changed, 6 insertions(+), 30 deletions(-) diff --git a/go.sum b/go.sum index 576f3912b..e257390e0 100644 --- a/go.sum +++ b/go.sum @@ -1,15 +1,11 @@ buf.build/gen/go/grpc-ecosystem/grpc-gateway/grpc/go v1.2.0-20220906183531-bc28b723cd77.4/go.mod h1:hAKk3I2AivrJgMLXjDGrfzRx2NVWQgEPNfr4Co9DLX4= buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.28.1-20220906183531-bc28b723cd77.4/go.mod h1:92ejKVTiuvnKoAtRlpJpIxKfloI935DDqhs0NCRx+KM= -buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230123231905-11466466f72d.4 h1:1fQv2ozb/dr74S3CPIGZunSndVzHAPeY5niWnEr0xDg= -buf.build/gen/go/kavindudodan/flagd/grpc/go v1.2.0-20230123231905-11466466f72d.4/go.mod h1:HhDGIP35zh2M82Dx4oIzit5ZEtoOaOyy35dAr/e+Uuo= -buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230123231905-11466466f72d.4 h1:uZNmxfZMZVrfd31Iz4YQsRa6zl4kifKUwRWjtIMz8yI= -buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go v1.28.1-20230123231905-11466466f72d.4/go.mod h1:jpjQMBqnFcgHMK99ylFu+PhUhA1KD6AC1QSz27yC/1U= buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1 h1:KoSPqmHyi3x27tPFLQ994CJjG4qc59v+0gbxY9+VXso= buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1/go.mod h1:68WGv4z/jXuTS3G7FEFQTEw4wiMmulBSX6BlSFX2Xc8= -buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4 h1:9ioWUVmnURL+TX4qVjyzzE3o9Z2ACDGidqtu9dkjmdY= -buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4/go.mod h1:mgJ/h6pO54DWEYi3YqzlLxFBerE6L4dPRPMIhKEGBw0= -buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20221226184428-0dc62ff103b8.4 h1:HMDOJt1SDrELWslNJWfjg4ItbNOPx3ZvD8RtikKr5SE= -buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20221226184428-0dc62ff103b8.4/go.mod h1:+Bnrjo56uVn/aBcLWchTveR8UeCj+KSJN4fE0xSmBNc= +buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20230207182158-c211472558c3.4 h1:11ayeHd1H1LhRuJlHzIfbcUk64gAtnm5zrBZXxoOyN0= +buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20230207182158-c211472558c3.4/go.mod h1:8ce/bdmiPVo2i5s+bYLENyIvi24dmBN5zy+nPyCUAHg= +buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20230207182158-c211472558c3.4 h1:6Ht0iYYWoG7qDuxGs/aG11uj9ulFfTL2zeWCryj9aWg= +buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20230207182158-c211472558c3.4/go.mod h1:+Bnrjo56uVn/aBcLWchTveR8UeCj+KSJN4fE0xSmBNc= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -61,8 +57,6 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/bufbuild/connect-go v1.5.0 h1:IfbgbzzaaZvF+OM3SfxO2EjtvNJarNAz2DIRuuNjAgc= -github.com/bufbuild/connect-go v1.5.0/go.mod h1:9iNvh/NOsfhNBUH5CtvXeVUskQO1xsrEviH7ZArwZ3I= github.com/bufbuild/connect-go v1.5.1 h1:ORhrSiu63hWxtuMmC/V1mKySSRhEySsW5RkHJcyJXBk= github.com/bufbuild/connect-go v1.5.1/go.mod h1:9iNvh/NOsfhNBUH5CtvXeVUskQO1xsrEviH7ZArwZ3I= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -286,14 +280,6 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.6.0 h1:9t9b9vRUbFq3C4qKFCGkVuq/fIHji802N1nrtkh1mNc= github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E= -github.com/open-feature/open-feature-operator v0.2.24 h1:6UwfHO7pa2WDDpdyL+hzYwukbooAA2IZFgyj5xga2vw= -github.com/open-feature/open-feature-operator v0.2.24/go.mod h1:6zsu3m2sa8b4qJlHIAp1Kuc80mCAOAkBCkvDTTyv9ZY= -github.com/open-feature/open-feature-operator v0.2.25 h1:6X1dn7YTTCxRj7Sq6NR3ThDvXYt+4VPPC1GP7D5GD+Q= -github.com/open-feature/open-feature-operator v0.2.25/go.mod h1:8OFtVXXdVpZTSx1vHravbTYup4iyeb+PLmiKbRL11TA= -github.com/open-feature/open-feature-operator v0.2.26 h1:nv3Bln6Zvkc0fXz1/XpQR5TtiXn8KZ/9r85y/jWGNE0= -github.com/open-feature/open-feature-operator v0.2.26/go.mod h1:bQncVK7hvhj5QStPwexxQ1aArPwox2Y1vWrVei/qIFg= -github.com/open-feature/open-feature-operator v0.2.27 h1:OIPEVrEOK39mLeImKrcLnd1AVClj7VrEMOtnZjHLXxY= -github.com/open-feature/open-feature-operator v0.2.27/go.mod h1:bQncVK7hvhj5QStPwexxQ1aArPwox2Y1vWrVei/qIFg= github.com/open-feature/open-feature-operator v0.2.28 h1:qzzVq8v9G7aXO7luocO/wQCGnTJjtcQh75mDOqjnFxo= github.com/open-feature/open-feature-operator v0.2.28/go.mod h1:bQncVK7hvhj5QStPwexxQ1aArPwox2Y1vWrVei/qIFg= github.com/open-feature/schemas v0.2.8 h1:oA75hJXpOd9SFgmNI2IAxWZkwzQPUDm7Jyyh3q489wM= @@ -484,8 +470,6 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= -golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -565,12 +549,10 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= -golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.4.0 h1:O7UWfv5+A2qiuulQk30kVinPoMtoIPeVaKLEgLpVkvg= -golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -581,8 +563,6 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= -golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -799,10 +779,6 @@ k8s.io/utils v0.0.0-20221128185143-99ec85e7a448/go.mod h1:OLgZIPagt7ERELqWJFomSt rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= -sigs.k8s.io/controller-runtime v0.14.2 h1:P6IwDhbsRWsBClt/8/h8Zy36bCuGuW5Op7MHpFrN/60= -sigs.k8s.io/controller-runtime v0.14.2/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0= -sigs.k8s.io/controller-runtime v0.14.3 h1:F1JutCoGfSDRiayjAaWcB8SC4BwIt6qkZ/TwiVY8ZRI= -sigs.k8s.io/controller-runtime v0.14.3/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0= sigs.k8s.io/controller-runtime v0.14.4 h1:Kd/Qgx5pd2XUL08eOV2vwIq3L9GhIbJ5Nxengbd4/0M= sigs.k8s.io/controller-runtime v0.14.4/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k= From 02e84338d3d5d3b1db5c16ede04e1e3f8c5a04b9 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Mon, 13 Feb 2023 14:03:58 -0800 Subject: [PATCH 21/25] Document grpc sync uri Signed-off-by: Kavindu Dodanduwa --- docs/configuration/configuration.md | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index e45dc5fcf..2a209bdef 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -12,13 +12,14 @@ Config file expects the keys to have the exact naming as the flags. ### URI patterns -Any URI passed to flagd via the `--uri` flag must follow one of the 3 following patterns to ensure that it is passed to the correct implementation: - -| Sync | Pattern | Example | -| ----------- | ----------- | ----------- | -| Kubernetes | `core.openfeature.dev/namespace/name` | `core.openfeature.dev/default/my-crd` | -| Filepath | `file:path/to/my/flag` | `file:etc/flagd/my-flags.json` | -| Remote | `http(s)://flag-source-url` | `https://my-flags.com/flags` | +Any URI passed to flagd via the `--uri` flag must follow one of the 4 following patterns to ensure that it is passed to the correct implementation: + +| Sync | Pattern | Example | +|------------|------------------------------------|---------------------------------------| +| Kubernetes | `core.openfeature.dev/namespace/name` | `core.openfeature.dev/default/my-crd` | +| Filepath | `file:path/to/my/flag` | `file:etc/flagd/my-flags.json` | +| Remote | `http(s)://flag-source-url` | `https://my-flags.com/flags` | +| Grpc | `grpc://flag-source-url` | `grpc://my-flags-server` | From 0a5d91aa2f7ec5164e4eaf57599d5324ae14c2c4 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Tue, 14 Feb 2023 09:06:50 -0800 Subject: [PATCH 22/25] review changes Signed-off-by: Kavindu Dodanduwa --- cmd/start.go | 22 +++++++++------------- docs/configuration/flagd_start.md | 1 - pkg/runtime/from_config.go | 8 +++----- pkg/runtime/runtime.go | 9 ++++----- pkg/sync/grpc/grpc_sync.go | 25 +++++++++++-------------- 5 files changed, 27 insertions(+), 38 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index 4da87b855..d614a70e8 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -20,7 +20,6 @@ const ( metricsPortFlagName = "metrics-port" portFlagName = "port" providerArgsFlagName = "sync-provider-args" - providerIdentifier = "provider-id" serverCertPathFlagName = "server-cert-path" serverKeyPathFlagName = "server-key-path" socketPathFlagName = "socket-path" @@ -58,7 +57,6 @@ func init() { syncProviderFlagName, "y", "", "DEPRECATED: Set a sync provider e.g. filepath or remote", ) flags.StringP(logFormatFlagName, "z", "console", "Set the logging format, e.g. console or json ") - flags.StringP(providerIdentifier, "i", "", "Set the identifier of this flagd runtime") _ = viper.BindPFlag(bearerTokenFlagName, flags.Lookup(bearerTokenFlagName)) _ = viper.BindPFlag(corsFlagName, flags.Lookup(corsFlagName)) @@ -67,7 +65,6 @@ func init() { _ = viper.BindPFlag(metricsPortFlagName, flags.Lookup(metricsPortFlagName)) _ = viper.BindPFlag(portFlagName, flags.Lookup(portFlagName)) _ = viper.BindPFlag(providerArgsFlagName, flags.Lookup(providerArgsFlagName)) - _ = viper.BindPFlag(providerIdentifier, flags.Lookup(providerIdentifier)) _ = viper.BindPFlag(serverCertPathFlagName, flags.Lookup(serverCertPathFlagName)) _ = viper.BindPFlag(serverKeyPathFlagName, flags.Lookup(serverKeyPathFlagName)) _ = viper.BindPFlag(socketPathFlagName, flags.Lookup(socketPathFlagName)) @@ -107,16 +104,15 @@ var startCmd = &cobra.Command{ } // Build Runtime ----------------------------------------------------------- rt, err := runtime.FromConfig(logger, runtime.Config{ - CORS: viper.GetStringSlice(corsFlagName), - MetricsPort: viper.GetInt32(metricsPortFlagName), - ProviderArgs: viper.GetStringMapString(providerArgsFlagName), - ProviderIdentifier: viper.GetString(providerIdentifier), - ServiceCertPath: viper.GetString(serverCertPathFlagName), - ServiceKeyPath: viper.GetString(serverKeyPathFlagName), - ServicePort: viper.GetInt32(portFlagName), - ServiceSocketPath: viper.GetString(socketPathFlagName), - SyncBearerToken: viper.GetString(bearerTokenFlagName), - SyncURI: viper.GetStringSlice(uriFlagName), + CORS: viper.GetStringSlice(corsFlagName), + MetricsPort: viper.GetInt32(metricsPortFlagName), + ProviderArgs: viper.GetStringMapString(providerArgsFlagName), + ServiceCertPath: viper.GetString(serverCertPathFlagName), + ServiceKeyPath: viper.GetString(serverKeyPathFlagName), + ServicePort: viper.GetInt32(portFlagName), + ServiceSocketPath: viper.GetString(socketPathFlagName), + SyncBearerToken: viper.GetString(bearerTokenFlagName), + SyncURI: viper.GetStringSlice(uriFlagName), }) if err != nil { rtLogger.Fatal(err.Error()) diff --git a/docs/configuration/flagd_start.md b/docs/configuration/flagd_start.md index d21783142..0cc8945f3 100644 --- a/docs/configuration/flagd_start.md +++ b/docs/configuration/flagd_start.md @@ -16,7 +16,6 @@ flagd start [flags] -z, --log-format string Set the logging format, e.g. console or json (default "console") -m, --metrics-port int32 Port to serve metrics on (default 8014) -p, --port int32 Port to listen on (default 8013) - -i, --provider-id string Set the identifier of this flagd runtime -c, --server-cert-path string Server side tls certificate path -k, --server-key-path string Server side tls key path -d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally. diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index c2f7741c9..4ff8c4e12 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -102,18 +102,16 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { }) rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %q", uri)) case regGRPC.Match(uriB): - r.SyncImpl = append(r.SyncImpl, &grpc.Sync{ - Target: grpc.URLToGRPCTarget(uri), - ProviderID: r.config.ProviderIdentifier, + Target: grpc.URLToGRPCTarget(uri), Logger: logger.WithFields( zap.String("component", "sync"), zap.String("sync", "grpc"), ), }) default: - return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', or 'core.openfeature.dev'", - uri) + return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', 'grpc://',"+ + " or 'core.openfeature.dev'", uri) } } return nil diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 7b558ba08..0b9ffefbb 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -33,11 +33,10 @@ type Config struct { ServiceCertPath string ServiceKeyPath string - ProviderArgs sync.ProviderArgs - ProviderIdentifier string - SyncURI []string - RemoteSyncType string - SyncBearerToken string + ProviderArgs sync.ProviderArgs + SyncURI []string + RemoteSyncType string + SyncBearerToken string CORS []string } diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 1793c38c2..f72bbfecc 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -44,22 +44,20 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { // initial dial and connection. Failure here must result in a startup failure dial, err := grpc.DialContext(ctx, g.Target, options...) if err != nil { - g.Logger.Error(fmt.Sprintf("Error establishing grpc connection: %s", err.Error())) + g.Logger.Error(fmt.Sprintf("error establishing grpc connection: %s", err.Error())) return err } serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) syncClient, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) if err != nil { - g.Logger.Error(fmt.Sprintf("Error calling streaming operation: %s", err.Error())) + g.Logger.Error(fmt.Sprintf("error calling streaming operation: %s", err.Error())) return err } // initial stream listening err = g.handleFlagSync(syncClient, dataSync) - if err != nil { - g.Logger.Warn(fmt.Sprintf("Error with stream listener: %s", err.Error())) - } + g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error())) // retry connection establishment for { @@ -71,16 +69,15 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { err = g.handleFlagSync(syncClient, dataSync) if err != nil { - g.Logger.Warn(fmt.Sprintf("Error with stream listener: %s", err.Error())) + g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error())) continue } } } -// connectWithRetry is a helper to perform exponential back off till provided configurations and then retry connection -// periodically till a successful connection is established. Caller must not expect an error. Hence, errors are handled, -// logged internally. However, if the provided context is done, method exit with a non-ok state which must be verified -// by the caller +// connectWithRetry is a helper to perform exponential back off and then retry connection periodically till a successful +// connection is established. Caller must not expect an error. Hence, errors are handled, logged internally. However, +// if the provided context is done, method exit with a non-ok state which must be verified by the caller func (g *Sync) connectWithRetry( ctx context.Context, options ...grpc.DialOption, ) (syncv1grpc.FlagSyncService_SyncFlagsClient, bool) { @@ -104,22 +101,22 @@ func (g *Sync) connectWithRetry( return nil, false } - g.Logger.Warn(fmt.Sprintf("Connection re-establishment attempt in-progress for grpc target: %s", g.Target)) + g.Logger.Warn(fmt.Sprintf("connection re-establishment attempt in-progress for grpc target: %s", g.Target)) dial, err := grpc.DialContext(ctx, g.Target, options...) if err != nil { - g.Logger.Debug(fmt.Sprintf("Error dialing target: %s", err.Error())) + g.Logger.Debug(fmt.Sprintf("error dialing target: %s", err.Error())) continue } serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) syncClient, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) if err != nil { - g.Logger.Debug(fmt.Sprintf("Error openning service client: %s", err.Error())) + g.Logger.Debug(fmt.Sprintf("error openning service client: %s", err.Error())) continue } - g.Logger.Info(fmt.Sprintf("Connection re-established with grpc target: %s", g.Target)) + g.Logger.Info(fmt.Sprintf("connection re-established with grpc target: %s", g.Target)) return syncClient, true } } From 7678e4bef85e8456aadf33d19e222c15cc409056 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Wed, 15 Feb 2023 07:19:43 -0800 Subject: [PATCH 23/25] Update pkg/sync/grpc/grpc_sync.go Co-authored-by: Skye Gill Signed-off-by: Kavindu Dodanduwa --- pkg/sync/grpc/grpc_sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index f72bbfecc..a4de02afc 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -75,7 +75,7 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { } } -// connectWithRetry is a helper to perform exponential back off and then retry connection periodically till a successful +// connectWithRetry is a helper that performs exponential back off after retrying connection attempts periodically until a successful // connection is established. Caller must not expect an error. Hence, errors are handled, logged internally. However, // if the provided context is done, method exit with a non-ok state which must be verified by the caller func (g *Sync) connectWithRetry( From 5f0ecc1fd9dcef8ef1087fff3d6b71f6a633a646 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Wed, 15 Feb 2023 07:19:54 -0800 Subject: [PATCH 24/25] Update pkg/sync/grpc/grpc_sync.go Co-authored-by: Skye Gill Signed-off-by: Kavindu Dodanduwa --- pkg/sync/grpc/grpc_sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index a4de02afc..71ef871ec 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -112,7 +112,7 @@ func (g *Sync) connectWithRetry( serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) syncClient, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) if err != nil { - g.Logger.Debug(fmt.Sprintf("error openning service client: %s", err.Error())) + g.Logger.Debug(fmt.Sprintf("error opening service client: %s", err.Error())) continue } From 76edf55ccbb8ff2930d03afb5400239f3d05c85e Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Wed, 15 Feb 2023 07:36:34 -0800 Subject: [PATCH 25/25] lint fixes Signed-off-by: Kavindu Dodanduwa --- pkg/sync/grpc/grpc_sync.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 71ef871ec..20d96f485 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -75,9 +75,10 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { } } -// connectWithRetry is a helper that performs exponential back off after retrying connection attempts periodically until a successful -// connection is established. Caller must not expect an error. Hence, errors are handled, logged internally. However, -// if the provided context is done, method exit with a non-ok state which must be verified by the caller +// connectWithRetry is a helper that performs exponential back off after retrying connection attempts periodically until +// a successful connection is established. Caller must not expect an error. Hence, errors are handled, logged +// internally. However, if the provided context is done, method exit with a non-ok state which must be verified by the +// caller func (g *Sync) connectWithRetry( ctx context.Context, options ...grpc.DialOption, ) (syncv1grpc.FlagSyncService_SyncFlagsClient, bool) {