diff --git a/cmd/start.go b/cmd/start.go index 94b830196..d614a70e8 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -13,18 +13,18 @@ import ( ) const ( - portFlagName = "port" + bearerTokenFlagName = "bearer-token" + corsFlagName = "cors-origin" + evaluatorFlagName = "evaluator" + logFormatFlagName = "log-format" metricsPortFlagName = "metrics-port" - socketPathFlagName = "socket-path" + portFlagName = "port" providerArgsFlagName = "sync-provider-args" - evaluatorFlagName = "evaluator" serverCertPathFlagName = "server-cert-path" serverKeyPathFlagName = "server-key-path" - uriFlagName = "uri" - bearerTokenFlagName = "bearer-token" - corsFlagName = "cors-origin" + socketPathFlagName = "socket-path" syncProviderFlagName = "sync-provider" - logFormatFlagName = "log-format" + uriFlagName = "uri" ) func init() { @@ -46,7 +46,7 @@ func init() { "a", nil, "Sync provider arguments as key values separated by =") flags.StringSliceP( uriFlagName, "f", []string{}, "Set a sync provider uri to read data from, this can be a filepath,"+ - "url or FeatureFlagConfiguration. Using multiple providers is supported however if"+ + "url (http and grpc) or FeatureFlagConfiguration. Using multiple providers is supported however if"+ " flag keys are duplicated across multiple sources it may lead to unexpected behavior. "+ "Please note that if you are using filepath, flagd only supports files with `.yaml/.yml/.json` extension.", ) @@ -58,18 +58,18 @@ func init() { ) flags.StringP(logFormatFlagName, "z", "console", "Set the logging format, e.g. console or json ") - _ = viper.BindPFlag(portFlagName, flags.Lookup(portFlagName)) + _ = viper.BindPFlag(bearerTokenFlagName, flags.Lookup(bearerTokenFlagName)) + _ = viper.BindPFlag(corsFlagName, flags.Lookup(corsFlagName)) + _ = viper.BindPFlag(evaluatorFlagName, flags.Lookup(evaluatorFlagName)) + _ = viper.BindPFlag(logFormatFlagName, flags.Lookup(logFormatFlagName)) _ = viper.BindPFlag(metricsPortFlagName, flags.Lookup(metricsPortFlagName)) - _ = viper.BindPFlag(socketPathFlagName, flags.Lookup(socketPathFlagName)) + _ = viper.BindPFlag(portFlagName, flags.Lookup(portFlagName)) _ = viper.BindPFlag(providerArgsFlagName, flags.Lookup(providerArgsFlagName)) - _ = viper.BindPFlag(evaluatorFlagName, flags.Lookup(evaluatorFlagName)) _ = viper.BindPFlag(serverCertPathFlagName, flags.Lookup(serverCertPathFlagName)) _ = viper.BindPFlag(serverKeyPathFlagName, flags.Lookup(serverKeyPathFlagName)) - _ = viper.BindPFlag(uriFlagName, flags.Lookup(uriFlagName)) - _ = viper.BindPFlag(bearerTokenFlagName, flags.Lookup(bearerTokenFlagName)) - _ = viper.BindPFlag(corsFlagName, flags.Lookup(corsFlagName)) + _ = viper.BindPFlag(socketPathFlagName, flags.Lookup(socketPathFlagName)) _ = viper.BindPFlag(syncProviderFlagName, flags.Lookup(syncProviderFlagName)) - _ = viper.BindPFlag(logFormatFlagName, flags.Lookup(logFormatFlagName)) + _ = viper.BindPFlag(uriFlagName, flags.Lookup(uriFlagName)) } // startCmd represents the start command @@ -104,15 +104,15 @@ var startCmd = &cobra.Command{ } // Build Runtime ----------------------------------------------------------- rt, err := runtime.FromConfig(logger, runtime.Config{ - ServicePort: viper.GetInt32(portFlagName), + CORS: viper.GetStringSlice(corsFlagName), MetricsPort: viper.GetInt32(metricsPortFlagName), - ServiceSocketPath: viper.GetString(socketPathFlagName), + ProviderArgs: viper.GetStringMapString(providerArgsFlagName), ServiceCertPath: viper.GetString(serverCertPathFlagName), ServiceKeyPath: viper.GetString(serverKeyPathFlagName), - ProviderArgs: viper.GetStringMapString(providerArgsFlagName), - SyncURI: viper.GetStringSlice(uriFlagName), + ServicePort: viper.GetInt32(portFlagName), + ServiceSocketPath: viper.GetString(socketPathFlagName), SyncBearerToken: viper.GetString(bearerTokenFlagName), - CORS: viper.GetStringSlice(corsFlagName), + SyncURI: viper.GetStringSlice(uriFlagName), }) if err != nil { rtLogger.Fatal(err.Error()) diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index e45dc5fcf..2a209bdef 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -12,13 +12,14 @@ Config file expects the keys to have the exact naming as the flags. ### URI patterns -Any URI passed to flagd via the `--uri` flag must follow one of the 3 following patterns to ensure that it is passed to the correct implementation: - -| Sync | Pattern | Example | -| ----------- | ----------- | ----------- | -| Kubernetes | `core.openfeature.dev/namespace/name` | `core.openfeature.dev/default/my-crd` | -| Filepath | `file:path/to/my/flag` | `file:etc/flagd/my-flags.json` | -| Remote | `http(s)://flag-source-url` | `https://my-flags.com/flags` | +Any URI passed to flagd via the `--uri` flag must follow one of the 4 following patterns to ensure that it is passed to the correct implementation: + +| Sync | Pattern | Example | +|------------|------------------------------------|---------------------------------------| +| Kubernetes | `core.openfeature.dev/namespace/name` | `core.openfeature.dev/default/my-crd` | +| Filepath | `file:path/to/my/flag` | `file:etc/flagd/my-flags.json` | +| Remote | `http(s)://flag-source-url` | `https://my-flags.com/flags` | +| Grpc | `grpc://flag-source-url` | `grpc://my-flags-server` | diff --git a/docs/configuration/flagd_start.md b/docs/configuration/flagd_start.md index 1c14f76d2..0cc8945f3 100644 --- a/docs/configuration/flagd_start.md +++ b/docs/configuration/flagd_start.md @@ -21,7 +21,7 @@ flagd start [flags] -d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally. -y, --sync-provider string DEPRECATED: Set a sync provider e.g. filepath or remote -a, --sync-provider-args stringToString Sync provider arguments as key values separated by = (default []) - -f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath,url or FeatureFlagConfiguration. Using multiple providers is supported however if flag keys are duplicated across multiple sources it may lead to unexpected behavior. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension. + -f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath,url (http and grpc) or FeatureFlagConfiguration. Using multiple providers is supported however if flag keys are duplicated across multiple sources it may lead to unexpected behavior. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension. ``` ### Options inherited from parent commands diff --git a/go.mod b/go.mod index 4aa56a8d3..cf1faf739 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,8 @@ go 1.19 require ( buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1 - buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4 - buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20221226184428-0dc62ff103b8.4 + buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20230207182158-c211472558c3.4 + buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20230207182158-c211472558c3.4 github.com/bufbuild/connect-go v1.5.1 github.com/diegoholiveira/jsonlogic/v3 v3.2.7 github.com/dimiro1/banner v1.1.0 diff --git a/go.sum b/go.sum index 4041595ab..2b6b9e4d7 100644 --- a/go.sum +++ b/go.sum @@ -2,10 +2,10 @@ buf.build/gen/go/grpc-ecosystem/grpc-gateway/grpc/go v1.2.0-20220906183531-bc28b buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.28.1-20220906183531-bc28b723cd77.4/go.mod h1:92ejKVTiuvnKoAtRlpJpIxKfloI935DDqhs0NCRx+KM= buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1 h1:KoSPqmHyi3x27tPFLQ994CJjG4qc59v+0gbxY9+VXso= buf.build/gen/go/open-feature/flagd/bufbuild/connect-go v1.4.1-20221226184428-0dc62ff103b8.1/go.mod h1:68WGv4z/jXuTS3G7FEFQTEw4wiMmulBSX6BlSFX2Xc8= -buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4 h1:9ioWUVmnURL+TX4qVjyzzE3o9Z2ACDGidqtu9dkjmdY= -buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20221226184428-0dc62ff103b8.4/go.mod h1:mgJ/h6pO54DWEYi3YqzlLxFBerE6L4dPRPMIhKEGBw0= -buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20221226184428-0dc62ff103b8.4 h1:HMDOJt1SDrELWslNJWfjg4ItbNOPx3ZvD8RtikKr5SE= -buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20221226184428-0dc62ff103b8.4/go.mod h1:+Bnrjo56uVn/aBcLWchTveR8UeCj+KSJN4fE0xSmBNc= +buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20230207182158-c211472558c3.4 h1:11ayeHd1H1LhRuJlHzIfbcUk64gAtnm5zrBZXxoOyN0= +buf.build/gen/go/open-feature/flagd/grpc/go v1.2.0-20230207182158-c211472558c3.4/go.mod h1:8ce/bdmiPVo2i5s+bYLENyIvi24dmBN5zy+nPyCUAHg= +buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20230207182158-c211472558c3.4 h1:6Ht0iYYWoG7qDuxGs/aG11uj9ulFfTL2zeWCryj9aWg= +buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20230207182158-c211472558c3.4/go.mod h1:+Bnrjo56uVn/aBcLWchTveR8UeCj+KSJN4fE0xSmBNc= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -61,7 +61,6 @@ github.com/bufbuild/connect-go v1.5.1 h1:ORhrSiu63hWxtuMmC/V1mKySSRhEySsW5RkHJcy github.com/bufbuild/connect-go v1.5.1/go.mod h1:9iNvh/NOsfhNBUH5CtvXeVUskQO1xsrEviH7ZArwZ3I= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -471,8 +470,6 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= -golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= @@ -488,8 +485,6 @@ golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= -golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 h1:nt+Q6cXKz4MosCSpnbMtqiQ8Oz0pxTef2B4Vca2lvfk= -golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg= golang.org/x/oauth2 v0.4.0 h1:NF0gk8LVPg1Ml7SSbGyySuoxdsXitj7TvgvuRxIMc/M= golang.org/x/oauth2 v0.4.0/go.mod h1:RznEsdpjGAINPTOF0UH/t+xJ75L18YO3Ho6Pyn+uRec= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -556,14 +551,10 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= -golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.4.0 h1:O7UWfv5+A2qiuulQk30kVinPoMtoIPeVaKLEgLpVkvg= -golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -574,8 +565,6 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= -golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -704,8 +693,6 @@ google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= -google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef h1:uQ2vjV/sHTsWSqdKeLqmwitzgvjMl7o4IdtHwUDXSJY= -google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w= google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -727,8 +714,6 @@ google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA5 google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.43.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= -google.golang.org/grpc v1.52.3 h1:pf7sOysg4LdgBqduXveGKrcEwbStiK2rtfghdzlUYDQ= -google.golang.org/grpc v1.52.3/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY= google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc= google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index 8684fe6b9..4ff8c4e12 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -6,13 +6,13 @@ import ( "regexp" "time" - "github.com/open-feature/flagd/pkg/sync/file" - httpSync "github.com/open-feature/flagd/pkg/sync/http" - "github.com/open-feature/flagd/pkg/eval" "github.com/open-feature/flagd/pkg/logger" "github.com/open-feature/flagd/pkg/service" "github.com/open-feature/flagd/pkg/sync" + "github.com/open-feature/flagd/pkg/sync/file" + "github.com/open-feature/flagd/pkg/sync/grpc" + httpSync "github.com/open-feature/flagd/pkg/sync/http" "github.com/open-feature/flagd/pkg/sync/kubernetes" "github.com/robfig/cron" "go.uber.org/zap" @@ -21,12 +21,14 @@ import ( var ( regCrd *regexp.Regexp regURL *regexp.Regexp + regGRPC *regexp.Regexp regFile *regexp.Regexp ) func init() { regCrd = regexp.MustCompile("^core.openfeature.dev/") regURL = regexp.MustCompile("^https?://") + regGRPC = regexp.MustCompile("^" + grpc.Prefix) regFile = regexp.MustCompile("^file:") } @@ -99,9 +101,17 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { Cron: cron.New(), }) rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %q", uri)) + case regGRPC.Match(uriB): + r.SyncImpl = append(r.SyncImpl, &grpc.Sync{ + Target: grpc.URLToGRPCTarget(uri), + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "grpc"), + ), + }) default: - return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', or 'core.openfeature.dev'", - uri) + return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', 'grpc://',"+ + " or 'core.openfeature.dev'", uri) } } return nil diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 3c441a0ee..0b9ffefbb 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -35,6 +35,7 @@ type Config struct { ProviderArgs sync.ProviderArgs SyncURI []string + RemoteSyncType string SyncBearerToken string CORS []string diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go new file mode 100644 index 000000000..20d96f485 --- /dev/null +++ b/pkg/sync/grpc/grpc_sync.go @@ -0,0 +1,184 @@ +package grpc + +import ( + "context" + "fmt" + "math" + "strings" + "time" + + "google.golang.org/grpc/credentials/insecure" + + "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc" + v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1" + + "github.com/open-feature/flagd/pkg/logger" + "github.com/open-feature/flagd/pkg/sync" + "google.golang.org/grpc" +) + +const ( + // Prefix for GRPC URL inputs. GRPC does not define a prefix through standard. This prefix helps to differentiate + // remote URLs for REST APIs (i.e - HTTP) from GRPC endpoints. + Prefix = "grpc://" + + // Connection retry constants + // Back off period is calculated with backOffBase ^ #retry-iteration. However, when #retry-iteration count reach + // backOffLimit, retry delay fallback to constantBackOffDelay + backOffLimit = 3 + backOffBase = 4 + constantBackOffDelay = 60 +) + +type Sync struct { + Target string + ProviderID string + Logger *logger.Logger +} + +func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { + options := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + + // initial dial and connection. Failure here must result in a startup failure + dial, err := grpc.DialContext(ctx, g.Target, options...) + if err != nil { + g.Logger.Error(fmt.Sprintf("error establishing grpc connection: %s", err.Error())) + return err + } + + serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) + syncClient, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) + if err != nil { + g.Logger.Error(fmt.Sprintf("error calling streaming operation: %s", err.Error())) + return err + } + + // initial stream listening + err = g.handleFlagSync(syncClient, dataSync) + g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error())) + + // retry connection establishment + for { + syncClient, ok := g.connectWithRetry(ctx, options...) + if !ok { + // We shall exit + return nil + } + + err = g.handleFlagSync(syncClient, dataSync) + if err != nil { + g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error())) + continue + } + } +} + +// connectWithRetry is a helper that performs exponential back off after retrying connection attempts periodically until +// a successful connection is established. Caller must not expect an error. Hence, errors are handled, logged +// internally. However, if the provided context is done, method exit with a non-ok state which must be verified by the +// caller +func (g *Sync) connectWithRetry( + ctx context.Context, options ...grpc.DialOption, +) (syncv1grpc.FlagSyncService_SyncFlagsClient, bool) { + var iteration int + + for { + var sleep time.Duration + if iteration >= backOffLimit { + sleep = constantBackOffDelay + } else { + iteration++ + sleep = time.Duration(math.Pow(backOffBase, float64(iteration))) + } + + // Block the next connection attempt and check the context + select { + case <-time.After(sleep * time.Second): + break + case <-ctx.Done(): + // context done means we shall exit + return nil, false + } + + g.Logger.Warn(fmt.Sprintf("connection re-establishment attempt in-progress for grpc target: %s", g.Target)) + + dial, err := grpc.DialContext(ctx, g.Target, options...) + if err != nil { + g.Logger.Debug(fmt.Sprintf("error dialing target: %s", err.Error())) + continue + } + + serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) + syncClient, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{ProviderId: g.ProviderID}) + if err != nil { + g.Logger.Debug(fmt.Sprintf("error opening service client: %s", err.Error())) + continue + } + + g.Logger.Info(fmt.Sprintf("connection re-established with grpc target: %s", g.Target)) + return syncClient, true + } +} + +// handleFlagSync wraps the stream listening and push updates through dataSync channel +func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { + for { + data, err := stream.Recv() + if err != nil { + return err + } + + switch data.State { + case v1.SyncState_SYNC_STATE_ALL: + dataSync <- sync.DataSync{ + FlagData: data.FlagConfiguration, + Source: g.Target, + Type: sync.ALL, + } + + g.Logger.Debug("received full configuration payload") + case v1.SyncState_SYNC_STATE_ADD: + dataSync <- sync.DataSync{ + FlagData: data.FlagConfiguration, + Source: g.Target, + Type: sync.ADD, + } + + g.Logger.Debug("received an add payload") + case v1.SyncState_SYNC_STATE_UPDATE: + dataSync <- sync.DataSync{ + FlagData: data.FlagConfiguration, + Source: g.Target, + Type: sync.UPDATE, + } + + g.Logger.Debug("received an update payload") + case v1.SyncState_SYNC_STATE_DELETE: + dataSync <- sync.DataSync{ + FlagData: data.FlagConfiguration, + Source: g.Target, + Type: sync.DELETE, + } + + g.Logger.Debug("received a delete payload") + case v1.SyncState_SYNC_STATE_PING: + g.Logger.Debug("received server ping") + default: + g.Logger.Debug(fmt.Sprintf("received unknown state: %s", data.State.String())) + } + } +} + +// URLToGRPCTarget is a helper to derive GRPC target from a provided URL +// For example, function returns the target localhost:9090 for the input grpc://localhost:9090 +func URLToGRPCTarget(url string) string { + index := strings.Split(url, Prefix) + + if len(index) == 2 { + return index[1] + } + + return index[0] +} diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go new file mode 100644 index 000000000..d521372c8 --- /dev/null +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -0,0 +1,330 @@ +package grpc + +import ( + "context" + "fmt" + "io" + "log" + "net" + "testing" + + "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc" + v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1" + + "github.com/open-feature/flagd/pkg/logger" + "github.com/open-feature/flagd/pkg/sync" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" +) + +func TestUrlToGRPCTarget(t *testing.T) { + tests := []struct { + name string + url string + want string + }{ + { + name: "With Prefix", + url: "grpc://test.com/endpoint", + want: "test.com/endpoint", + }, + { + name: "Without Prefix", + url: "test.com/endpoint", + want: "test.com/endpoint", + }, + { + name: "Empty is empty", + url: "", + want: "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := URLToGRPCTarget(tt.url); got != tt.want { + t.Errorf("URLToGRPCTarget() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSync_BasicFlagSyncStates(t *testing.T) { + grpcSyncImpl := Sync{ + Target: "grpc://test", + ProviderID: "", + Logger: logger.NewLogger(nil, false), + } + + tests := []struct { + name string + stream syncv1grpc.FlagSyncService_SyncFlagsClient + want sync.Type + }{ + { + name: "State All maps to Sync All", + stream: &SimpleRecvMock{ + mockResponse: v1.SyncFlagsResponse{ + FlagConfiguration: "{}", + State: v1.SyncState_SYNC_STATE_ALL, + }, + }, + want: sync.ALL, + }, + { + name: "State Add maps to Sync Add", + stream: &SimpleRecvMock{ + mockResponse: v1.SyncFlagsResponse{ + FlagConfiguration: "{}", + State: v1.SyncState_SYNC_STATE_ADD, + }, + }, + want: sync.ADD, + }, + { + name: "State Update maps to Sync Update", + stream: &SimpleRecvMock{ + mockResponse: v1.SyncFlagsResponse{ + FlagConfiguration: "{}", + State: v1.SyncState_SYNC_STATE_UPDATE, + }, + }, + want: sync.UPDATE, + }, + { + name: "State Delete maps to Sync Delete", + stream: &SimpleRecvMock{ + mockResponse: v1.SyncFlagsResponse{ + FlagConfiguration: "{}", + State: v1.SyncState_SYNC_STATE_DELETE, + }, + }, + want: sync.DELETE, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + syncChan := make(chan sync.DataSync) + + go func() { + err := grpcSyncImpl.handleFlagSync(test.stream, syncChan) + if err != nil { + t.Errorf("Error handling flag sync: %s", err.Error()) + } + }() + data := <-syncChan + + if data.Type != test.want { + t.Errorf("Returned data sync state = %v, wanted %v", data.Type, test.want) + } + }) + } +} + +func Test_StreamListener(t *testing.T) { + const target = "localBufCon" + + tests := []struct { + name string + input []serverPayload + output []sync.DataSync + }{ + { + name: "Single send", + input: []serverPayload{ + { + flags: "{\"flags\": {}}", + state: v1.SyncState_SYNC_STATE_ALL, + }, + }, + output: []sync.DataSync{ + { + FlagData: "{\"flags\": {}}", + Type: sync.ALL, + }, + }, + }, + { + name: "Multiple send", + input: []serverPayload{ + { + flags: "{}", + state: v1.SyncState_SYNC_STATE_ALL, + }, + { + flags: "{\"flags\": {}}", + state: v1.SyncState_SYNC_STATE_DELETE, + }, + }, + output: []sync.DataSync{ + { + FlagData: "{}", + Type: sync.ALL, + }, + { + FlagData: "{\"flags\": {}}", + Type: sync.DELETE, + }, + }, + }, + { + name: "Pings are ignored & not written to channel", + input: []serverPayload{ + { + flags: "", + state: v1.SyncState_SYNC_STATE_PING, + }, + { + flags: "", + state: v1.SyncState_SYNC_STATE_PING, + }, + { + flags: "{\"flags\": {}}", + state: v1.SyncState_SYNC_STATE_DELETE, + }, + }, + output: []sync.DataSync{ + { + FlagData: "{\"flags\": {}}", + Type: sync.DELETE, + }, + }, + }, + { + name: "Unknown states are & not written to channel", + input: []serverPayload{ + { + flags: "", + state: 42, + }, + { + flags: "", + state: -1, + }, + { + flags: "{\"flags\": {}}", + state: v1.SyncState_SYNC_STATE_ALL, + }, + }, + output: []sync.DataSync{ + { + FlagData: "{\"flags\": {}}", + Type: sync.ALL, + }, + }, + }, + } + + for _, test := range tests { + bufCon := bufconn.Listen(5) + + bufServer := bufferedServer{ + listener: bufCon, + mockResponses: test.input, + } + + // start server + go serve(&bufServer) + + grpcSync := Sync{ + Target: target, + ProviderID: "", + Logger: logger.NewLogger(nil, false), + } + + // initialize client + dial, err := grpc.Dial(target, + grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return bufCon.Dial() + }), + grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Errorf("Error setting up client connection: %s", err.Error()) + } + + serviceClient := syncv1grpc.NewFlagSyncServiceClient(dial) + syncClient, err := serviceClient.SyncFlags(context.Background(), &v1.SyncFlagsRequest{ProviderId: grpcSync.ProviderID}) + if err != nil { + t.Errorf("Error opening client stream: %s", err.Error()) + } + + syncChan := make(chan sync.DataSync, 1) + + // listen to stream + go func() { + err := grpcSync.handleFlagSync(syncClient, syncChan) + if err != nil { + // must ignore EOF as this is returned for stream end + if err != io.EOF { + t.Errorf("Error from stream listener: %s", err.Error()) + } + } + }() + + for _, expected := range test.output { + out := <-syncChan + + if expected.Type != out.Type { + t.Errorf("Returned sync type = %v, wanted %v", out.Type, expected.Type) + } + + if expected.FlagData != out.FlagData { + t.Errorf("Returned sync data = %v, wanted %v", out.FlagData, expected.FlagData) + } + } + + // channel must be empty + if len(syncChan) != 0 { + t.Errorf("Data sync channel must be empty after all test syncs. But received non empty: %d", len(syncChan)) + } + } +} + +// Mock implementations + +type SimpleRecvMock struct { + grpc.ClientStream + mockResponse v1.SyncFlagsResponse +} + +func (s *SimpleRecvMock) Recv() (*v1.SyncFlagsResponse, error) { + return &s.mockResponse, nil +} + +// serve serves a bufferedServer +func serve(bServer *bufferedServer) { + server := grpc.NewServer() + + syncv1grpc.RegisterFlagSyncServiceServer(server, bServer) + + if err := server.Serve(bServer.listener); err != nil { + log.Fatalf("Server exited with error: %v", err) + } +} + +type serverPayload struct { + flags string + state v1.SyncState +} + +// bufferedServer - a mock grpc service backed by buffered connection +type bufferedServer struct { + listener *bufconn.Listener + mockResponses []serverPayload +} + +func (b *bufferedServer) SyncFlags(req *v1.SyncFlagsRequest, stream syncv1grpc.FlagSyncService_SyncFlagsServer) error { + for _, response := range b.mockResponses { + err := stream.Send(&v1.SyncFlagsResponse{ + FlagConfiguration: response.flags, + State: response.state, + }) + if err != nil { + fmt.Printf("Error with stream: %s", err.Error()) + return err + } + } + + return nil +}