Skip to content

Commit

Permalink
wire startup to grpc
Browse files Browse the repository at this point in the history
Signed-off-by: Kavindu Dodanduwa <[email protected]>
  • Loading branch information
Kavindu-Dodan committed Jan 31, 2023
1 parent 2e25094 commit 9e10b84
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 33 deletions.
2 changes: 1 addition & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func init() {
"a", nil, "Sync provider arguments as key values separated by =")
flags.StringSliceP(
uriFlagName, "f", []string{}, "Set a sync provider uri to read data from, this can be a filepath,"+
"url or FeatureFlagConfiguration. Using multiple providers is supported however if"+
"url (http and grpc) or FeatureFlagConfiguration. Using multiple providers is supported however if"+
" flag keys are duplicated across multiple sources it may lead to unexpected behavior. "+
"Please note that if you are using filepath, flagd only supports files with `.yaml/.yml/.json` extension.",
)
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/flagd_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ flagd start [flags]
-d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally.
-y, --sync-provider string DEPRECATED: Set a sync provider e.g. filepath or remote
-a, --sync-provider-args stringToString Sync provider arguments as key values separated by = (default [])
-f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath,url or FeatureFlagConfiguration. Using multiple providers is supported however if flag keys are duplicated across multiple sources it may lead to unexpected behavior. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension.
-f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath,url (http and grpc) or FeatureFlagConfiguration. Using multiple providers is supported however if flag keys are duplicated across multiple sources it may lead to unexpected behavior. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension.
```

### Options inherited from parent commands
Expand Down
17 changes: 14 additions & 3 deletions pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"regexp"
"time"

"github.com/open-feature/flagd/pkg/sync/file"
httpSync "github.com/open-feature/flagd/pkg/sync/http"

"github.com/open-feature/flagd/pkg/eval"
"github.com/open-feature/flagd/pkg/logger"
"github.com/open-feature/flagd/pkg/service"
"github.com/open-feature/flagd/pkg/sync"
"github.com/open-feature/flagd/pkg/sync/file"
"github.com/open-feature/flagd/pkg/sync/grpc"
httpSync "github.com/open-feature/flagd/pkg/sync/http"
"github.com/open-feature/flagd/pkg/sync/kubernetes"
"github.com/robfig/cron"
"go.uber.org/zap"
Expand All @@ -21,12 +21,14 @@ import (
var (
regCrd *regexp.Regexp
regURL *regexp.Regexp
regGRPC *regexp.Regexp
regFile *regexp.Regexp
)

func init() {
regCrd = regexp.MustCompile("^core.openfeature.dev/")
regURL = regexp.MustCompile("^https?://")
regGRPC = regexp.MustCompile("^" + grpc.Prefix)
regFile = regexp.MustCompile("^file:")
}

Expand Down Expand Up @@ -99,6 +101,15 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error {
Cron: cron.New(),
})
rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %q", uri))
case regGRPC.Match(uriB):

r.SyncImpl = append(r.SyncImpl, &grpc.Sync{
Target: grpc.URLToGRPCTarget(uri),
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "grpc"),
),
})
default:
return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', or 'core.openfeature.dev'",
uri)
Expand Down
14 changes: 1 addition & 13 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ import (
msync "sync"
"syscall"

"github.com/open-feature/flagd/pkg/sync/grpc"
"go.uber.org/zap"

"golang.org/x/sync/errgroup"

"github.com/open-feature/flagd/pkg/eval"
Expand Down Expand Up @@ -38,6 +35,7 @@ type Config struct {

ProviderArgs sync.ProviderArgs
SyncURI []string
RemoteSyncType string
SyncBearerToken string

CORS []string
Expand Down Expand Up @@ -72,16 +70,6 @@ func (r *Runtime) Start() error {
}
})

// todo - get this from configurations
r.SyncImpl = append(r.SyncImpl, &grpc.Sync{
URI: "localhost:8090",
Key: "local",
Logger: r.Logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "grpc"),
),
})

// Start sync providers
for _, s := range r.SyncImpl {
p := s
Expand Down
31 changes: 23 additions & 8 deletions pkg/sync/grpc/grpc_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"strings"

"buf.build/gen/go/kavindudodan/flagd/grpc/go/sync/v1/servicev1grpc"
v1 "buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go/sync/v1"
Expand All @@ -15,15 +16,18 @@ import (
"google.golang.org/grpc/credentials/insecure"
)

// Prefix for GRPC URL inputs. GRPC does not define a prefix through standard. This prefix helps to differentiate
// remote URLs for REST APIs (i.e - HTTP) from GRPC endpoints.
const Prefix = "grpc://"

type Sync struct {
URI string
Target string
Key string
Logger *logger.Logger
}

func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
// todo - Add certificates and/or tokens
dial, err := grpc.Dial("localhost:8090", grpc.WithTransportCredentials(insecure.NewCredentials()))
dial, err := grpc.Dial(g.Target, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
g.Logger.Error(fmt.Sprintf("Error establishing connection: %s", err.Error()))
return err
Expand All @@ -47,7 +51,6 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {

err = group.Wait()
if err == io.EOF {
// todo - we can retry connection if this happens
g.Logger.Info("Stream closed by the server. Exiting without retry attempts.")
return err
}
Expand All @@ -67,7 +70,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d
case v1.SyncState_SYNC_STATE_ALL:
dataSync <- sync.DataSync{
FlagData: data.Flags,
Source: g.URI,
Source: g.Target,
Type: sync.ALL,
}

Expand All @@ -76,7 +79,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d
case v1.SyncState_SYNC_STATE_ADD:
dataSync <- sync.DataSync{
FlagData: data.Flags,
Source: g.URI,
Source: g.Target,
Type: sync.ADD,
}

Expand All @@ -85,7 +88,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d
case v1.SyncState_SYNC_STATE_UPDATE:
dataSync <- sync.DataSync{
FlagData: data.Flags,
Source: g.URI,
Source: g.Target,
Type: sync.UPDATE,
}

Expand All @@ -94,7 +97,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d
case v1.SyncState_SYNC_STATE_DELETE:
dataSync <- sync.DataSync{
FlagData: data.Flags,
Source: g.URI,
Source: g.Target,
Type: sync.DELETE,
}

Expand All @@ -107,3 +110,15 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d
}
}
}

// URLToGRPCTarget is a helper to derive GRPC target from a provided URL
// For example, function returns the target localhost:9090 for the input grpc://localhost:9090
func URLToGRPCTarget(url string) string {
index := strings.Split(url, Prefix)

if len(index) == 2 {
return index[1]
}

return index[0]
}
25 changes: 18 additions & 7 deletions pkg/sync/grpc/grpc_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,31 @@ package grpc
import "testing"

func TestUrlToGRPCTarget(t *testing.T) {
type args struct {
url string
}
tests := []struct {
name string
args args
url string
want string
}{
// TODO: Add test cases.
{
name: "With Prefix",
url: "grpc://test.com/endpoint",
want: "test.com/endpoint",
},
{
name: "Without Prefix",
url: "test.com/endpoint",
want: "test.com/endpoint",
},
{
name: "Empty is empty",
url: "",
want: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := UrlToGRPCTarget(tt.args.url); got != tt.want {
t.Errorf("UrlToGRPCTarget() = %v, want %v", got, tt.want)
if got := URLToGRPCTarget(tt.url); got != tt.want {
t.Errorf("URLToGRPCTarget() = %v, want %v", got, tt.want)
}
})
}
Expand Down

0 comments on commit 9e10b84

Please sign in to comment.