-
Notifications
You must be signed in to change notification settings - Fork 18
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore: add streaming/polling v2 builders (#190)
Adds `StreamingDataSourceV2` and `PollingDataSourceV2` configuration builders. They are wrappers around the new `datasourcev2` internal stream/polling processors. These are not stable, not subject to backwards compat/semantic versioning, and likely to change. The docs are WIP.
- Loading branch information
1 parent
de100b0
commit 48deeab
Showing
4 changed files
with
405 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
package ldcomponents | ||
|
||
import ( | ||
"errors" | ||
"time" | ||
|
||
"github.com/launchdarkly/go-sdk-common/v3/ldvalue" | ||
"github.com/launchdarkly/go-server-sdk/v7/internal/datasource" | ||
"github.com/launchdarkly/go-server-sdk/v7/internal/datasourcev2" | ||
"github.com/launchdarkly/go-server-sdk/v7/internal/endpoints" | ||
"github.com/launchdarkly/go-server-sdk/v7/subsystems" | ||
) | ||
|
||
// PollingDataSourceBuilderV2 provides methods for configuring the polling data source. | ||
// | ||
// This builder is not stable, and not subject to any backwards | ||
// compatibility guarantees or semantic versioning. It is not suitable for production usage. | ||
// | ||
// Do not use it. | ||
// You have been warned. | ||
type PollingDataSourceBuilderV2 struct { | ||
pollInterval time.Duration | ||
filterKey ldvalue.OptionalString | ||
} | ||
|
||
// PollingDataSourceV2 returns a configurable factory for using polling mode to get feature flag data. | ||
// | ||
// This builder is not stable, and not subject to any backwards | ||
// compatibility guarantees or semantic versioning. It is not suitable for production usage. | ||
// | ||
// Do not use it. | ||
// You have been warned. | ||
// | ||
// Polling is not the default behavior; by default, the SDK uses a streaming connection to receive feature flag | ||
// data from LaunchDarkly. In polling mode, the SDK instead makes a new HTTP request to LaunchDarkly at regular | ||
// intervals. HTTP caching allows it to avoid redundantly downloading data if there have been no changes, but | ||
// polling is still less efficient than streaming and should only be used on the advice of LaunchDarkly support. | ||
func PollingDataSourceV2() *PollingDataSourceBuilderV2 { | ||
return &PollingDataSourceBuilderV2{ | ||
pollInterval: DefaultPollInterval, | ||
} | ||
} | ||
|
||
// PollInterval sets the interval at which the SDK will poll for feature flag updates. | ||
// | ||
// The default and minimum value is [DefaultPollInterval]. Values less than this will be set to the default. | ||
func (b *PollingDataSourceBuilderV2) PollInterval(pollInterval time.Duration) *PollingDataSourceBuilderV2 { | ||
if pollInterval < DefaultPollInterval { | ||
b.pollInterval = DefaultPollInterval | ||
} else { | ||
b.pollInterval = pollInterval | ||
} | ||
return b | ||
} | ||
|
||
// Used in tests to skip parameter validation. | ||
// | ||
//nolint:unused // it is used in tests | ||
func (b *PollingDataSourceBuilderV2) forcePollInterval( | ||
pollInterval time.Duration, | ||
) *PollingDataSourceBuilderV2 { | ||
b.pollInterval = pollInterval | ||
return b | ||
} | ||
|
||
// PayloadFilter sets the filter key for the polling connection. | ||
// | ||
// By default, the SDK is able to evaluate all flags in an environment. If this is undesirable - | ||
// for example, the environment contains thousands of flags, but this application only needs to evaluate | ||
// a smaller, known subset - then a filter may be setup in LaunchDarkly, and the filter's key specified here. | ||
// | ||
// Evaluations for flags that aren't part of the filtered environment will return default values. | ||
func (b *PollingDataSourceBuilderV2) PayloadFilter(filterKey string) *PollingDataSourceBuilderV2 { | ||
b.filterKey = ldvalue.NewOptionalString(filterKey) | ||
return b | ||
} | ||
|
||
// Build is called internally by the SDK. | ||
func (b *PollingDataSourceBuilderV2) Build(context subsystems.ClientContext) (subsystems.DataSource, error) { | ||
context.GetLogging().Loggers.Warn( | ||
"You should only disable the streaming API if instructed to do so by LaunchDarkly support") | ||
filterKey, wasSet := b.filterKey.Get() | ||
if wasSet && filterKey == "" { | ||
return nil, errors.New("payload filter key cannot be an empty string") | ||
} | ||
configuredBaseURI := endpoints.SelectBaseURI( | ||
context.GetServiceEndpoints(), | ||
endpoints.PollingService, | ||
context.GetLogging().Loggers, | ||
) | ||
cfg := datasource.PollingConfig{ | ||
BaseURI: configuredBaseURI, | ||
PollInterval: b.pollInterval, | ||
FilterKey: filterKey, | ||
} | ||
return datasourcev2.NewPollingProcessor(context, context.GetDataSourceUpdateSink(), cfg), nil | ||
} | ||
|
||
// DescribeConfiguration is used internally by the SDK to inspect the configuration. | ||
func (b *PollingDataSourceBuilderV2) DescribeConfiguration(context subsystems.ClientContext) ldvalue.Value { | ||
return ldvalue.ObjectBuild(). | ||
SetBool("streamingDisabled", true). | ||
SetBool("customBaseURI", | ||
endpoints.IsCustom(context.GetServiceEndpoints(), endpoints.PollingService)). | ||
Set("pollingIntervalMillis", durationToMillisValue(b.pollInterval)). | ||
SetBool("usingRelayDaemon", false). | ||
Build() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
package ldcomponents | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/launchdarkly/go-server-sdk/v7/internal/datasourcev2" | ||
|
||
"github.com/launchdarkly/go-server-sdk/v7/internal/sharedtest/mocks" | ||
|
||
"github.com/launchdarkly/go-server-sdk/v7/internal/datastore" | ||
"github.com/launchdarkly/go-server-sdk/v7/internal/sharedtest" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestPollingDataSourceV2Builder(t *testing.T) { | ||
t.Run("PollInterval", func(t *testing.T) { | ||
p := PollingDataSourceV2() | ||
assert.Equal(t, DefaultPollInterval, p.pollInterval) | ||
|
||
p.PollInterval(time.Hour) | ||
assert.Equal(t, time.Hour, p.pollInterval) | ||
|
||
p.PollInterval(time.Second) | ||
assert.Equal(t, DefaultPollInterval, p.pollInterval) | ||
|
||
p.forcePollInterval(time.Second) | ||
assert.Equal(t, time.Second, p.pollInterval) | ||
}) | ||
|
||
t.Run("PayloadFilter", func(t *testing.T) { | ||
t.Run("build succeeds with no payload filter", func(t *testing.T) { | ||
s := PollingDataSourceV2() | ||
clientContext := makeTestContextWithBaseURIs("base") | ||
_, err := s.Build(clientContext) | ||
assert.NoError(t, err) | ||
}) | ||
|
||
t.Run("build succeeds with non-empty payload filter", func(t *testing.T) { | ||
s := PollingDataSourceV2() | ||
clientContext := makeTestContextWithBaseURIs("base") | ||
s.PayloadFilter("microservice-1") | ||
_, err := s.Build(clientContext) | ||
assert.NoError(t, err) | ||
}) | ||
|
||
t.Run("build fails with empty payload filter", func(t *testing.T) { | ||
s := PollingDataSourceV2() | ||
clientContext := makeTestContextWithBaseURIs("base") | ||
s.PayloadFilter("") | ||
_, err := s.Build(clientContext) | ||
assert.Error(t, err) | ||
}) | ||
}) | ||
t.Run("CreateDefaultDataSource", func(t *testing.T) { | ||
baseURI := "base" | ||
|
||
p := PollingDataSourceV2() | ||
|
||
dsu := mocks.NewMockDataSourceUpdates(datastore.NewInMemoryDataStore(sharedtest.NewTestLoggers())) | ||
clientContext := makeTestContextWithBaseURIs(baseURI) | ||
clientContext.BasicClientContext.DataSourceUpdateSink = dsu | ||
ds, err := p.Build(clientContext) | ||
require.NoError(t, err) | ||
require.NotNil(t, ds) | ||
defer ds.Close() | ||
|
||
pp := ds.(*datasourcev2.PollingProcessor) | ||
assert.Equal(t, baseURI, pp.GetBaseURI()) | ||
assert.Equal(t, DefaultPollInterval, pp.GetPollInterval()) | ||
}) | ||
|
||
t.Run("CreateCustomizedDataSource", func(t *testing.T) { | ||
baseURI := "base" | ||
interval := time.Hour | ||
filter := "microservice-1" | ||
|
||
p := PollingDataSourceV2().PollInterval(interval).PayloadFilter(filter) | ||
|
||
dsu := mocks.NewMockDataSourceUpdates(datastore.NewInMemoryDataStore(sharedtest.NewTestLoggers())) | ||
clientContext := makeTestContextWithBaseURIs(baseURI) | ||
clientContext.BasicClientContext.DataSourceUpdateSink = dsu | ||
ds, err := p.Build(clientContext) | ||
require.NoError(t, err) | ||
require.NotNil(t, ds) | ||
defer ds.Close() | ||
|
||
pp := ds.(*datasourcev2.PollingProcessor) | ||
assert.Equal(t, baseURI, pp.GetBaseURI()) | ||
assert.Equal(t, interval, pp.GetPollInterval()) | ||
assert.Equal(t, filter, pp.GetFilterKey()) | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
package ldcomponents | ||
|
||
import ( | ||
"errors" | ||
"time" | ||
|
||
"github.com/launchdarkly/go-sdk-common/v3/ldvalue" | ||
"github.com/launchdarkly/go-server-sdk/v7/internal/datasource" | ||
"github.com/launchdarkly/go-server-sdk/v7/internal/datasourcev2" | ||
"github.com/launchdarkly/go-server-sdk/v7/internal/endpoints" | ||
"github.com/launchdarkly/go-server-sdk/v7/subsystems" | ||
) | ||
|
||
// StreamingDataSourceBuilderV2 provides methods for configuring the streaming data source in v2 mode. | ||
// | ||
// This builder is not stable, and not subject to any backwards | ||
// compatibility guarantees or semantic versioning. It is not suitable for production usage. | ||
// | ||
// Do not use it. | ||
// You have been warned. | ||
type StreamingDataSourceBuilderV2 struct { | ||
initialReconnectDelay time.Duration | ||
filterKey ldvalue.OptionalString | ||
} | ||
|
||
// StreamingDataSourceV2 returns a configurable factory for using streaming mode to get feature flag data. | ||
// | ||
// This builder is not stable, and not subject to any backwards | ||
// compatibility guarantees or semantic versioning. It is not suitable for production usage. | ||
// | ||
// Do not use it. | ||
// You have been warned. | ||
// | ||
// By default, the SDK uses a streaming connection to receive feature flag data from LaunchDarkly. To use the | ||
// default behavior, you do not need to call this method. | ||
func StreamingDataSourceV2() *StreamingDataSourceBuilderV2 { | ||
return &StreamingDataSourceBuilderV2{ | ||
initialReconnectDelay: DefaultInitialReconnectDelay, | ||
} | ||
} | ||
|
||
// InitialReconnectDelay sets the initial reconnect delay for the streaming connection. | ||
// | ||
// The streaming service uses a backoff algorithm (with jitter) every time the connection needs to be | ||
// reestablished. The delay for the first reconnection will start near this value, and then increase | ||
// exponentially for any subsequent connection failures. | ||
// | ||
// The default value is [DefaultInitialReconnectDelay]. | ||
func (b *StreamingDataSourceBuilderV2) InitialReconnectDelay( | ||
initialReconnectDelay time.Duration, | ||
) *StreamingDataSourceBuilderV2 { | ||
if initialReconnectDelay <= 0 { | ||
b.initialReconnectDelay = DefaultInitialReconnectDelay | ||
} else { | ||
b.initialReconnectDelay = initialReconnectDelay | ||
} | ||
return b | ||
} | ||
|
||
// PayloadFilter sets the payload filter key for this streaming connection. The filter key | ||
// cannot be an empty string. | ||
// | ||
// By default, the SDK is able to evaluate all flags in an environment. If this is undesirable - | ||
// for example, the environment contains thousands of flags, but this application only needs to evaluate | ||
// a smaller, known subset - then a payload filter may be setup in LaunchDarkly, and the filter's key specified here. | ||
// | ||
// Evaluations for flags that aren't part of the filtered environment will return default values. | ||
func (b *StreamingDataSourceBuilderV2) PayloadFilter(filterKey string) *StreamingDataSourceBuilderV2 { | ||
b.filterKey = ldvalue.NewOptionalString(filterKey) | ||
return b | ||
} | ||
|
||
// Build is called internally by the SDK. | ||
func (b *StreamingDataSourceBuilderV2) Build(context subsystems.ClientContext) (subsystems.DataSource, error) { | ||
filterKey, wasSet := b.filterKey.Get() | ||
if wasSet && filterKey == "" { | ||
return nil, errors.New("payload filter key cannot be an empty string") | ||
} | ||
configuredBaseURI := endpoints.SelectBaseURI( | ||
context.GetServiceEndpoints(), | ||
endpoints.StreamingService, | ||
context.GetLogging().Loggers, | ||
) | ||
cfg := datasource.StreamConfig{ | ||
URI: configuredBaseURI, | ||
InitialReconnectDelay: b.initialReconnectDelay, | ||
FilterKey: filterKey, | ||
} | ||
return datasourcev2.NewStreamProcessor( | ||
context, | ||
context.GetDataSourceUpdateSink(), | ||
cfg, | ||
), nil | ||
} | ||
|
||
// DescribeConfiguration is used internally by the SDK to inspect the configuration. | ||
func (b *StreamingDataSourceBuilderV2) DescribeConfiguration(context subsystems.ClientContext) ldvalue.Value { | ||
return ldvalue.ObjectBuild(). | ||
SetBool("streamingDisabled", false). | ||
SetBool("customStreamURI", | ||
endpoints.IsCustom(context.GetServiceEndpoints(), endpoints.StreamingService)). | ||
Set("reconnectTimeMillis", durationToMillisValue(b.initialReconnectDelay)). | ||
SetBool("usingRelayDaemon", false). | ||
Build() | ||
} |
Oops, something went wrong.