diff --git a/cmd/relayproxy/config/config.go b/cmd/relayproxy/config/config.go
index 73f7aa48245..5e955f5399f 100644
--- a/cmd/relayproxy/config/config.go
+++ b/cmd/relayproxy/config/config.go
@@ -256,6 +256,9 @@ type Config struct {
// Exporter is the configuration on how to export data
Exporter *ExporterConf `mapstructure:"exporter" koanf:"exporter"`
+ // Exporters is the same as Exporter but allows you to configure more than one exporter at a time.
+ Exporters *[]ExporterConf `mapstructure:"exporters" koanf:"exporters"`
+
// Notifiers is the configuration on where to notify a flag change
Notifiers []NotifierConf `mapstructure:"notifier" koanf:"notifier"`
@@ -414,6 +417,28 @@ func (c *Config) IsValid() error {
return fmt.Errorf("invalid port %d", c.ListenPort)
}
+ if err := c.validateRetrievers(); err != nil {
+ return err
+ }
+
+ if err := c.validateExporters(); err != nil {
+ return err
+ }
+
+ if err := c.validateNotifiers(); err != nil {
+ return err
+ }
+
+ if c.LogLevel != "" {
+ if _, err := zapcore.ParseLevel(c.LogLevel); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (c *Config) validateRetrievers() error {
if c.Retriever == nil && c.Retrievers == nil {
return fmt.Errorf("no retriever available in the configuration")
}
@@ -432,13 +457,28 @@ func (c *Config) IsValid() error {
}
}
- // Exporter is optional
+ return nil
+}
+
+func (c *Config) validateExporters() error {
if c.Exporter != nil {
if err := c.Exporter.IsValid(); err != nil {
return err
}
}
+ if c.Exporters != nil {
+ for _, exporter := range *c.Exporters {
+ if err := exporter.IsValid(); err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
+func (c *Config) validateNotifiers() error {
if c.Notifiers != nil {
for _, notif := range c.Notifiers {
if err := notif.IsValid(); err != nil {
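A minimal sketch of how the new relay proxy `exporters` configuration is expected to be consumed from Go, mirroring the test cases below (field values are illustrative only):

```go
package main

import (
	"log"

	"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/config"
)

func main() {
	// Both the legacy single exporter and the new list can be set at the same time;
	// IsValid() checks the single exporter first and then every entry of the list.
	cfg := config.Config{
		ListenPort: 8080,
		Retriever:  &config.RetrieverConf{Kind: "file", Path: "flags.goff.yaml"},
		Exporter:   &config.ExporterConf{Kind: "log"},
		Exporters: &[]config.ExporterConf{
			{Kind: "webhook", EndpointURL: "https://example.com/webhook"},
			{Kind: "file", OutputDir: "./"},
		},
	}
	if err := cfg.IsValid(); err != nil {
		log.Fatal(err)
	}
}
```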
diff --git a/cmd/relayproxy/config/config_test.go b/cmd/relayproxy/config/config_test.go
index d92d3f52b6f..1faa9d8e171 100644
--- a/cmd/relayproxy/config/config_test.go
+++ b/cmd/relayproxy/config/config_test.go
@@ -87,6 +87,84 @@ func TestParseConfig_fileFromPflag(t *testing.T) {
},
wantErr: assert.NoError,
},
+ {
+ name: "Valid yaml file with multiple exporters",
+ fileLocation: "../testdata/config/valid-yaml-multiple-exporters.yaml",
+ want: &config.Config{
+ ListenPort: 1031,
+ PollingInterval: 1000,
+ FileFormat: "yaml",
+ Host: "localhost",
+ Retriever: &config.RetrieverConf{
+ Kind: "http",
+ URL: "https://raw.githubusercontent.com/thomaspoignant/go-feature-flag/main/examples/retriever_file/flags.goff.yaml",
+ },
+ Exporters: &[]config.ExporterConf{
+ {
+ Kind: "log",
+ },
+ {
+ Kind: "file",
+ OutputDir: "./",
+ },
+ },
+ StartWithRetrieverError: false,
+ Version: "1.X.X",
+ EnableSwagger: true,
+ AuthorizedKeys: config.APIKeys{
+ Admin: []string{
+ "apikey3",
+ },
+ Evaluation: []string{
+ "apikey1",
+ "apikey2",
+ },
+ },
+ LogLevel: "info",
+ },
+ wantErr: assert.NoError,
+ },
+ {
+ name: "Valid yaml file with both exporter and exporters",
+ fileLocation: "../testdata/config/valid-yaml-exporter-and-exporters.yaml",
+ want: &config.Config{
+ ListenPort: 1031,
+ PollingInterval: 1000,
+ FileFormat: "yaml",
+ Host: "localhost",
+ Retriever: &config.RetrieverConf{
+ Kind: "http",
+ URL: "https://raw.githubusercontent.com/thomaspoignant/go-feature-flag/main/examples/retriever_file/flags.goff.yaml",
+ },
+ Exporter: &config.ExporterConf{
+ Kind: "log",
+ },
+ Exporters: &[]config.ExporterConf{
+ {
+ Kind: "webhook",
+ EndpointURL: "https://example.com/webhook",
+ },
+ {
+ Kind: "file",
+ OutputDir: "./",
+ },
+ },
+ StartWithRetrieverError: false,
+ Version: "1.X.X",
+ EnableSwagger: true,
+ AuthorizedKeys: config.APIKeys{
+ Admin: []string{
+ "apikey3",
+ },
+ Evaluation: []string{
+ "apikey1",
+ "apikey2",
+ },
+ },
+ LogLevel: "info",
+ },
+ wantErr: assert.NoError,
+ },
{
name: "Valid json file",
fileLocation: "../testdata/config/valid-file.json",
@@ -345,6 +423,7 @@ func TestConfig_IsValid(t *testing.T) {
Retriever *config.RetrieverConf
Retrievers *[]config.RetrieverConf
Exporter *config.ExporterConf
+ Exporters *[]config.ExporterConf
Notifiers []config.NotifierConf
LogLevel string
Debug bool
@@ -477,6 +556,26 @@ func TestConfig_IsValid(t *testing.T) {
},
wantErr: assert.Error,
},
+ {
+ name: "invalid exporter in the list of exporters",
+ fields: fields{
+ ListenPort: 8080,
+ Retriever: &config.RetrieverConf{
+ Kind: "file",
+ Path: "../testdata/config/valid-file.yaml",
+ },
+ Exporters: &[]config.ExporterConf{
+ {
+ Kind: "webhook",
+ EndpointURL: "https://example.com/webhook",
+ },
+ {
+ Kind: "file",
+ },
+ },
+ },
+ wantErr: assert.Error,
+ },
{
name: "invalid notifier",
fields: fields{
@@ -544,6 +643,7 @@ func TestConfig_IsValid(t *testing.T) {
StartWithRetrieverError: tt.fields.StartWithRetrieverError,
Retriever: tt.fields.Retriever,
Exporter: tt.fields.Exporter,
+ Exporters: tt.fields.Exporters,
Notifiers: tt.fields.Notifiers,
Retrievers: tt.fields.Retrievers,
LogLevel: tt.fields.LogLevel,
diff --git a/cmd/relayproxy/service/gofeatureflag.go b/cmd/relayproxy/service/gofeatureflag.go
index 1f3702d98f3..a40ed5b3777 100644
--- a/cmd/relayproxy/service/gofeatureflag.go
+++ b/cmd/relayproxy/service/gofeatureflag.go
@@ -47,63 +47,106 @@ func NewGoFeatureFlagClient(
logger *zap.Logger,
notifiers []notifier.Notifier,
) (*ffclient.GoFeatureFlag, error) {
- var mainRetriever retriever.Retriever
- var err error
-
if proxyConf == nil {
return nil, fmt.Errorf("proxy config is empty")
}
+ mainRetriever, retrievers, err := initRetrievers(proxyConf)
+ if err != nil {
+ return nil, err
+ }
+
+ mainDataExporter, dataExporters, err := initExporters(proxyConf)
+ if err != nil {
+ return nil, err
+ }
+
+ notif, err := initNotifiers(proxyConf.Notifiers, notifiers)
+ if err != nil {
+ return nil, err
+ }
+
+ f := ffclient.Config{
+ PollingInterval: time.Duration(proxyConf.PollingInterval) * time.Millisecond,
+ LeveledLogger: slog.New(slogzap.Option{Level: slog.LevelDebug, Logger: logger}.NewZapHandler()),
+ Context: context.Background(),
+ Retriever: mainRetriever,
+ Retrievers: retrievers,
+ Notifiers: notif,
+ FileFormat: proxyConf.FileFormat,
+ DataExporter: mainDataExporter,
+ DataExporters: dataExporters,
+ StartWithRetrieverError: proxyConf.StartWithRetrieverError,
+ EnablePollingJitter: proxyConf.EnablePollingJitter,
+ DisableNotifierOnInit: proxyConf.DisableNotifierOnInit,
+ EvaluationContextEnrichment: proxyConf.EvaluationContextEnrichment,
+ PersistentFlagConfigurationFile: proxyConf.PersistentFlagConfigurationFile,
+ }
+
+ return ffclient.New(f)
+}
+
+func initRetrievers(proxyConf *config.Config) (retriever.Retriever, []retriever.Retriever, error) {
+ var mainRetriever retriever.Retriever
+ var err error
+
if proxyConf.Retriever != nil {
mainRetriever, err = initRetriever(proxyConf.Retriever)
if err != nil {
- return nil, err
+ return nil, nil, err
}
}
- // Manage if we have more than 1 retriever
retrievers := make([]retriever.Retriever, 0)
if proxyConf.Retrievers != nil {
for _, r := range *proxyConf.Retrievers {
currentRetriever, err := initRetriever(&r)
if err != nil {
- return nil, err
+ return nil, nil, err
}
retrievers = append(retrievers, currentRetriever)
}
}
- var exp ffclient.DataExporter
+ return mainRetriever, retrievers, nil
+}
+
+func initExporters(proxyConf *config.Config) (ffclient.DataExporter, []ffclient.DataExporter, error) {
+ var mainDataExporter ffclient.DataExporter
+ var err error
+
if proxyConf.Exporter != nil {
- exp, err = initDataExporter(proxyConf.Exporter)
+ mainDataExporter, err = initDataExporter(proxyConf.Exporter)
if err != nil {
- return nil, err
+ return ffclient.DataExporter{}, nil, err
}
}
- notif, err := initNotifier(proxyConf.Notifiers)
- if err != nil {
- return nil, err
+ if proxyConf.Exporters == nil {
+ return mainDataExporter, nil, nil
}
- notif = append(notif, notifiers...)
- f := ffclient.Config{
- PollingInterval: time.Duration(proxyConf.PollingInterval) * time.Millisecond,
- LeveledLogger: slog.New(slogzap.Option{Level: slog.LevelDebug, Logger: logger}.NewZapHandler()),
- Context: context.Background(),
- Retriever: mainRetriever,
- Retrievers: retrievers,
- Notifiers: notif,
- FileFormat: proxyConf.FileFormat,
- DataExporter: exp,
- StartWithRetrieverError: proxyConf.StartWithRetrieverError,
- EnablePollingJitter: proxyConf.EnablePollingJitter,
- DisableNotifierOnInit: proxyConf.DisableNotifierOnInit,
- EvaluationContextEnrichment: proxyConf.EvaluationContextEnrichment,
- PersistentFlagConfigurationFile: proxyConf.PersistentFlagConfigurationFile,
+ // Initialize each exporter with its own configuration
+ dataExporters := make([]ffclient.DataExporter, len(*proxyConf.Exporters))
+ for i, e := range *proxyConf.Exporters {
+ dataExporters[i], err = initDataExporter(&e)
+ if err != nil {
+ return ffclient.DataExporter{}, nil, err
+ }
}
- return ffclient.New(f)
+ return mainDataExporter, dataExporters, nil
+}
+
+func initNotifiers(
+ configNotifiers []config.NotifierConf,
+ additionalNotifiers []notifier.Notifier,
+) ([]notifier.Notifier, error) {
+ notif, err := initNotifier(configNotifiers)
+ if err != nil {
+ return nil, err
+ }
+ return append(notif, additionalNotifiers...), nil
}
// initRetriever initialize the retriever based on the configuration
diff --git a/cmd/relayproxy/service/gofeatureflag_test.go b/cmd/relayproxy/service/gofeatureflag_test.go
index bf4fb48450f..bfe84369683 100644
--- a/cmd/relayproxy/service/gofeatureflag_test.go
+++ b/cmd/relayproxy/service/gofeatureflag_test.go
@@ -537,3 +537,85 @@ func TestNewGoFeatureFlagClient_ProxyConfNil(t *testing.T) {
assert.Nil(t, goff, "Expected GoFeatureFlag client to be nil when proxyConf is nil")
assert.EqualError(t, err, "proxy config is empty", "Expected error message to indicate empty proxy config")
}
+
+func TestNewGoFeatureFlagClient(t *testing.T) {
+ // Create a logger for testing
+ logger := zap.NewNop()
+
+ tests := []struct {
+ name string
+ proxyConf *config.Config
+ notifiers []notifier.Notifier
+ wantErr bool
+ }{
+ {
+ name: "Valid configuration with HTTP retriever and webhook exporter",
+ proxyConf: &config.Config{
+ ListenPort: 8080,
+ PollingInterval: 50000,
+ FileFormat: "yaml",
+ Retriever: &config.RetrieverConf{
+ Kind: "http",
+ URL: "https://raw.githubusercontent.com/thomaspoignant/go-feature-flag/main/examples/retriever_file/flags.goff.yaml",
+ },
+ Exporter: &config.ExporterConf{
+ Kind: "webhook",
+ EndpointURL: "https://example.com/webhook",
+ Secret: "secret123",
+ },
+ },
+ notifiers: nil,
+ wantErr: false,
+ },
+ {
+ name: "Valid configuration with multiple retrievers and exporters",
+ proxyConf: &config.Config{
+ ListenPort: 8080,
+ PollingInterval: 60000,
+ FileFormat: "yaml",
+ Retrievers: &[]config.RetrieverConf{
+ {
+ Kind: "http",
+ URL: "https://raw.githubusercontent.com/thomaspoignant/go-feature-flag/main/examples/retriever_file/flags.goff.yaml",
+ },
+ },
+ Exporters: &[]config.ExporterConf{
+ {
+ Kind: "log",
+ },
+ },
+ },
+ notifiers: nil,
+ wantErr: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ client, err := NewGoFeatureFlagClient(tt.proxyConf, logger, tt.notifiers)
+
+ if tt.wantErr {
+ assert.Error(t, err)
+ assert.Nil(t, client)
+ } else {
+ assert.NoError(t, err)
+ assert.NotNil(t, client)
+
+ // Additional checks on the client configuration
+ assert.Equal(t, int64(tt.proxyConf.PollingInterval), client.GetPollingInterval())
+
+ // Check if the client is not offline
+ assert.False(t, client.IsOffline())
+
+ // Force a refresh and check if it succeeds
+ assert.True(t, client.ForceRefresh())
+
+ // Check if the cache refresh date is recent
+ assert.WithinDuration(t, time.Now(), client.GetCacheRefreshDate(), 5*time.Second)
+
+ // Clean up
+ client.Close()
+ }
+ })
+ }
+}
diff --git a/cmd/relayproxy/testdata/config/valid-yaml-exporter-and-exporters.yaml b/cmd/relayproxy/testdata/config/valid-yaml-exporter-and-exporters.yaml
new file mode 100644
index 00000000000..7c3658bb82a
--- /dev/null
+++ b/cmd/relayproxy/testdata/config/valid-yaml-exporter-and-exporters.yaml
@@ -0,0 +1,21 @@
+listen: 1031
+pollingInterval: 1000
+startWithRetrieverError: false
+retriever:
+ kind: http
+ url: https://raw.githubusercontent.com/thomaspoignant/go-feature-flag/main/examples/retriever_file/flags.goff.yaml
+exporter:
+ kind: log
+exporters:
+ - kind: webhook
+ endpointURL: https://example.com/webhook
+ - kind: file
+ outputDir: ./
+enableSwagger: true
+authorizedKeys:
+ evaluation:
+ - apikey1 # owner: userID1
+ - apikey2 # owner: userID2
+ admin:
+ - apikey3
+loglevel: info
diff --git a/cmd/relayproxy/testdata/config/valid-yaml-multiple-exporters.yaml b/cmd/relayproxy/testdata/config/valid-yaml-multiple-exporters.yaml
new file mode 100644
index 00000000000..084da54fa8f
--- /dev/null
+++ b/cmd/relayproxy/testdata/config/valid-yaml-multiple-exporters.yaml
@@ -0,0 +1,18 @@
+listen: 1031
+pollingInterval: 1000
+startWithRetrieverError: false
+retriever:
+ kind: http
+ url: https://raw.githubusercontent.com/thomaspoignant/go-feature-flag/main/examples/retriever_file/flags.goff.yaml
+exporters:
+ - kind: log
+ - kind: file
+ outputDir: ./
+enableSwagger: true
+authorizedKeys:
+ evaluation:
+ - apikey1 # owner: userID1
+ - apikey2 # owner: userID2
+ admin:
+ - apikey3
+loglevel: info
diff --git a/config.go b/config.go
index 159daf8617c..0f35db5f7d6 100644
--- a/config.go
+++ b/config.go
@@ -74,6 +74,10 @@ type Config struct {
// DataExporter (optional) is the configuration where we store how we should output the flags variations results
DataExporter DataExporter
+ // DataExporters (optional) is the list of configurations where we store how we should output the flags variations results
+ // Multiple exporters can be used to send data to multiple destinations in parallel without interference.
+ DataExporters []DataExporter
+
// StartWithRetrieverError (optional) If true, the SDK will start even if we did not get any flags from the retriever.
// It will serve only default values until all the retrievers returns the flags.
// The init method will not return any error if the flag file is unreachable.
@@ -126,6 +130,20 @@ func (c *Config) GetRetrievers() ([]retriever.Retriever, error) {
return retrievers, nil
}
+// GetDataExporters returns the list of configured data exporters.
+func (c *Config) GetDataExporters() []DataExporter {
+ dataExporters := make([]DataExporter, 0)
+ // If both DataExporter and DataExporters are configured, DataExporter is added first,
+ // followed by the exporters defined in DataExporters.
+ if c.DataExporter != (DataExporter{}) {
+ dataExporters = append(dataExporters, c.DataExporter)
+ }
+ if len(c.DataExporters) > 0 {
+ dataExporters = append(dataExporters, c.DataExporters...)
+ }
+ return dataExporters
+}
+
// SetOffline set GO Feature Flag in offline mode.
func (c *Config) SetOffline(control bool) {
if c.offlineMutex == nil {
diff --git a/config_test.go b/config_test.go
index af2b40538c1..019070bf76b 100644
--- a/config_test.go
+++ b/config_test.go
@@ -114,6 +114,108 @@ func TestConfig_GetRetrievers(t *testing.T) {
}
}
+func TestConfig_GetDataExporters(t *testing.T) {
+ type fields struct {
+ DataExporter ffClient.DataExporter
+ DataExporters []ffClient.DataExporter
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want []ffClient.DataExporter
+ }{
+ {
+ name: "No data exporter",
+ fields: fields{},
+ want: []ffClient.DataExporter{},
+ },
+ {
+ name: "Single data exporter",
+ fields: fields{
+ DataExporter: ffClient.DataExporter{
+ FlushInterval: 10 * time.Second,
+ MaxEventInMemory: 100,
+ },
+ },
+ want: []ffClient.DataExporter{
+ {
+ FlushInterval: 10 * time.Second,
+ MaxEventInMemory: 100,
+ },
+ },
+ },
+ {
+ name: "Multiple data exporters",
+ fields: fields{
+ DataExporters: []ffClient.DataExporter{
+ {
+ FlushInterval: 20 * time.Second,
+ MaxEventInMemory: 200,
+ },
+ {
+ FlushInterval: 30 * time.Second,
+ MaxEventInMemory: 300,
+ },
+ },
+ },
+ want: []ffClient.DataExporter{
+ {
+ FlushInterval: 20 * time.Second,
+ MaxEventInMemory: 200,
+ },
+ {
+ FlushInterval: 30 * time.Second,
+ MaxEventInMemory: 300,
+ },
+ },
+ },
+ {
+ name: "Both single and multiple data exporters",
+ fields: fields{
+ DataExporter: ffClient.DataExporter{
+ FlushInterval: 10 * time.Second,
+ MaxEventInMemory: 100,
+ },
+ DataExporters: []ffClient.DataExporter{
+ {
+ FlushInterval: 20 * time.Second,
+ MaxEventInMemory: 200,
+ },
+ {
+ FlushInterval: 30 * time.Second,
+ MaxEventInMemory: 300,
+ },
+ },
+ },
+ want: []ffClient.DataExporter{
+ {
+ FlushInterval: 10 * time.Second,
+ MaxEventInMemory: 100,
+ },
+ {
+ FlushInterval: 20 * time.Second,
+ MaxEventInMemory: 200,
+ },
+ {
+ FlushInterval: 30 * time.Second,
+ MaxEventInMemory: 300,
+ },
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := &ffClient.Config{
+ DataExporter: tt.fields.DataExporter,
+ DataExporters: tt.fields.DataExporters,
+ }
+ got := c.GetDataExporters()
+ assert.Equal(t, tt.want, got)
+ })
+ }
+}
+
func TestOfflineConfig(t *testing.T) {
c := ffClient.Config{
Offline: true,
diff --git a/examples/data_export_log_and_file/flags.goff.yaml b/examples/data_export_log_and_file/flags.goff.yaml
new file mode 100644
index 00000000000..b0783257a9d
--- /dev/null
+++ b/examples/data_export_log_and_file/flags.goff.yaml
@@ -0,0 +1,22 @@
+new-admin-access:
+ variations:
+ default_var: false
+ false_var: false
+ true_var: true
+ defaultRule:
+ percentage:
+ false_var: 70
+ true_var: 30
+
+flag-only-for-admin:
+ variations:
+ default_var: false
+ false_var: false
+ true_var: true
+ targeting:
+ - query: admin eq true
+ percentage:
+ false_var: 0
+ true_var: 100
+ defaultRule:
+ variation: default_var
diff --git a/examples/data_export_log_and_file/main.go b/examples/data_export_log_and_file/main.go
new file mode 100644
index 00000000000..f67cbbcf761
--- /dev/null
+++ b/examples/data_export_log_and_file/main.go
@@ -0,0 +1,166 @@
+package main
+
+import (
+ "context"
+ "log"
+ "log/slog"
+ "time"
+
+ "github.com/thomaspoignant/go-feature-flag/ffcontext"
+
+ "github.com/thomaspoignant/go-feature-flag/exporter/fileexporter"
+ "github.com/thomaspoignant/go-feature-flag/exporter/logsexporter"
+ "github.com/thomaspoignant/go-feature-flag/retriever/fileretriever"
+
+ ffclient "github.com/thomaspoignant/go-feature-flag"
+)
+
+func main() {
+ // Init ffclient with multiple exporters
+ err := ffclient.Init(ffclient.Config{
+ PollingInterval: 10 * time.Second,
+ LeveledLogger: slog.Default(),
+ Context: context.Background(),
+ Retriever: &fileretriever.Retriever{
+ Path: "examples/data_export_log_and_file/flags.goff.yaml",
+ },
+ // Main exporter (bulk) - file exporter with small buffer and short interval
+ DataExporter: ffclient.DataExporter{
+ FlushInterval: 2 * time.Second, // Flush every 2 seconds
+ MaxEventInMemory: 3, // Flush after 3 events
+ Exporter: &fileexporter.Exporter{
+ Format: "json",
+ OutputDir: "./examples/data_export_log_and_file/variation-events/",
+ Filename: "bulk-main-{{ .Timestamp}}.{{ .Format}}",
+ },
+ },
+ // Multiple additional exporters with different configurations
+ DataExporters: []ffclient.DataExporter{
+ {
+ // Bulk exporter with larger buffer and longer interval
+ FlushInterval: 5 * time.Second, // Flush every 5 seconds
+ MaxEventInMemory: 5, // Flush after 5 events
+ Exporter: &fileexporter.Exporter{
+ Format: "json",
+ OutputDir: "./examples/data_export_log_and_file/variation-events/",
+ Filename: "bulk-secondary-{{ .Timestamp}}.{{ .Format}}",
+ },
+ },
+ {
+ // Non-bulk exporter (logs) - should process immediately
+ FlushInterval: 1 * time.Second,
+ MaxEventInMemory: 1,
+ Exporter: &logsexporter.Exporter{
+ LogFormat: "IMMEDIATE - user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
+ },
+ },
+ {
+ // Another bulk exporter with different settings
+ FlushInterval: 3 * time.Second, // Flush every 3 seconds
+ MaxEventInMemory: 4, // Flush after 4 events
+ Exporter: &fileexporter.Exporter{
+ Format: "json",
+ OutputDir: "./examples/data_export_log_and_file/variation-events/",
+ Filename: "bulk-tertiary-{{ .Timestamp}}.{{ .Format}}",
+ },
+ },
+ },
+ })
+
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer ffclient.Close()
+
+ // Create test users
+ user1 := ffcontext.NewEvaluationContextBuilder("user1").Build()
+ user2 := ffcontext.NewEvaluationContextBuilder("user2").Build()
+ user3 := ffcontext.NewEvaluationContextBuilder("user3").Build()
+
+ // Test scenario to trigger different flush conditions
+
+ log.Println("Phase 1: Generate 3 events")
+ _, _ = ffclient.BoolVariation("new-admin-access", user1, false)
+ _, _ = ffclient.BoolVariation("new-admin-access", user2, false)
+ _, _ = ffclient.BoolVariation("new-admin-access", user3, false)
+
+ log.Println("Waiting 1 second")
+ time.Sleep(1000 * time.Millisecond)
+
+ log.Println("Phase 2: Generate 2 more events")
+ _, _ = ffclient.StringVariation("unknown-flag", user1, "default1")
+ _, _ = ffclient.StringVariation("unknown-flag", user2, "default2")
+
+ log.Println("Waiting 2 seconds...")
+ time.Sleep(2000 * time.Millisecond)
+
+ log.Println("Phase 3: Generate 2 more events")
+ _, _ = ffclient.JSONVariation("json-flag", user1, map[string]interface{}{"test": "value1"})
+ _, _ = ffclient.JSONVariation("json-flag", user2, map[string]interface{}{"test": "value2"})
+
+ log.Println("Waiting 3 seconds...")
+ time.Sleep(3000 * time.Millisecond)
+
+ log.Println("Phase 4: Generate 1 final event")
+ _, _ = ffclient.JSONVariation("json-flag", user3, map[string]interface{}{"test": "value3"})
+
+ log.Println("Waiting 5 seconds...")
+ time.Sleep(5000 * time.Millisecond)
+
+ /*
+ Expected behavior:
+
+ Phase 1 (3 events):
+ - Main exporter: Flushes immediately (hit max 3)
+ - Secondary exporter: Holds events (not yet at max 5)
+ - Tertiary exporter: Holds events (not yet at max 4)
+ - Logger: Processes immediately
+
+ After 1s:
+ - No flushes (intervals not reached)
+
+ Phase 2 (+2 events, total 5):
+ - Main exporter: Holds 2 events (not yet at max 3)
+ - Secondary exporter: Flushes immediately (hit max 5)
+ - Tertiary exporter: Flushes immediately at 4 events and then holds 1 event
+ - Logger: Processes immediately
+
+ After 2s:
+ - Main exporter: Flushes (interval hit)
+ - Secondary exporter: Empty after previous flush
+ - Tertiary exporter: Holds 1 event (not yet at max 4)
+
+ Phase 3 (+2 events, total 2 since last flush):
+ - Main exporter: Holds 2 events (not yet at max 3)
+ - Secondary exporter: Holds 2 events (not yet at max 5)
+ - Tertiary exporter: Holds 3 events (not yet at max 4)
+ - Logger: Processes immediately
+
+ After 3s:
+ - Main exporter: Flushes (interval hit)
+ - Secondary exporter: Flushes (interval hit)
+ - Tertiary exporter: Flushed after only 1 second
+
+ Phase 4 (+1 event, total 3 since last flush):
+ - Main exporter: Holds 1 event (not yet at max 3)
+ - Secondary exporter: Holds 1 event (not yet at max 5)
+ - Tertiary exporter: Holds 1 event (not yet at max 4)
+ - Logger: Processes immediately
+
+ After 5s:
+ - Main exporter: Flushed after only 3 seconds
+ - Secondary exporter: Flushes remaining events (interval hit)
+ - Tertiary exporter: Flushed after only 1 second
+
+ Finally:
+ - All exporters will flush any remaining events on Close()
+
+ Note:
+ - Total we have 8 events
+ - Main exporter will generate 4 files containing 3, 2, 2, 1 events respectively
+ - Secondary exporter will generate 3 files containing 5, 2, 1 events respectively
+ - Tertiary exporter will generate 3 files containing 4, 3, 1 events respectively
+ - Logger will generate 8 events in the logs
+ (format "IMMEDIATE - user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"")
+ */
+}
diff --git a/exporter/data_exporter.go b/exporter/data_exporter.go
index 3038a1d25d7..2c2f3205313 100644
--- a/exporter/data_exporter.go
+++ b/exporter/data_exporter.go
@@ -16,110 +16,221 @@ const (
defaultMaxEventInMemory = int64(100000)
)
-// NewScheduler allows creating a new instance of Scheduler ready to be used to export data.
+// Config holds the configuration for an individual exporter
+type Config struct {
+ Exporter CommonExporter
+ FlushInterval time.Duration
+ MaxEventInMemory int64
+}
+
+// State maintains the state for a single exporter
+type State struct {
+ config Config
+ ticker *time.Ticker
+ lastIndex int // Index of the last processed event
+}
+
+// Scheduler handles data collection for one or more exporters
+type Scheduler struct {
+ sharedCache []FeatureEvent
+ bulkExporters map[CommonExporter]*State // Only bulk exporters that need periodic flushing
+ directExporters []CommonExporter // Non-bulk exporters that flush immediately
+ mutex sync.Mutex
+ daemonChan chan struct{}
+ logger *fflog.FFLogger
+ ctx context.Context
+}
+
+// NewScheduler creates a new scheduler that handles one exporter
func NewScheduler(ctx context.Context, flushInterval time.Duration, maxEventInMemory int64,
exp CommonExporter, logger *fflog.FFLogger,
+) *Scheduler {
+ // Convert the single-exporter parameters to a Config
+ config := Config{
+ Exporter: exp,
+ FlushInterval: flushInterval,
+ MaxEventInMemory: maxEventInMemory,
+ }
+ return NewMultiScheduler(ctx, []Config{config}, logger)
+}
+
+// NewMultiScheduler creates a scheduler that handles multiple exporters
+func NewMultiScheduler(ctx context.Context, exporterConfigs []Config, logger *fflog.FFLogger,
) *Scheduler {
if ctx == nil {
ctx = context.Background()
}
- if flushInterval == 0 {
- flushInterval = defaultFlushInterval
- }
+ bulkExporters := make(map[CommonExporter]*State)
+ directExporters := make([]CommonExporter, 0)
- if maxEventInMemory == 0 {
- maxEventInMemory = defaultMaxEventInMemory
+ for _, config := range exporterConfigs {
+ if config.FlushInterval == 0 {
+ config.FlushInterval = defaultFlushInterval
+ }
+ if config.MaxEventInMemory == 0 {
+ config.MaxEventInMemory = defaultMaxEventInMemory
+ }
+
+ if config.Exporter.IsBulk() {
+ state := &State{
+ config: config,
+ lastIndex: -1,
+ ticker: time.NewTicker(config.FlushInterval),
+ }
+ bulkExporters[config.Exporter] = state
+ } else {
+ directExporters = append(directExporters, config.Exporter)
+ }
}
return &Scheduler{
- localCache: make([]FeatureEvent, 0),
+ sharedCache: make([]FeatureEvent, 0),
+ bulkExporters: bulkExporters,
+ directExporters: directExporters,
mutex: sync.Mutex{},
- maxEventInCache: maxEventInMemory,
- exporter: exp,
daemonChan: make(chan struct{}),
- ticker: time.NewTicker(flushInterval),
logger: logger,
ctx: ctx,
}
}
-// Scheduler is the struct that handle the data collection.
-type Scheduler struct {
- localCache []FeatureEvent
- mutex sync.Mutex
- daemonChan chan struct{}
- ticker *time.Ticker
- maxEventInCache int64
- exporter CommonExporter
- logger *fflog.FFLogger
- ctx context.Context
-}
+// AddEvent adds an event to the shared cache and handles immediate export for non-bulk exporters
+func (s *Scheduler) AddEvent(event FeatureEvent) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
-// AddEvent allow adding an event to the local cache and to call the exporter if we reach
-// the maximum number of events that can be present in the cache.
-func (dc *Scheduler) AddEvent(event FeatureEvent) {
- if !dc.exporter.IsBulk() {
- err := sendEvents(dc.ctx, dc.exporter, dc.logger, []FeatureEvent{event})
+ // Handle non-bulk exporters immediately
+ for _, exporter := range s.directExporters {
+ err := sendEvents(s.ctx, exporter, s.logger, []FeatureEvent{event})
if err != nil {
- dc.logger.Error(err.Error())
+ s.logger.Error(err.Error())
+ }
+ }
+
+ // If we have no bulk exporters, we're done
+ if len(s.bulkExporters) == 0 {
+ return
+ }
+
+ // Add event to shared cache for bulk exporters
+ s.sharedCache = append(s.sharedCache, event)
+ currentIndex := len(s.sharedCache) - 1
+
+ // Check if any bulk exporters need to flush due to max events
+ for _, state := range s.bulkExporters {
+ pendingCount := currentIndex - state.lastIndex
+ if state.config.MaxEventInMemory > 0 && int64(pendingCount) >= state.config.MaxEventInMemory {
+ s.flushExporter(state)
}
+ }
+
+ // Clean up events that have been processed by all exporters
+ s.cleanupProcessedEvents()
+}
+
+// getPendingEvents returns events that haven't been processed by this exporter
+func (s *Scheduler) getPendingEvents(state *State) []FeatureEvent {
+ if state.lastIndex+1 >= len(s.sharedCache) {
+ return nil
+ }
+ return s.sharedCache[state.lastIndex+1:]
+}
+
+// flushExporter sends pending events to the specified exporter
+func (s *Scheduler) flushExporter(state *State) {
+ pendingEvents := s.getPendingEvents(state)
+ if len(pendingEvents) == 0 {
+ return
+ }
+
+ err := sendEvents(s.ctx, state.config.Exporter, s.logger, pendingEvents)
+ if err != nil {
+ s.logger.Error(err.Error())
+ return
+ }
+
+ // Update last processed index
+ state.lastIndex = len(s.sharedCache) - 1
+}
+
+// cleanupProcessedEvents removes events that have been processed by all bulk exporters
+func (s *Scheduler) cleanupProcessedEvents() {
+ // If no bulk exporters, we can clear the cache
+ if len(s.bulkExporters) == 0 {
+ s.sharedCache = make([]FeatureEvent, 0)
return
}
- dc.mutex.Lock()
- defer dc.mutex.Unlock()
- if int64(len(dc.localCache)) >= dc.maxEventInCache {
- dc.flush()
+ // Find minimum lastIndex among bulk exporters
+ minIndex := len(s.sharedCache)
+ for _, state := range s.bulkExporters {
+ if state.lastIndex < minIndex {
+ minIndex = state.lastIndex
+ }
+ }
+
+ // If all exporters have processed some events, we can remove them
+ if minIndex > 0 {
+ // Keep events from minIndex+1 onwards
+ s.sharedCache = s.sharedCache[minIndex+1:]
+ // Update lastIndex for all exporters
+ for _, state := range s.bulkExporters {
+ state.lastIndex -= (minIndex + 1)
+ }
}
- dc.localCache = append(dc.localCache, event)
}
-// StartDaemon will start a goroutine to check every X seconds if we should send the data.
-// The daemon is started only if we have a bulk exporter.
-func (dc *Scheduler) StartDaemon() {
+// StartDaemon starts the periodic flush for bulk exporters
+func (s *Scheduler) StartDaemon() {
+ // If no bulk exporters, no need for daemon
+ if len(s.bulkExporters) == 0 {
+ return
+ }
+
for {
select {
- case <-dc.ticker.C:
- // send data and clear local cache
- dc.mutex.Lock()
- dc.flush()
- dc.mutex.Unlock()
- case <-dc.daemonChan:
- // stop the daemon
+ case <-s.daemonChan:
return
+ default:
+ s.mutex.Lock()
+ for _, state := range s.bulkExporters {
+ select {
+ case <-state.ticker.C:
+ s.flushExporter(state)
+ default:
+ // Continue if this ticker hasn't triggered
+ }
+ }
+ s.cleanupProcessedEvents()
+ s.mutex.Unlock()
+ // Small sleep to prevent busy waiting
+ time.Sleep(100 * time.Millisecond)
}
}
}
-// Close will stop the daemon and send the data still in the cache
-func (dc *Scheduler) Close() {
- // Close the daemon
- dc.ticker.Stop()
- close(dc.daemonChan)
-
- // Send the data still in the cache
- dc.mutex.Lock()
- dc.flush()
- dc.mutex.Unlock()
-}
+// Close stops all tickers and flushes remaining events
+func (s *Scheduler) Close() {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
-// GetLogger will return the logger used by the scheduler
-func (dc *Scheduler) GetLogger(level slog.Level) *log.Logger {
- if dc.logger == nil {
- return nil
+ // Stop all tickers and flush bulk exporters
+ for _, state := range s.bulkExporters {
+ state.ticker.Stop()
+ s.flushExporter(state)
}
- return dc.logger.GetLogLogger(level)
+
+ close(s.daemonChan)
+ s.sharedCache = nil
}
-// flush will call the data exporter and clear the cache
-func (dc *Scheduler) flush() {
- err := sendEvents(dc.ctx, dc.exporter, dc.logger, dc.localCache)
- if err != nil {
- dc.logger.Error(err.Error())
- return
+// GetLogger returns the logger used by the scheduler
+func (s *Scheduler) GetLogger(level slog.Level) *log.Logger {
+ if s.logger == nil {
+ return nil
}
- dc.localCache = make([]FeatureEvent, 0)
+ return s.logger.GetLogLogger(level)
}
func sendEvents(ctx context.Context, exporter CommonExporter, logger *fflog.FFLogger, events []FeatureEvent) error {
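A minimal sketch of driving the refactored scheduler directly, assuming this repository's `fileexporter` (bulk) and `logsexporter` (non-bulk) packages; paths and intervals are illustrative only:

```go
package main

import (
	"context"
	"log/slog"
	"time"

	"github.com/thomaspoignant/go-feature-flag/exporter"
	"github.com/thomaspoignant/go-feature-flag/exporter/fileexporter"
	"github.com/thomaspoignant/go-feature-flag/exporter/logsexporter"
	"github.com/thomaspoignant/go-feature-flag/utils/fflog"
)

func main() {
	logger := &fflog.FFLogger{LeveledLogger: slog.Default()}

	// One bulk exporter sharing the event cache with one non-bulk exporter.
	scheduler := exporter.NewMultiScheduler(context.Background(), []exporter.Config{
		{
			Exporter:         &fileexporter.Exporter{Format: "json", OutputDir: "/tmp"},
			FlushInterval:    2 * time.Second,
			MaxEventInMemory: 3,
		},
		{
			// Non-bulk: each event is forwarded immediately, the other fields are not used.
			Exporter: &logsexporter.Exporter{LogFormat: "flag=\"{{ .Key}}\", value=\"{{ .Value}}\""},
		},
	}, logger)

	go scheduler.StartDaemon()
	defer scheduler.Close()

	scheduler.AddEvent(exporter.FeatureEvent{Kind: "feature", Key: "my-flag", UserKey: "user-1"})
	time.Sleep(3 * time.Second) // let the bulk exporter flush on its interval
}
```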
diff --git a/feature_flag.go b/feature_flag.go
index ac14952eeaf..70083f542c4 100644
--- a/feature_flag.go
+++ b/feature_flag.go
@@ -43,32 +43,29 @@ func Init(config Config) error {
// GoFeatureFlag is the main object of the library
// it contains the cache, the config, the updater and the exporter.
type GoFeatureFlag struct {
- cache cache.Manager
- config Config
- bgUpdater backgroundUpdater
- dataExporter *exporter.Scheduler
- retrieverManager *retriever.Manager
+ cache cache.Manager
+ config Config
+ bgUpdater backgroundUpdater
+ dataExporterSchedulers []*exporter.Scheduler
+ retrieverManager *retriever.Manager
}
// ff is the default object for go-feature-flag
var ff *GoFeatureFlag
var onceFF sync.Once
-// New creates a new go-feature-flag instances that retrieve the config from a YAML file
-// and return everything you need to manage your flags.
-func New(config Config) (*GoFeatureFlag, error) {
+// validateAndSetDefaults validates the config and sets default values
+func validateAndSetDefaults(config *Config) error {
switch {
case config.PollingInterval == 0:
// The default value for the poll interval is 60 seconds
config.PollingInterval = 60 * time.Second
case config.PollingInterval < 0:
// Check that value is not negative
- return nil, fmt.Errorf("%d is not a valid PollingInterval value, it need to be > 0", config.PollingInterval)
+ return fmt.Errorf("%d is not a valid PollingInterval value, it need to be > 0", config.PollingInterval)
case config.PollingInterval < time.Second:
// the minimum value for the polling policy is 1 second
config.PollingInterval = time.Second
- default:
- // do nothing
}
if config.offlineMutex == nil {
@@ -80,6 +77,75 @@ func New(config Config) (*GoFeatureFlag, error) {
LegacyLogger: config.Logger,
}
+ return nil
+}
+
+// initializeRetrievers sets up and initializes the retriever manager
+func initializeRetrievers(config Config) (*retriever.Manager, error) {
+ retrievers, err := config.GetRetrievers()
+ if err != nil {
+ return nil, err
+ }
+
+ manager := retriever.NewManager(config.Context, retrievers, config.internalLogger)
+ err = manager.Init(config.Context)
+ if err != nil && !config.StartWithRetrieverError {
+ return nil, fmt.Errorf("impossible to initialize the retrievers, please check your configuration: %v", err)
+ }
+
+ return manager, nil
+}
+
+// initializeExporters sets up the data exporters and starts their daemons if needed
+func initializeExporters(config Config) []*exporter.Scheduler {
+ dataExporters := config.GetDataExporters()
+ if len(dataExporters) == 0 {
+ return nil
+ }
+
+ var scheduler *exporter.Scheduler
+ if len(dataExporters) == 1 {
+ scheduler = exporter.NewScheduler(
+ config.Context,
+ dataExporters[0].FlushInterval,
+ dataExporters[0].MaxEventInMemory,
+ dataExporters[0].Exporter,
+ config.internalLogger,
+ )
+ } else {
+ exporterConfigs := make([]exporter.Config, len(dataExporters))
+ for i, de := range dataExporters {
+ exporterConfigs[i] = exporter.Config{
+ Exporter: de.Exporter,
+ FlushInterval: de.FlushInterval,
+ MaxEventInMemory: de.MaxEventInMemory,
+ }
+ }
+ scheduler = exporter.NewMultiScheduler(
+ config.Context,
+ exporterConfigs,
+ config.internalLogger,
+ )
+ }
+
+ // Start daemon if we have any bulk exporters
+ for _, de := range dataExporters {
+ if de.Exporter.IsBulk() {
+ go scheduler.StartDaemon()
+ break
+ }
+ }
+
+ return []*exporter.Scheduler{scheduler}
+}
+
+// New creates a new go-feature-flag instance that retrieves the config from a YAML file
+// and returns everything you need to manage your flags.
+func New(config Config) (*GoFeatureFlag, error) {
+ if err := validateAndSetDefaults(&config); err != nil {
+ return nil, err
+ }
+
goFF := &GoFeatureFlag{
config: config,
}
@@ -92,15 +158,11 @@ func New(config Config) (*GoFeatureFlag, error) {
goFF.bgUpdater = newBackgroundUpdater(config.PollingInterval, config.EnablePollingJitter)
goFF.cache = cache.New(notificationService, config.PersistentFlagConfigurationFile, config.internalLogger)
- retrievers, err := config.GetRetrievers()
+ retrieverManager, err := initializeRetrievers(config)
if err != nil {
return nil, err
}
- goFF.retrieverManager = retriever.NewManager(config.Context, retrievers, config.internalLogger)
- err = goFF.retrieverManager.Init(config.Context)
- if err != nil && !config.StartWithRetrieverError {
- return nil, fmt.Errorf("impossible to initialize the retrievers, please check your configuration: %v", err)
- }
+ goFF.retrieverManager = retrieverManager
err = retrieveFlagsAndUpdateCache(goFF.config, goFF.cache, goFF.retrieverManager, true)
if err != nil {
@@ -122,17 +184,10 @@ func New(config Config) (*GoFeatureFlag, error) {
go goFF.startFlagUpdaterDaemon()
- if goFF.config.DataExporter.Exporter != nil {
- // init the data exporter
- goFF.dataExporter = exporter.NewScheduler(goFF.config.Context, goFF.config.DataExporter.FlushInterval,
- goFF.config.DataExporter.MaxEventInMemory, goFF.config.DataExporter.Exporter, goFF.config.internalLogger)
-
- // we start the daemon only if we have a bulk exporter
- if goFF.config.DataExporter.Exporter.IsBulk() {
- go goFF.dataExporter.StartDaemon()
- }
- }
+ schedulers := initializeExporters(config)
+ goFF.dataExporterSchedulers = schedulers
}
+
config.internalLogger.Debug("GO Feature Flag is initialized")
return goFF, nil
}
@@ -177,9 +232,12 @@ func (g *GoFeatureFlag) Close() {
g.bgUpdater.close()
}
- if g.dataExporter != nil {
- g.dataExporter.Close()
+ for _, dataExporterScheduler := range g.dataExporterSchedulers {
+ if dataExporterScheduler != nil {
+ dataExporterScheduler.Close()
+ }
}
+
if g.retrieverManager != nil {
_ = g.retrieverManager.Shutdown(g.config.Context)
}
diff --git a/feature_flag_test.go b/feature_flag_test.go
index 1b8d2b95469..3c60990a309 100644
--- a/feature_flag_test.go
+++ b/feature_flag_test.go
@@ -743,3 +743,102 @@ func Test_DisableNotifierOnInit(t *testing.T) {
})
}
}
+
+func TestMultipleDataExporters(t *testing.T) {
+ // Create a client with multiple exporters
+ config := ffclient.Config{
+ PollingInterval: 5 * time.Second,
+ Retriever: &fileretriever.Retriever{Path: "testdata/flag-config.yaml"},
+ LeveledLogger: slog.Default(),
+ // Main exporter (bulk)
+ DataExporter: ffclient.DataExporter{
+ FlushInterval: 2 * time.Second,
+ MaxEventInMemory: 3,
+ Exporter: &mock.Exporter{
+ Bulk: true,
+ },
+ },
+ // Additional exporters
+ DataExporters: []ffclient.DataExporter{
+ {
+ // Bulk exporter with different settings
+ FlushInterval: 5 * time.Second,
+ MaxEventInMemory: 5,
+ Exporter: &mock.Exporter{
+ Bulk: true,
+ },
+ },
+ {
+ // Non-bulk exporter
+ FlushInterval: 1 * time.Second, // Should be ignored
+ MaxEventInMemory: 1, // Should be ignored
+ Exporter: &mock.Exporter{
+ Bulk: false,
+ },
+ },
+ {
+ // Another bulk exporter
+ FlushInterval: 3 * time.Second,
+ MaxEventInMemory: 4,
+ Exporter: &mock.Exporter{
+ Bulk: true,
+ },
+ },
+ },
+ }
+
+ gffClient, err := ffclient.New(config)
+ assert.NoError(t, err)
+ defer gffClient.Close()
+
+ // Create test user
+ user := ffcontext.NewEvaluationContext("test-user")
+
+ // Generate events to test exporters
+ // Phase 1: Generate 3 events (should trigger main exporter's MaxEventInMemory)
+ _, _ = gffClient.BoolVariation("test-flag", user, false)
+ _, _ = gffClient.BoolVariation("test-flag", user, false)
+ _, _ = gffClient.BoolVariation("test-flag", user, false)
+
+ // Wait 1 second
+ time.Sleep(1 * time.Second)
+
+ // Phase 2: Generate 2 more events (should trigger secondary exporter's MaxEventInMemory)
+ _, _ = gffClient.StringVariation("unknown-flag", user, "default1")
+ _, _ = gffClient.StringVariation("unknown-flag", user, "default2")
+
+ // Wait 2 seconds (should trigger main exporter's FlushInterval)
+ time.Sleep(2 * time.Second)
+
+ // Phase 3: Generate 2 more events
+ _, _ = gffClient.JSONVariation("json-flag", user, map[string]interface{}{"test": "value1"})
+ _, _ = gffClient.JSONVariation("json-flag", user, map[string]interface{}{"test": "value2"})
+
+ // Wait 3 seconds (should trigger tertiary exporter's FlushInterval)
+ time.Sleep(3 * time.Second)
+
+ // Phase 4: Generate 1 final event
+ _, _ = gffClient.JSONVariation("json-flag", user, map[string]interface{}{"test": "value3"})
+
+ // Wait 5 seconds (should trigger secondary exporter's FlushInterval)
+ time.Sleep(5 * time.Second)
+
+ // Verify that all exporters received events
+ for _, de := range config.GetDataExporters() {
+ mockExporter, ok := de.Exporter.(*mock.Exporter)
+ assert.True(t, ok, "Exporter should be a mock exporter")
+
+ if !mockExporter.IsBulk() {
+ // Non-bulk exporter should have received each event immediately
+ assert.Equal(t, 8, len(mockExporter.GetExportedEvents()), "Non-bulk exporter should have received all events")
+ } else {
+ // Bulk exporters should have received events in batches
+ events := mockExporter.GetExportedEvents()
+ assert.Greater(t, len(events), 0, "Bulk exporter should have received some events")
+ // Each batch should respect the MaxEventInMemory limit
+ for _, event := range events {
+ assert.NotNil(t, event)
+ }
+ }
+ }
+}
diff --git a/testutils/mock/exporter_mock.go b/testutils/mock/exporter_mock.go
index e8ce2b63511..6d5dfccb878 100644
--- a/testutils/mock/exporter_mock.go
+++ b/testutils/mock/exporter_mock.go
@@ -26,6 +26,7 @@ func (m *Exporter) Export(ctx context.Context, _ *fflog.FFLogger, events []expor
m.ExportedEvents = append(m.ExportedEvents, events...)
if m.Err != nil {
if m.ExpectedNumberErr > m.CurrentNumberErr {
+ m.ExportedEvents = m.ExportedEvents[:len(m.ExportedEvents)-len(events)]
m.CurrentNumberErr++
return m.Err
}
diff --git a/variation.go b/variation.go
index f9d32cd9545..7d2a0ec2b61 100644
--- a/variation.go
+++ b/variation.go
@@ -254,9 +254,10 @@ func (g *GoFeatureFlag) getFlagFromCache(flagKey string) (flag.Flag, error) {
// CollectEventData is collecting events and sending them to the data exporter to be stored.
func (g *GoFeatureFlag) CollectEventData(event exporter.FeatureEvent) {
- if g != nil && g.dataExporter != nil {
- // Add event in the exporter
- g.dataExporter.AddEvent(event)
+ for _, dataExporterScheduler := range g.dataExporterSchedulers {
+ if dataExporterScheduler != nil {
+ dataExporterScheduler.AddEvent(event)
+ }
}
}
diff --git a/variation_test.go b/variation_test.go
index 1bc8cd856a6..8e6f440ecdb 100644
--- a/variation_test.go
+++ b/variation_test.go
@@ -328,11 +328,13 @@ func TestBoolVariation(t *testing.T) {
LeveledLogger: logger,
Offline: tt.args.offline,
},
- dataExporter: exporter.NewScheduler(context.Background(), 0, 0,
- &logsexporter.Exporter{
- LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
- "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
- }, &fflog.FFLogger{LeveledLogger: logger}),
+ dataExporterSchedulers: []*exporter.Scheduler{
+ exporter.NewScheduler(context.Background(), 0, 0,
+ &logsexporter.Exporter{
+ LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
+ "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
+ }, &fflog.FFLogger{LeveledLogger: logger}),
+ },
}
}
@@ -728,11 +730,13 @@ func TestBoolVariationDetails(t *testing.T) {
LeveledLogger: logger,
Offline: tt.args.offline,
},
- dataExporter: exporter.NewScheduler(context.Background(), 0, 0,
- &logsexporter.Exporter{
- LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
- "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
- }, &fflog.FFLogger{LeveledLogger: logger}),
+ dataExporterSchedulers: []*exporter.Scheduler{
+ exporter.NewScheduler(context.Background(), 0, 0,
+ &logsexporter.Exporter{
+ LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
+ "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
+ }, &fflog.FFLogger{LeveledLogger: logger}),
+ },
}
}
@@ -1037,11 +1041,13 @@ func TestFloat64Variation(t *testing.T) {
LeveledLogger: logger,
Offline: tt.args.offline,
},
- dataExporter: exporter.NewScheduler(context.Background(), 0, 0,
- &logsexporter.Exporter{
- LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
- "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
- }, &fflog.FFLogger{LeveledLogger: logger}),
+ dataExporterSchedulers: []*exporter.Scheduler{
+ exporter.NewScheduler(context.Background(), 0, 0,
+ &logsexporter.Exporter{
+ LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
+ "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
+ }, &fflog.FFLogger{LeveledLogger: logger}),
+ },
}
}
@@ -1355,11 +1361,13 @@ func TestFloat64VariationDetails(t *testing.T) {
LeveledLogger: logger,
Offline: tt.args.offline,
},
- dataExporter: exporter.NewScheduler(context.Background(), 0, 0,
- &logsexporter.Exporter{
- LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
- "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
- }, &fflog.FFLogger{LeveledLogger: logger}),
+ dataExporterSchedulers: []*exporter.Scheduler{
+ exporter.NewScheduler(context.Background(), 0, 0,
+ &logsexporter.Exporter{
+ LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
+ "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
+ }, &fflog.FFLogger{LeveledLogger: logger}),
+ },
}
}
@@ -1649,9 +1657,11 @@ func TestJSONArrayVariation(t *testing.T) {
LeveledLogger: logger,
Offline: tt.args.offline,
},
- dataExporter: exporter.NewScheduler(context.Background(), 0, 0,
- &logsexporter.Exporter{LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
- "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\""}, &fflog.FFLogger{LeveledLogger: logger}),
+ dataExporterSchedulers: []*exporter.Scheduler{
+ exporter.NewScheduler(context.Background(), 0, 0,
+ &logsexporter.Exporter{LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
+ "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\""}, &fflog.FFLogger{LeveledLogger: logger}),
+ },
}
}
@@ -1954,9 +1964,11 @@ func TestJSONArrayVariationDetails(t *testing.T) {
LeveledLogger: logger,
Offline: tt.args.offline,
},
- dataExporter: exporter.NewScheduler(context.Background(), 0, 0,
- &logsexporter.Exporter{LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
- "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\""}, &fflog.FFLogger{LeveledLogger: logger}),
+ dataExporterSchedulers: []*exporter.Scheduler{
+ exporter.NewScheduler(context.Background(), 0, 0,
+ &logsexporter.Exporter{LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
+ "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\""}, &fflog.FFLogger{LeveledLogger: logger}),
+ },
}
}
@@ -2226,11 +2238,13 @@ func TestJSONVariation(t *testing.T) {
LeveledLogger: logger,
Offline: tt.args.offline,
},
- dataExporter: exporter.NewScheduler(context.Background(), 0, 0,
- &logsexporter.Exporter{
- LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
- "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
- }, &fflog.FFLogger{LeveledLogger: logger}),
+ dataExporterSchedulers: []*exporter.Scheduler{
+ exporter.NewScheduler(context.Background(), 0, 0,
+ &logsexporter.Exporter{
+ LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
+ "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
+ }, &fflog.FFLogger{LeveledLogger: logger}),
+ },
}
}
@@ -2459,11 +2473,13 @@ func TestJSONVariationDetails(t *testing.T) {
LeveledLogger: logger,
Offline: tt.args.offline,
},
- dataExporter: exporter.NewScheduler(context.Background(), 0, 0,
- &logsexporter.Exporter{
- LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
- "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
- }, &fflog.FFLogger{LeveledLogger: logger}),
+ dataExporterSchedulers: []*exporter.Scheduler{
+ exporter.NewScheduler(context.Background(), 0, 0,
+ &logsexporter.Exporter{
+ LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
+ "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
+ }, &fflog.FFLogger{LeveledLogger: logger}),
+ },
}
}
@@ -2736,11 +2752,13 @@ func TestStringVariation(t *testing.T) {
LeveledLogger: logger,
Offline: tt.args.offline,
},
- dataExporter: exporter.NewScheduler(context.Background(), 0, 0,
- &logsexporter.Exporter{
- LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
- "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
- }, &fflog.FFLogger{LeveledLogger: logger}),
+ dataExporterSchedulers: []*exporter.Scheduler{
+ exporter.NewScheduler(context.Background(), 0, 0,
+ &logsexporter.Exporter{
+ LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
+ "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
+ }, &fflog.FFLogger{LeveledLogger: logger}),
+ },
}
}
got, err := StringVariation(tt.args.flagKey, tt.args.user, tt.args.defaultValue)
@@ -2968,11 +2986,13 @@ func TestStringVariationDetails(t *testing.T) {
LeveledLogger: logger,
Offline: tt.args.offline,
},
- dataExporter: exporter.NewScheduler(context.Background(), 0, 0,
- &logsexporter.Exporter{
- LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
- "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
- }, &fflog.FFLogger{LeveledLogger: logger}),
+ dataExporterSchedulers: []*exporter.Scheduler{
+ exporter.NewScheduler(context.Background(), 0, 0,
+ &logsexporter.Exporter{
+ LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
+ "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
+ }, &fflog.FFLogger{LeveledLogger: logger}),
+ },
}
}
got, err := StringVariationDetails(tt.args.flagKey, tt.args.user, tt.args.defaultValue)
@@ -3275,11 +3295,13 @@ func TestIntVariation(t *testing.T) {
LeveledLogger: logger,
Offline: tt.args.offline,
},
- dataExporter: exporter.NewScheduler(context.Background(), 0, 0,
- &logsexporter.Exporter{
- LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
- "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
- }, &fflog.FFLogger{LeveledLogger: logger}),
+ dataExporterSchedulers: []*exporter.Scheduler{
+ exporter.NewScheduler(context.Background(), 0, 0,
+ &logsexporter.Exporter{
+ LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
+ "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
+ }, &fflog.FFLogger{LeveledLogger: logger}),
+ },
}
}
got, err := IntVariation(tt.args.flagKey, tt.args.user, tt.args.defaultValue)
@@ -3549,11 +3571,13 @@ func TestIntVariationDetails(t *testing.T) {
LeveledLogger: logger,
Offline: tt.args.offline,
},
- dataExporter: exporter.NewScheduler(context.Background(), 0, 0,
- &logsexporter.Exporter{
- LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
- "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
- }, &fflog.FFLogger{LeveledLogger: logger}),
+ dataExporterSchedulers: []*exporter.Scheduler{
+ exporter.NewScheduler(context.Background(), 0, 0,
+ &logsexporter.Exporter{
+ LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
+ "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
+ }, &fflog.FFLogger{LeveledLogger: logger}),
+ },
}
}
got, err := IntVariationDetails(tt.args.flagKey, tt.args.user, tt.args.defaultValue)
@@ -3917,11 +3941,13 @@ func TestRawVariation(t *testing.T) {
LeveledLogger: logger,
Offline: tt.args.offline,
},
- dataExporter: exporter.NewScheduler(context.Background(), 0, 0,
- &logsexporter.Exporter{
- LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
- "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
- }, &fflog.FFLogger{LeveledLogger: logger}),
+ dataExporterSchedulers: []*exporter.Scheduler{
+ exporter.NewScheduler(context.Background(), 0, 0,
+ &logsexporter.Exporter{
+ LogFormat: "user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", " +
+ "value=\"{{ .Value}}\", variation=\"{{ .Variation}}\"",
+ }, &fflog.FFLogger{LeveledLogger: logger}),
+ },
}
}
diff --git a/website/docs/go_module/configuration.md b/website/docs/go_module/configuration.md
index d5b42c34cd1..a020632cb48 100644
--- a/website/docs/go_module/configuration.md
+++ b/website/docs/go_module/configuration.md
@@ -18,6 +18,7 @@ During the initialization you must give a [`ffclient.Config{}`](https://pkg.go.d
| `Context` | *(optional)*
The context used by the retriever.
Default: **`context.Background()`** |
| `Environment` | *(optional)*
The environment the app is running under, can be checked in feature flag rules.
Default: `""`
*Check [**"environments"** section](../configure_flag/flag_format/#environments) to understand how to use this parameter.* |
| `DataExporter` | *(optional)*
DataExporter defines the method for exporting data on the usage of your flags.
*see [export data section](data_collection/index.md) for more details*. |
+| `DataExporters` | *(optional)*
DataExporters works like `DataExporter` but lets you configure more than one exporter for your variation events (a short sketch follows this table).
All exporters are flushed in parallel, independently of one another.
*see [export data section](data_collection/index.md) for more details*. |
| `FileFormat` | *(optional)*
Format of your configuration file. Available formats are `yaml`, `toml` and `json`, if you omit the field it will try to unmarshal the file as a `yaml` file.
Default: **`YAML`** |
| `LeveledLogger` | *(optional)*
LeveledLogger is used to log what `go-feature-flag` is doing.
It should be a `slog` instance.
If no logger is provided the module will not log anything.
Default: **No log** |
| `Notifiers` | *(optional)*
List of notifiers to call when your flag file has been changed.
*See [notifiers section](./notifier/index.md) for more details*. |
diff --git a/website/docs/relay_proxy/configure_relay_proxy.md b/website/docs/relay_proxy/configure_relay_proxy.md
index 3709886e01f..1adc38f4e21 100644
--- a/website/docs/relay_proxy/configure_relay_proxy.md
+++ b/website/docs/relay_proxy/configure_relay_proxy.md
@@ -47,6 +47,7 @@ ex: `AUTHORIZEDKEYS_EVALUATION=my-first-key,my-second-key`)_.
| `fileFormat` | string | `yaml` | This is the format of your `go-feature-flag` configuration file. Acceptable values are `yaml`, `json`, `toml`. |
| `startWithRetrieverError` | boolean | `false` | By default the **relay proxy** will crash if it is not able to retrieve the flags from the configuration.
If you don't want your relay proxy to crash, you can set `startWithRetrieverError` to true. Until the flag is retrievable the relay proxy will only answer with default values. |
| `exporter` | [exporter](#exporter) | **none** | Exporter is the configuration used to export data. |
+| `exporters` | [[]exporter](#exporter) | **none** | Exporters is the same as `exporter` but allows you to configure more than one exporter. |
| `notifier` | [notifier](#notifier) | **none** | Notifiers is the configuration on where to notify a flag change. |
| `authorizedKeys` | [authorizedKeys](#type-authorizedkeys) | **none** | List of authorized API keys. |
| `evaluationContextEnrichment` | object | **none** | It is a free field that will be merged with the evaluation context sent during the evaluation. It is useful to add common attributes to all the evaluations, such as a server version, environment, etc.
These fields will be included in the custom attributes of the evaluation context.
If in the evaluation context you have a field with the same name, it will be override by the `evaluationContextEnrichment`. |