
Commit

keep retrying infinitely
bacherfl committed Sep 12, 2023
1 parent ebdeb44 commit b2b0818
Showing 2 changed files with 66 additions and 65 deletions.
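Summary of the change: instead of driving reconnection through a recursive recoveryFunc callback that gives up after maxSyncRetries, the sync and resync paths now run in endless loops, and handleConnectionErr sleeps with an exponentially growing backoff that starts at defaultInitBackoffDuration, doubles after each failure, and is capped at maxBackoffDuration. Reaching maxSyncRetries only moves the provider into the ERROR state; it no longer stops the retries. The sketch below distills that pattern outside the provider; every name in it is illustrative rather than part of the provider's API.

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative defaults mirroring the constants added in provider.go.
const (
	initBackoff = 2 * time.Second
	maxBackoff  = 120 * time.Second
)

// nextBackoff doubles the current backoff but never lets it exceed the cap,
// the same rule handleConnectionErr now applies before sleeping.
func nextBackoff(current time.Duration) time.Duration {
	if doubled := current * 2; doubled < maxBackoff {
		return doubled
	}
	return maxBackoff
}

// retryForever sketches the new shape of startSyncSource: the operation is
// retried in an endless loop; only the state/event reporting, not the
// retrying itself, is bounded by a maximum attempt count.
func retryForever(op func() error) {
	backoff := initBackoff
	for {
		if err := op(); err != nil {
			time.Sleep(backoff)
			backoff = nextBackoff(backoff)
			continue
		}
		backoff = initBackoff // a success resets the backoff, as handleProviderReady does
	}
}

func main() {
	// Print the capped backoff sequence: 2s, 4s, 8s, ..., 120s, 120s, ...
	b := initBackoff
	for i := 0; i < 8; i++ {
		fmt.Println(b)
		b = nextBackoff(b)
	}
}
```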
75 changes: 38 additions & 37 deletions providers/flagd-in-process/pkg/provider.go
@@ -43,13 +43,15 @@ const (
stateStale
)

const defaultBackoffDuration = 5 * time.Second
const defaultInitBackoffDuration = 2 * time.Second
const defaultMaxBackoffDuration = 120 * time.Second

type connectionInfo struct {
state connectionState
retries int
maxSyncRetries int
backoffDuration time.Duration
state connectionState
retries int
maxSyncRetries int
backoffDuration time.Duration
maxBackoffDuration time.Duration
}

type connectionState int
@@ -85,10 +87,11 @@ func NewProvider(ctx context.Context, opts ...ProviderOption) *Provider {
providerConfiguration: &ProviderConfiguration{},
isReady: make(chan struct{}),
connectionInfo: connectionInfo{
state: stateNotReady,
retries: 0,
maxSyncRetries: defaultMaxSyncRetries,
backoffDuration: defaultBackoffDuration,
state: stateNotReady,
retries: 0,
maxSyncRetries: defaultMaxSyncRetries,
backoffDuration: defaultInitBackoffDuration,
maxBackoffDuration: defaultMaxBackoffDuration,
},
ofEventChannel: make(chan of.Event),
logger: logger.NewLogger(nil, false),
@@ -153,14 +156,12 @@ func NewProvider(ctx context.Context, opts ...ProviderOption) *Provider {
}

func (p *Provider) startSyncSource(dataSync chan ofsync.DataSync) {
if err := p.syncSource.Sync(p.ctx, dataSync); err != nil {
p.handleConnectionErr(
fmt.Errorf("error during source sync: %w", err),
func() {
p.startSyncSource(dataSync)
},
)
for {
if err := p.syncSource.Sync(p.ctx, dataSync); err != nil {
p.handleConnectionErr(fmt.Errorf("error during source sync: %w", err))
}
}

}

func (p *Provider) watchForUpdates(dataSync chan ofsync.DataSync) error {
@@ -190,23 +191,22 @@ func (p *Provider) watchForUpdates(dataSync chan ofsync.DataSync) error {
}

func (p *Provider) tryReSync(dataSync chan ofsync.DataSync) {
err := p.syncSource.ReSync(p.ctx, dataSync)
if err != nil {
p.handleConnectionErr(
fmt.Errorf("error resyncing source: %w", err),
func() {
p.tryReSync(dataSync)
},
)
} else {
p.handleProviderReady()
for {
err := p.syncSource.ReSync(p.ctx, dataSync)
if err != nil {
p.handleConnectionErr(fmt.Errorf("error resyncing source: %w", err))
} else {
p.handleProviderReady()
continue
}
}

}

func (p *Provider) handleConnectionErr(err error, recoveryFunc func()) {
func (p *Provider) handleConnectionErr(err error) {
p.mu.Lock()
p.logger.Error(fmt.Sprintf("Encountered unexpected sync error: %v", err))
if p.connectionInfo.retries >= p.connectionInfo.maxSyncRetries {
if p.connectionInfo.retries >= p.connectionInfo.maxSyncRetries && p.connectionInfo.state != stateError {
p.logger.Error("Number of maximum retry attempts has been exceeded. Going into ERROR state.")
p.connectionInfo.state = stateError
p.sendProviderEvent(of.Event{
@@ -215,8 +215,6 @@ func (p *Provider) handleConnectionErr(err error, recoveryFunc func()) {
Message: err.Error(),
},
})
p.mu.Unlock()
return
}
// go to STALE state, if we have been ready previously; otherwise
// we will stay in NOT_READY
@@ -231,18 +229,21 @@ func (p *Provider) handleConnectionErr(err error, recoveryFunc func()) {
})
}
p.connectionInfo.retries++
p.mu.Unlock()
if recoveryFunc != nil {
<-time.After(p.connectionInfo.backoffDuration)
recoveryFunc()
if newBackoffDuration := p.connectionInfo.backoffDuration * 2; newBackoffDuration < p.connectionInfo.maxBackoffDuration {
p.connectionInfo.backoffDuration = newBackoffDuration
} else {
p.connectionInfo.backoffDuration = p.connectionInfo.maxBackoffDuration
}
p.mu.Unlock()
<-time.After(p.connectionInfo.backoffDuration)
}

func (p *Provider) handleProviderReady() {
p.mu.Lock()
oldState := p.connectionInfo.state
p.connectionInfo.retries = 0
p.connectionInfo.state = stateReady
p.connectionInfo.backoffDuration = defaultInitBackoffDuration
p.mu.Unlock()
// notify event channel listeners that we are now ready
if oldState != stateReady {
@@ -290,11 +291,11 @@ func FromEnv() ProviderOption {
fmt.Sprintf(
"Invalid env config for %s provided, using default value: %s",
flagdSyncRetryIntervalEnvironmentVariableName,
defaultBackoffDuration.String(),
defaultMaxBackoffDuration.String(),
),
)
} else {
p.connectionInfo.backoffDuration = maxSyncRetryInterval
p.connectionInfo.maxBackoffDuration = maxSyncRetryInterval
}
}

@@ -346,7 +347,7 @@ func WithSyncStreamConnectionMaxAttempts(i int) ProviderOption {
// WithSyncStreamConnectionBackoff sets the backoff duration between reattempts of connecting to the sync source.
func WithSyncStreamConnectionBackoff(duration time.Duration) ProviderOption {
return func(p *Provider) {
p.connectionInfo.backoffDuration = duration
p.connectionInfo.maxBackoffDuration = duration
}
}

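For context, the retry-related options changed above can be wired up roughly as follows. The constructor and option names (NewProvider, FromEnv, WithSyncStreamConnectionMaxAttempts, WithSyncStreamConnectionBackoff) come from this diff; the import path and the concrete values are assumptions for the sketch. Note that WithSyncStreamConnectionBackoff now configures the maximum backoff rather than a fixed retry interval.

```go
package main

import (
	"context"
	"time"

	// Import path assumed for this sketch; adjust to the module's actual path.
	flagd "github.com/open-feature/go-sdk-contrib/providers/flagd-in-process/pkg"
)

func main() {
	// Retry-related options touched by this commit: the attempt count only
	// controls when the provider reports ERROR, while the duration is now
	// treated as the cap for the exponentially growing backoff between
	// reconnection attempts.
	provider := flagd.NewProvider(
		context.Background(),
		flagd.FromEnv(),
		flagd.WithSyncStreamConnectionMaxAttempts(5),
		flagd.WithSyncStreamConnectionBackoff(30*time.Second),
	)
	_ = provider // register with the OpenFeature SDK as usual
}
```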
56 changes: 28 additions & 28 deletions providers/flagd-in-process/pkg/provider_test.go
@@ -105,7 +105,7 @@ func TestProvider(t *testing.T) {
require.Equal(t, string(SourceTypeGrpc), prov.providerConfiguration.SourceConfig.Provider)
require.Equal(t, "my-selector", prov.providerConfiguration.SourceConfig.Selector)
require.Equal(t, 10, prov.connectionInfo.maxSyncRetries)
require.Equal(t, 1*time.Second, prov.connectionInfo.backoffDuration)
require.Equal(t, 1*time.Second, prov.connectionInfo.maxBackoffDuration)

// listen for the events emitted by the provider
receivedEvents := []of.EventType{}
@@ -224,7 +224,7 @@ func TestProviderOptions(t *testing.T) {
require.Equal(t, string(SourceTypeGrpc), prov.providerConfiguration.SourceConfig.Provider)
require.Equal(t, "my-selector", prov.providerConfiguration.SourceConfig.Selector)
require.Equal(t, 42, prov.connectionInfo.maxSyncRetries)
require.Equal(t, 3*time.Second, prov.connectionInfo.backoffDuration)
require.Equal(t, 3*time.Second, prov.connectionInfo.maxBackoffDuration)
require.Equal(t, myCtx, prov.ctx)
require.NotNil(t, prov.logger)
}
@@ -802,20 +802,15 @@ func TestObjectEvaluation(t *testing.T) {
}
}

func failingFunc(p *Provider) {
p.handleConnectionErr(errors.New(""), func() {
failingFunc(p)
})
}

func TestProvider_handleConnectionErrEndUpInErrorState(t *testing.T) {

p := &Provider{
connectionInfo: connectionInfo{
state: stateReady,
retries: 0,
maxSyncRetries: 2,
backoffDuration: 1 * time.Millisecond,
state: stateReady,
retries: 0,
maxSyncRetries: 1,
backoffDuration: 1 * time.Millisecond,
maxBackoffDuration: 1 * time.Millisecond,
},
ofEventChannel: make(chan of.Event),
logger: logger.NewLogger(nil, false),
@@ -839,15 +834,19 @@ func TestProvider_handleConnectionErrEndUpInErrorState(t *testing.T) {
}
}(ctx)

// call handleConnectionError with a function that keeps calling handleConnectionError again
// this is to verify that eventually we terminate after maxSyncRetries has been reached
p.handleConnectionErr(errors.New("oops"), func() {
failingFunc(p)
})
// call handleConnectionError to simulate a failure
p.handleConnectionErr(errors.New("oops"))

// verify that we first go into stale state
require.Equal(t, stateStale, p.connectionInfo.state)
require.Equal(t, 1, p.connectionInfo.retries)

// call handleConnectionError again to go beyond max retries
p.handleConnectionErr(errors.New("oops"))

// verify that we end up in the error state
require.Equal(t, stateError, p.connectionInfo.state)
require.Equal(t, p.connectionInfo.maxSyncRetries, p.connectionInfo.retries)
require.Equal(t, 1, p.connectionInfo.retries)

require.Eventually(t, func() bool {
mtx.RLock()
@@ -862,10 +861,11 @@ func TestProvider_handleConnectionErrEndUpInErrorState(t *testing.T) {
func TestProvider_handleConnectionErrRecoverFromStaleState(t *testing.T) {
p := &Provider{
connectionInfo: connectionInfo{
state: stateReady,
retries: 0,
maxSyncRetries: 2,
backoffDuration: 1 * time.Millisecond,
state: stateReady,
retries: 0,
maxSyncRetries: 2,
backoffDuration: 10 * time.Millisecond,
maxBackoffDuration: 10 * time.Millisecond,
},
ofEventChannel: make(chan of.Event),
isReady: make(chan struct{}),
@@ -894,13 +894,13 @@ func TestProvider_handleConnectionErrRecoverFromStaleState(t *testing.T) {

// call handleConnectionError once to bring the provider into the STALE state,
// then simulate a successful recovery to verify that we return to READY
p.handleConnectionErr(err, func() {
require.Equal(t, stateStale, p.connectionInfo.state)
require.Equal(t, 1, p.connectionInfo.retries)
p.handleConnectionErr(err)

require.Equal(t, stateStale, p.connectionInfo.state)
require.Equal(t, 1, p.connectionInfo.retries)

// simulate successful recovery
p.handleProviderReady()
})
// simulate successful recovery
p.handleProviderReady()

// verify that we are in ready state again
require.Equal(t, stateReady, p.connectionInfo.state)
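The two tests above pin the state machine that the infinite retry loop relies on. As a compact restatement of those expectations, here is a standalone sketch with illustrative types; the provider's own connectionState values are unexported constants in provider.go and are not used here.

```go
package main

import "fmt"

// Illustrative stand-ins for the provider's unexported connection states.
type connState string

const (
	notReady connState = "NOT_READY"
	ready    connState = "READY"
	stale    connState = "STALE"
	errored  connState = "ERROR"
)

// onSyncError restates handleConnectionErr's decision: once the retry count
// reaches the maximum, the provider reports ERROR (but keeps retrying);
// before that, a previously ready provider degrades to STALE, and a provider
// that was never ready stays NOT_READY.
func onSyncError(current connState, retries, maxRetries int) connState {
	if retries >= maxRetries {
		return errored
	}
	if current == ready || current == stale {
		return stale
	}
	return current
}

func main() {
	state, retries, maxRetries := ready, 0, 1
	for i := 0; i < 3; i++ {
		state = onSyncError(state, retries, maxRetries)
		retries++
		fmt.Printf("after failure %d: %s\n", i+1, state) // STALE, then ERROR once maxRetries is hit
	}
}
```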
