Commit
refactor: streaming/polling processors take DataDestination/StatusReporter interfaces (#191)

This change splits up the monolithic `DataSourceUpdateSink` into two:
1. `DataDestination`: where data goes
2. `StatusReporter`: where statuses go

This makes it easier to mock them for testing, and also represents a
cleaner separation of concerns. The new interfaces are slightly
different from the old `DataSourceUpdateSink`, and they differ between
FDV1 and FDV2.

For example, in the new FDV2 system we want to be able to pass a
payload version in `Init`, whereas the old FDV1 code has no such concept.
cwaldren-ld authored Sep 18, 2024
1 parent 48deeab commit a2fd391
Showing 11 changed files with 312 additions and 82 deletions.
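Before the per-file diffs, here is a rough sketch of the two new interfaces, inferred from the call sites below; the authoritative definitions live in the `subsystems` package and may differ in detail:

    package subsystems // hypothetical placement, mirroring the real package layout

    import (
        "github.com/launchdarkly/go-server-sdk/v7/interfaces"
        "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"
    )

    // DataDestination is where a data source writes the data it receives.
    type DataDestination interface {
        // Init overwrites the current contents with a full data set. The
        // payload version is an FDV2 concept and may be nil when unknown.
        Init(allData []ldstoretypes.Collection, payloadVersion *int) bool
        // Upsert adds or updates a single item of the given kind.
        Upsert(kind ldstoretypes.DataKind, key string, item ldstoretypes.ItemDescriptor) bool
    }

    // DataSourceStatusReporter is where a data source reports its state.
    type DataSourceStatusReporter interface {
        UpdateStatus(newState interfaces.DataSourceState, newError interfaces.DataSourceErrorInfo)
    }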
32 changes: 18 additions & 14 deletions internal/datasourcev2/polling_data_source.go
@@ -32,7 +32,8 @@ type Requester interface {
// configuration. All other code outside of this package should interact with it only via the
// DataSource interface.
type PollingProcessor struct {
- dataSourceUpdates subsystems.DataSourceUpdateSink
+ dataDestination subsystems.DataDestination
+ statusReporter subsystems.DataSourceStatusReporter
requester Requester
pollInterval time.Duration
loggers ldlog.Loggers
@@ -45,25 +46,28 @@ type PollingProcessor struct {
// NewPollingProcessor creates the internal implementation of the polling data source.
func NewPollingProcessor(
context subsystems.ClientContext,
- dataSourceUpdates subsystems.DataSourceUpdateSink,
+ dataDestination subsystems.DataDestination,
+ statusReporter subsystems.DataSourceStatusReporter,
cfg datasource.PollingConfig,
) *PollingProcessor {
httpRequester := newPollingRequester(context, context.GetHTTP().CreateHTTPClient(), cfg.BaseURI, cfg.FilterKey)
- return newPollingProcessor(context, dataSourceUpdates, httpRequester, cfg.PollInterval)
+ return newPollingProcessor(context, dataDestination, statusReporter, httpRequester, cfg.PollInterval)
}

func newPollingProcessor(
context subsystems.ClientContext,
- dataSourceUpdates subsystems.DataSourceUpdateSink,
+ dataDestination subsystems.DataDestination,
+ statusReporter subsystems.DataSourceStatusReporter,
requester Requester,
pollInterval time.Duration,
) *PollingProcessor {
pp := &PollingProcessor{
- dataSourceUpdates: dataSourceUpdates,
- requester: requester,
- pollInterval: pollInterval,
- loggers: context.GetLogging().Loggers,
- quit: make(chan struct{}),
+ dataDestination: dataDestination,
+ statusReporter: statusReporter,
+ requester: requester,
+ pollInterval: pollInterval,
+ loggers: context.GetLogging().Loggers,
+ quit: make(chan struct{}),
}
return pp
}
@@ -106,9 +110,9 @@ func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) {
pollingWillRetryMessage,
)
if recoverable {
- pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
+ pp.statusReporter.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
} else {
- pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, errorInfo)
+ pp.statusReporter.UpdateStatus(interfaces.DataSourceStateOff, errorInfo)
notifyReady()
return
}
@@ -122,11 +126,11 @@ func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) {
errorInfo.Kind = interfaces.DataSourceErrorKindInvalidData
}
checkIfErrorIsRecoverableAndLog(pp.loggers, err.Error(), pollingErrorContext, 0, pollingWillRetryMessage)
- pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
+ pp.statusReporter.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
}
continue
}
- pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
+ pp.statusReporter.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
pp.setInitializedOnce.Do(func() {
pp.isInitialized.Set(true)
pp.loggers.Info("First polling request successful")
@@ -146,7 +150,7 @@ func (pp *PollingProcessor) poll() error {

// We initialize the store only if the request wasn't cached
if !cached {
- pp.dataSourceUpdates.Init(allData)
+ pp.dataDestination.Init(allData, nil)
}
return nil
}
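Taken together, the polling changes are mechanical: every `dataSourceUpdates.UpdateStatus` call becomes `statusReporter.UpdateStatus`, and the single `Init` call gains a payload-version argument. A hypothetical caller (names outside this diff are illustrative) would now wire it up as:

    // dataDestination and statusReporter come from the SDK internals, or,
    // in tests, from mocks such as the one added at the end of this commit.
    pp := NewPollingProcessor(clientContext, dataDestination, statusReporter, pollingConfig)
    closeWhenReady := make(chan struct{})
    pp.Start(closeWhenReady)
    <-closeWhenReady // unblocks after the first successful poll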
73 changes: 23 additions & 50 deletions internal/datasourcev2/streaming_data_source.go
@@ -74,14 +74,14 @@ const (
// DataSource interface.
type StreamProcessor struct {
cfg datasource.StreamConfig
- dataSourceUpdates subsystems.DataSourceUpdateSink
+ dataDestination subsystems.DataDestination
+ statusReporter subsystems.DataSourceStatusReporter
client *http.Client
headers http.Header
diagnosticsManager *ldevents.DiagnosticsManager
loggers ldlog.Loggers
isInitialized internal.AtomicBoolean
halt chan struct{}
- storeStatusCh <-chan interfaces.DataStoreStatus
connectionAttemptStartTime ldtime.UnixMillisecondTime
connectionAttemptLock sync.Mutex
readyOnce sync.Once
@@ -91,15 +91,17 @@ type StreamProcessor struct {
// NewStreamProcessor creates the internal implementation of the streaming data source.
func NewStreamProcessor(
context subsystems.ClientContext,
- dataSourceUpdates subsystems.DataSourceUpdateSink,
+ dataDestination subsystems.DataDestination,
+ statusReporter subsystems.DataSourceStatusReporter,
cfg datasource.StreamConfig,
) *StreamProcessor {
sp := &StreamProcessor{
- dataSourceUpdates: dataSourceUpdates,
- headers: context.GetHTTP().DefaultHeaders,
- loggers: context.GetLogging().Loggers,
- halt: make(chan struct{}),
- cfg: cfg,
+ dataDestination: dataDestination,
+ statusReporter: statusReporter,
+ headers: context.GetHTTP().DefaultHeaders,
+ loggers: context.GetLogging().Loggers,
+ halt: make(chan struct{}),
+ cfg: cfg,
}
if cci, ok := context.(*internal.ClientContextImpl); ok {
sp.diagnosticsManager = cci.DiagnosticsManager
@@ -123,9 +125,6 @@ func (sp *StreamProcessor) IsInitialized() bool {
//nolint:revive // no doc comment for standard method
func (sp *StreamProcessor) Start(closeWhenReady chan<- struct{}) {
sp.loggers.Info("Starting LaunchDarkly streaming connection")
- if sp.dataSourceUpdates.GetDataStoreStatusProvider().IsStatusMonitoringEnabled() {
- sp.storeStatusCh = sp.dataSourceUpdates.GetDataStoreStatusProvider().AddStatusListener()
- }
go sp.subscribe(closeWhenReady)
}

@@ -183,21 +182,16 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan<
Message: err.Error(),
Time: time.Now(),
}
- sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
+ sp.statusReporter.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)

shouldRestart = true // scenario 1 in error handling comments at top of file
processedEvent = false
}

storeUpdateFailed := func(updateDesc string) {
- if sp.storeStatusCh != nil {
- sp.loggers.Errorf("Failed to store %s in data store; will try again once data store is working", updateDesc)
- // scenario 2a in error handling comments at top of file
- } else {
- sp.loggers.Errorf("Failed to store %s in data store; will restart stream until successful", updateDesc)
- shouldRestart = true // scenario 2b
- processedEvent = false
- }
+ sp.loggers.Errorf("Failed to store %s in data store; will restart stream until successful", updateDesc)
+ shouldRestart = true // scenario 2b
+ processedEvent = false
}

switch event.Event() {
@@ -266,18 +260,18 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan<
for _, update := range updates {
switch u := update.(type) {
case datasource.PatchData:
- if !sp.dataSourceUpdates.Upsert(u.Kind, u.Key, u.Data) {
+ if !sp.dataDestination.Upsert(u.Kind, u.Key, u.Data) {
storeUpdateFailed("streaming update of " + u.Key)
}
case datasource.PutData:
- if sp.dataSourceUpdates.Init(u.Data) {
+ if sp.dataDestination.Init(u.Data, nil) {
sp.setInitializedAndNotifyClient(true, closeWhenReady)
} else {
storeUpdateFailed("initial streaming data")
}
case datasource.DeleteData:
deletedItem := ldstoretypes.ItemDescriptor{Version: u.Version, Item: nil}
- if !sp.dataSourceUpdates.Upsert(u.Kind, u.Key, deletedItem) {
+ if !sp.dataDestination.Upsert(u.Kind, u.Key, deletedItem) {
storeUpdateFailed("streaming deletion of " + u.Key)
}

@@ -291,30 +285,12 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan<
}

if processedEvent {
- sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
+ sp.statusReporter.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
}
if shouldRestart {
stream.Restart()
}

- case newStoreStatus := <-sp.storeStatusCh:
- if sp.loggers.IsDebugEnabled() {
- sp.loggers.Debugf("StreamProcessorV2 received store status update: %+v", newStoreStatus)
- }
- if newStoreStatus.Available {
- // The store has just transitioned from unavailable to available (scenario 2a above)
- if newStoreStatus.NeedsRefresh {
- // The store is telling us that it can't guarantee that all of the latest data was cached.
- // So we'll restart the stream to ensure a full refresh.
- sp.loggers.Warn("Restarting stream to refresh data after data store outage")
- stream.Restart()
- }
- // All of the updates were cached and have been written to the store, so we don't need to
- // restart the stream. We just need to make sure the client knows we're initialized now
- // (in case the initial "put" was not stored).
- sp.setInitializedAndNotifyClient(true, closeWhenReady)
- }

case <-sp.halt:
stream.Close()
return
@@ -329,7 +305,7 @@ func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}) {
"Unable to create a stream request; this is not a network problem, most likely a bad base URI: %s",
reqErr,
)
- sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, interfaces.DataSourceErrorInfo{
+ sp.statusReporter.UpdateStatus(interfaces.DataSourceStateOff, interfaces.DataSourceErrorInfo{
Kind: interfaces.DataSourceErrorKindUnknown,
Message: reqErr.Error(),
Time: time.Now(),
@@ -373,10 +349,10 @@ func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}) {
)
if recoverable {
sp.logConnectionStarted()
- sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
+ sp.statusReporter.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
return es.StreamErrorHandlerResult{CloseNow: false}
}
- sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, errorInfo)
+ sp.statusReporter.UpdateStatus(interfaces.DataSourceStateOff, errorInfo)
return es.StreamErrorHandlerResult{CloseNow: true}
}

@@ -392,7 +368,7 @@ func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}) {
Message: err.Error(),
Time: time.Now(),
}
- sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
+ sp.statusReporter.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
sp.logConnectionStarted()
return es.StreamErrorHandlerResult{CloseNow: false}
}
@@ -453,10 +429,7 @@ func (sp *StreamProcessor) logConnectionResult(success bool) {
func (sp *StreamProcessor) Close() error {
sp.closeOnce.Do(func() {
close(sp.halt)
- if sp.storeStatusCh != nil {
- sp.dataSourceUpdates.GetDataStoreStatusProvider().RemoveStatusListener(sp.storeStatusCh)
- }
- sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, interfaces.DataSourceErrorInfo{})
+ sp.statusReporter.UpdateStatus(interfaces.DataSourceStateOff, interfaces.DataSourceErrorInfo{})
})
return nil
}
104 changes: 104 additions & 0 deletions internal/sharedtest/mocks/mock_data_destination.go
@@ -0,0 +1,104 @@
package mocks

import (
"sync"
"testing"
"time"

"github.com/launchdarkly/go-server-sdk/v7/interfaces"
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"

th "github.com/launchdarkly/go-test-helpers/v3"

"github.com/stretchr/testify/assert"
)

// MockDataDestination is a mock implementation of a data destination used by tests involving FDv2 data sources.
type MockDataDestination struct {
DataStore *CapturingDataStore
Statuses chan interfaces.DataSourceStatus
dataStoreStatusProvider *mockDataStoreStatusProvider
lastStatus interfaces.DataSourceStatus
lock sync.Mutex
}

// NewMockDataDestination creates an instance of MockDataDestination.
//
// The DataStoreStatusProvider can be nil if we are not doing a test that requires manipulation of that
// component.
func NewMockDataDestination(realStore subsystems.DataStore) *MockDataDestination {
dataStore := NewCapturingDataStore(realStore)
dataStoreStatusProvider := &mockDataStoreStatusProvider{
dataStore: dataStore,
status: interfaces.DataStoreStatus{Available: true},
statusCh: make(chan interfaces.DataStoreStatus, 10),
}
return &MockDataDestination{
DataStore: dataStore,
Statuses: make(chan interfaces.DataSourceStatus, 10),
dataStoreStatusProvider: dataStoreStatusProvider,
}
}

// Init, in this test implementation, delegates to d.DataStore.CapturedUpdates.
func (d *MockDataDestination) Init(allData []ldstoretypes.Collection, _ *int) bool {
// For now, the payloadVersion is ignored. When the data sources start making use of it, it should be
// stored so that assertions can be made.
for _, coll := range allData {
AssertNotNil(coll.Kind)
}
err := d.DataStore.Init(allData)
return err == nil
}

// Upsert, in this test implementation, delegates to d.DataStore.CapturedUpdates.
func (d *MockDataDestination) Upsert(
kind ldstoretypes.DataKind,
key string,
newItem ldstoretypes.ItemDescriptor,
) bool {
AssertNotNil(kind)
_, err := d.DataStore.Upsert(kind, key, newItem)
return err == nil
}

// UpdateStatus, in this test implementation, pushes a value onto the Statuses channel.
func (d *MockDataDestination) UpdateStatus(
newState interfaces.DataSourceState,
newError interfaces.DataSourceErrorInfo,
) {
d.lock.Lock()
defer d.lock.Unlock()
if newState != d.lastStatus.State || newError.Kind != "" {
d.lastStatus = interfaces.DataSourceStatus{State: newState, LastError: newError}
d.Statuses <- d.lastStatus
}
}

// GetDataStoreStatusProvider returns a stub implementation that does not have full functionality
// but enough to test a data source with.
func (d *MockDataDestination) GetDataStoreStatusProvider() interfaces.DataStoreStatusProvider {
return d.dataStoreStatusProvider
}

// UpdateStoreStatus simulates a change in the data store status.
func (d *MockDataDestination) UpdateStoreStatus(newStatus interfaces.DataStoreStatus) {
d.dataStoreStatusProvider.statusCh <- newStatus
}

// RequireStatusOf blocks until a new data source status is available, and verifies its state.
func (d *MockDataDestination) RequireStatusOf(
t *testing.T,
newState interfaces.DataSourceState,
) interfaces.DataSourceStatus {
status := d.RequireStatus(t)
assert.Equal(t, string(newState), string(status.State))
// string conversion is due to a bug in assert with type aliases
return status
}

// RequireStatus blocks until a new data source status is available.
func (d *MockDataDestination) RequireStatus(t *testing.T) interfaces.DataSourceStatus {
return th.RequireValue(t, d.Statuses, time.Second, "timed out waiting for new data source status")
}
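
A minimal sketch of how a test might use this mock; `realStore` stands in for whatever concrete `subsystems.DataStore` the test already constructs, and the test name is illustrative:

    func TestDataSourcePushesStatus(t *testing.T) {
        dest := mocks.NewMockDataDestination(realStore)
        // A data source under test would receive dest as its DataDestination.
        if ok := dest.Init([]ldstoretypes.Collection{}, nil); !ok {
            t.Fatal("expected Init to succeed")
        }
        // Simulate the data source reporting a healthy state, then assert
        // that the status was pushed onto the Statuses channel.
        dest.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
        dest.RequireStatusOf(t, interfaces.DataSourceStateValid)
    }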
