Skip to content

Commit

Permalink
wire startup to grpc
Browse files Browse the repository at this point in the history
Signed-off-by: Kavindu Dodanduwa <[email protected]>
  • Loading branch information
Kavindu-Dodan committed Jan 24, 2023
1 parent d6f9108 commit 7458bb6
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 32 deletions.
2 changes: 1 addition & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func init() {
"a", nil, "Sync provider arguments as key values separated by =")
flags.StringSliceP(
uriFlagName, "f", []string{}, "Set a sync provider uri to read data from, this can be a filepath,"+
"url or FeatureFlagConfiguration. Using multiple providers is supported however if"+
"url (http and grpc) or FeatureFlagConfiguration. Using multiple providers is supported however if"+
" flag keys are duplicated across multiple sources it may lead to unexpected behavior. "+
"Please note that if you are using filepath, flagd only supports files with `.yaml/.yml/.json` extension.",
)
Expand Down
17 changes: 14 additions & 3 deletions pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"regexp"
"time"

"github.com/open-feature/flagd/pkg/sync/file"
httpSync "github.com/open-feature/flagd/pkg/sync/http"

"github.com/open-feature/flagd/pkg/eval"
"github.com/open-feature/flagd/pkg/logger"
"github.com/open-feature/flagd/pkg/service"
"github.com/open-feature/flagd/pkg/sync"
"github.com/open-feature/flagd/pkg/sync/file"
"github.com/open-feature/flagd/pkg/sync/grpc"
httpSync "github.com/open-feature/flagd/pkg/sync/http"
"github.com/open-feature/flagd/pkg/sync/kubernetes"
"github.com/robfig/cron"
"go.uber.org/zap"
Expand All @@ -21,12 +21,14 @@ import (
var (
regCrd *regexp.Regexp
regURL *regexp.Regexp
regGRPC *regexp.Regexp
regFile *regexp.Regexp
)

func init() {
regCrd = regexp.MustCompile("^core.openfeature.dev/")
regURL = regexp.MustCompile("^https?://")
regGRPC = regexp.MustCompile("^" + grpc.Prefix)
regFile = regexp.MustCompile("^file:")
}

Expand Down Expand Up @@ -99,6 +101,15 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error {
Cron: cron.New(),
})
rtLogger.Debug(fmt.Sprintf("Using remote sync-provider for %q", uri))
case regGRPC.Match(uriB):

r.SyncImpl = append(r.SyncImpl, &grpc.Sync{
Target: grpc.URLToGRPCTarget(uri),
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "grpc"),
),
})
default:
return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', or 'core.openfeature.dev'",
uri)
Expand Down
14 changes: 1 addition & 13 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ import (
msync "sync"
"syscall"

"github.com/open-feature/flagd/pkg/sync/grpc"
"go.uber.org/zap"

"golang.org/x/sync/errgroup"

"github.com/open-feature/flagd/pkg/eval"
Expand Down Expand Up @@ -38,6 +35,7 @@ type Config struct {

ProviderArgs sync.ProviderArgs
SyncURI []string
RemoteSyncType string
SyncBearerToken string

CORS []string
Expand Down Expand Up @@ -72,16 +70,6 @@ func (r *Runtime) Start() error {
}
})

// todo - get this from configurations
r.SyncImpl = append(r.SyncImpl, &grpc.Sync{
URI: "localhost:8090",
Key: "local",
Logger: r.Logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "grpc"),
),
})

// Start sync providers
for _, s := range r.SyncImpl {
p := s
Expand Down
31 changes: 23 additions & 8 deletions pkg/sync/grpc/grpc_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"strings"

"buf.build/gen/go/kavindudodan/flagd/grpc/go/sync/v1/servicev1grpc"
v1 "buf.build/gen/go/kavindudodan/flagd/protocolbuffers/go/sync/v1"
Expand All @@ -15,15 +16,18 @@ import (
"google.golang.org/grpc/credentials/insecure"
)

// Prefix for GRPC URL inputs. GRPC does not define a prefix through standard. This prefix helps to differentiate
// remote URLs for REST APIs (i.e - HTTP) from GRPC endpoints.
const Prefix = "grpc://"

type Sync struct {
URI string
Target string
Key string
Logger *logger.Logger
}

func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
// todo - Add certificates and/or tokens
dial, err := grpc.Dial("localhost:8090", grpc.WithTransportCredentials(insecure.NewCredentials()))
dial, err := grpc.Dial(g.Target, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
g.Logger.Error(fmt.Sprintf("Error establishing connection: %s", err.Error()))
return err
Expand All @@ -47,7 +51,6 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {

err = group.Wait()
if err == io.EOF {
// todo - we can retry connection if this happens
g.Logger.Info("Stream closed by the server. Exiting without retry attempts.")
return err
}
Expand All @@ -67,7 +70,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d
case v1.SyncState_SYNC_STATE_ALL:
dataSync <- sync.DataSync{
FlagData: data.Flags,
Source: g.URI,
Source: g.Target,
Type: sync.ALL,
}

Expand All @@ -76,7 +79,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d
case v1.SyncState_SYNC_STATE_ADD:
dataSync <- sync.DataSync{
FlagData: data.Flags,
Source: g.URI,
Source: g.Target,
Type: sync.ADD,
}

Expand All @@ -85,7 +88,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d
case v1.SyncState_SYNC_STATE_UPDATE:
dataSync <- sync.DataSync{
FlagData: data.Flags,
Source: g.URI,
Source: g.Target,
Type: sync.UPDATE,
}

Expand All @@ -94,7 +97,7 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d
case v1.SyncState_SYNC_STATE_DELETE:
dataSync <- sync.DataSync{
FlagData: data.Flags,
Source: g.URI,
Source: g.Target,
Type: sync.DELETE,
}

Expand All @@ -107,3 +110,15 @@ func (g *Sync) streamHandler(stream servicev1grpc.FlagService_SyncFlagsClient, d
}
}
}

// URLToGRPCTarget is a helper to derive GRPC target from a provided URL
// For example, function returns the target localhost:9090 for the input grpc://localhost:9090
func URLToGRPCTarget(url string) string {
index := strings.Split(url, Prefix)

if len(index) == 2 {
return index[1]
}

return index[0]
}
25 changes: 18 additions & 7 deletions pkg/sync/grpc/grpc_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,31 @@ package grpc
import "testing"

func TestUrlToGRPCTarget(t *testing.T) {
type args struct {
url string
}
tests := []struct {
name string
args args
url string
want string
}{
// TODO: Add test cases.
{
name: "With Prefix",
url: "grpc://test.com/endpoint",
want: "test.com/endpoint",
},
{
name: "Without Prefix",
url: "test.com/endpoint",
want: "test.com/endpoint",
},
{
name: "Empty is empty",
url: "",
want: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := UrlToGRPCTarget(tt.args.url); got != tt.want {
t.Errorf("UrlToGRPCTarget() = %v, want %v", got, tt.want)
if got := URLToGRPCTarget(tt.url); got != tt.want {
t.Errorf("URLToGRPCTarget() = %v, want %v", got, tt.want)
}
})
}
Expand Down

0 comments on commit 7458bb6

Please sign in to comment.