Skip to content

Commit

Permalink
feat: blocking provider mutator (#251)
Browse files Browse the repository at this point in the history
* introduce wait apis and improve existing tests

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* tests for new apis

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* improve race conditions

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* return error for synchronous provider registrations

Signed-off-by: Kavindu Dodanduwa <[email protected]>

---------

Signed-off-by: Kavindu Dodanduwa <[email protected]>
  • Loading branch information
Kavindu-Dodan authored Feb 7, 2024
1 parent eaefcc8 commit 6f71fe4
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 60 deletions.
112 changes: 68 additions & 44 deletions openfeature/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package openfeature

import (
"errors"
"fmt"
"sync"

"github.com/go-logr/logr"
Expand All @@ -14,7 +15,7 @@ type evaluationAPI struct {
defaultProvider FeatureProvider
namedProviders map[string]FeatureProvider
hks []Hook
evalCtx EvaluationContext
apiCtx EvaluationContext
logger logr.Logger
mu sync.RWMutex
eventExecutor *eventExecutor
Expand All @@ -28,29 +29,32 @@ func newEvaluationAPI() evaluationAPI {
defaultProvider: NoopProvider{},
namedProviders: map[string]FeatureProvider{},
hks: []Hook{},
evalCtx: EvaluationContext{},
apiCtx: EvaluationContext{},
logger: logger,
mu: sync.RWMutex{},
eventExecutor: newEventExecutor(logger),
}
}

// setProvider sets the default FeatureProvider of the evaluationAPI. Returns an error if FeatureProvider is nil
func (api *evaluationAPI) setProvider(provider FeatureProvider) error {
// setProvider sets the default FeatureProvider of the evaluationAPI.
// Returns an error if provider registration cause an error
func (api *evaluationAPI) setProvider(provider FeatureProvider, async bool) error {
api.mu.Lock()
defer api.mu.Unlock()

if provider == nil {
return errors.New("default provider cannot be set to nil")
}

// Initialize new default provider and shutdown the old one
// Provider update must be non-blocking, hence initialization & shutdown happens concurrently
oldProvider := api.defaultProvider
api.defaultProvider = provider

api.initNewAndShutdownOld(provider, oldProvider)
err := api.eventExecutor.registerDefaultProvider(provider)
err := api.initNewAndShutdownOld(provider, oldProvider, async)
if err != nil {
return err
}

err = api.eventExecutor.registerDefaultProvider(provider)
if err != nil {
return err
}
Expand All @@ -67,7 +71,7 @@ func (api *evaluationAPI) getProvider() FeatureProvider {
}

// setProvider sets a provider with client name. Returns an error if FeatureProvider is nil
func (api *evaluationAPI) setNamedProvider(clientName string, provider FeatureProvider) error {
func (api *evaluationAPI) setNamedProvider(clientName string, provider FeatureProvider, async bool) error {
api.mu.Lock()
defer api.mu.Unlock()

Expand All @@ -80,8 +84,12 @@ func (api *evaluationAPI) setNamedProvider(clientName string, provider FeaturePr
oldProvider := api.namedProviders[clientName]
api.namedProviders[clientName] = provider

api.initNewAndShutdownOld(provider, oldProvider)
err := api.eventExecutor.registerNamedEventingProvider(clientName, provider)
err := api.initNewAndShutdownOld(provider, oldProvider, async)
if err != nil {
return err
}

err = api.eventExecutor.registerNamedEventingProvider(clientName, provider)
if err != nil {
return err
}
Expand All @@ -97,11 +105,11 @@ func (api *evaluationAPI) getNamedProviders() map[string]FeatureProvider {
return api.namedProviders
}

func (api *evaluationAPI) setEvaluationContext(evalCtx EvaluationContext) {
func (api *evaluationAPI) setEvaluationContext(apiCtx EvaluationContext) {
api.mu.Lock()
defer api.mu.Unlock()

api.evalCtx = evalCtx
api.apiCtx = apiCtx
}

func (api *evaluationAPI) setLogger(l logr.Logger) {
Expand Down Expand Up @@ -163,52 +171,68 @@ func (api *evaluationAPI) forTransaction(clientName string) (FeatureProvider, []
provider = api.defaultProvider
}

return provider, api.hks, api.evalCtx
return provider, api.hks, api.apiCtx
}

// initNewAndShutdownOld is a helper to initialise new FeatureProvider and shutdown the old FeatureProvider.
// Operations happen concurrently.
func (api *evaluationAPI) initNewAndShutdownOld(newProvider FeatureProvider, oldProvider FeatureProvider) {
v, ok := newProvider.(StateHandler)
if ok && v.Status() == NotReadyState {
go func(provider FeatureProvider, stateHandler StateHandler, evalCtx EvaluationContext, eventChan chan eventPayload) {
err := stateHandler.Init(evalCtx)
// emit ready/error event once initialization is complete
if err != nil {
eventChan <- eventPayload{
Event{
ProviderName: provider.Metadata().Name,
EventType: ProviderError,
ProviderEventDetails: ProviderEventDetails{},
}, provider,
}
} else {
eventChan <- eventPayload{
Event{
ProviderName: provider.Metadata().Name,
EventType: ProviderReady,
ProviderEventDetails: ProviderEventDetails{},
}, provider,
}
}
}(newProvider, v, api.evalCtx, api.eventExecutor.eventChan)
}

v, ok = oldProvider.(StateHandler)
func (api *evaluationAPI) initNewAndShutdownOld(newProvider FeatureProvider, oldProvider FeatureProvider, async bool) error {
if async {
go func(executor *eventExecutor, ctx EvaluationContext) {
// for async initialization, error is conveyed as an event
event, _ := initializer(newProvider, ctx)
executor.triggerEvent(event, newProvider)
}(api.eventExecutor, api.apiCtx)
} else {
event, err := initializer(newProvider, api.apiCtx)
api.eventExecutor.triggerEvent(event, newProvider)
if err != nil {
return err
}
}

v, ok := oldProvider.(StateHandler)

// oldProvider can be nil or without state handling capability
if oldProvider == nil || !ok {
return
return nil
}

// check for multiple bindings
if oldProvider == api.defaultProvider || contains(oldProvider, maps.Values(api.namedProviders)) {
return
return nil
}

go func(forShutdown StateHandler) {
forShutdown.Shutdown()
}(v)

return nil
}

// initializer is a helper to execute provider initialization and generate appropriate event for the initialization
// It also returns an error if the initialization resulted in an error
func initializer(provider FeatureProvider, apiCtx EvaluationContext) (Event, error) {
var event = Event{
ProviderName: provider.Metadata().Name,
EventType: ProviderReady,
ProviderEventDetails: ProviderEventDetails{
Message: "Provider initialization successful",
},
}

handler, ok := provider.(StateHandler)
if !ok {
// Note - a provider without state handling capability can be assumed to be ready immediately.
return event, nil
}

err := handler.Init(apiCtx)
if err != nil {
event.EventType = ProviderError
event.Message = fmt.Sprintf("Provider initialization error, %v", err)
}

return event, err
}

func contains(provider FeatureProvider, in []FeatureProvider) bool {
Expand Down
2 changes: 1 addition & 1 deletion openfeature/event_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (e *eventExecutor) triggerEvent(event Event, handler FeatureProvider) {
}
}

if e.defaultProviderReference.featureProvider != handler {
if !reflect.DeepEqual(e.defaultProviderReference.featureProvider, handler) {
return
}

Expand Down
29 changes: 16 additions & 13 deletions openfeature/event_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func TestEventHandler_Eventing(t *testing.T) {
rsp <- details
}

AddHandler(ProviderReady, &callBack)
eventType := ProviderConfigChange
AddHandler(eventType, &callBack)

fCh := []string{"flagA"}
meta := map[string]interface{}{
Expand All @@ -91,7 +92,7 @@ func TestEventHandler_Eventing(t *testing.T) {

// trigger event from provider implementation
eventingImpl.Invoke(Event{
EventType: ProviderReady,
EventType: eventType,
ProviderEventDetails: ProviderEventDetails{
Message: "ReadyMessage",
FlagChanges: fCh,
Expand Down Expand Up @@ -139,7 +140,7 @@ func TestEventHandler_Eventing(t *testing.T) {
// associated to client name
associatedName := "providerForClient"

err := SetNamedProvider(associatedName, eventingProvider)
err := SetNamedProviderAndWait(associatedName, eventingProvider)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -214,13 +215,13 @@ func TestEventHandler_clientAssociation(t *testing.T) {
}

// default provider
err := SetProvider(defaultProvider)
err := SetProviderAndWait(defaultProvider)
if err != nil {
t.Fatal(err)
}

// named provider(associated to name someClient)
err = SetNamedProvider("someClient", struct {
err = SetNamedProviderAndWait("someClient", struct {
FeatureProvider
EventHandler
}{
Expand Down Expand Up @@ -273,7 +274,7 @@ func TestEventHandler_ErrorHandling(t *testing.T) {
eventing,
}

errorCallback := func(e EventDetails) {
failingCallback := func(e EventDetails) {
panic("callback panic")
}

Expand All @@ -292,24 +293,26 @@ func TestEventHandler_ErrorHandling(t *testing.T) {
t.Fatal(err)
}

successEventType := ProviderStale

// api level handlers
AddHandler(ProviderReady, &errorCallback)
AddHandler(ProviderReady, &successAPICallback)
AddHandler(ProviderConfigChange, &failingCallback)
AddHandler(successEventType, &successAPICallback)

// provider association
providerName := "providerA"

client := NewClient(providerName)

// client level handlers
client.AddHandler(ProviderReady, &errorCallback)
client.AddHandler(ProviderReady, &successClientCallback)
client.AddHandler(ProviderConfigChange, &failingCallback)
client.AddHandler(successEventType, &successClientCallback)

// trigger events manually
go func() {
eventing.Invoke(Event{
ProviderName: providerName,
EventType: ProviderReady,
EventType: successEventType,
ProviderEventDetails: ProviderEventDetails{},
})
}()
Expand Down Expand Up @@ -967,7 +970,7 @@ func TestEventHandler_HandlersRunImmediately(t *testing.T) {
},
}

if err := SetProvider(provider); err != nil {
if err := SetProviderAndWait(provider); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -1001,7 +1004,7 @@ func TestEventHandler_HandlersRunImmediately(t *testing.T) {
},
}

if err := SetProvider(provider); err != nil {
if err := SetProviderAndWait(provider); err != nil {
t.Fatal(err)
}

Expand Down
16 changes: 14 additions & 2 deletions openfeature/openfeature.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,25 @@ func initSingleton() {
// SetProvider sets the default provider. Provider initialization is asynchronous and status can be checked from
// provider status
func SetProvider(provider FeatureProvider) error {
return api.setProvider(provider)
return api.setProvider(provider, true)
}

// SetProviderAndWait sets the default provider and waits for its initialization.
// Returns an error if initialization cause error
func SetProviderAndWait(provider FeatureProvider) error {
return api.setProvider(provider, false)
}

// SetNamedProvider sets a provider mapped to the given Client name. Provider initialization is asynchronous and
// status can be checked from provider status
func SetNamedProvider(clientName string, provider FeatureProvider) error {
return api.setNamedProvider(clientName, provider)
return api.setNamedProvider(clientName, provider, true)
}

// SetNamedProviderAndWait sets a provider mapped to the given Client name and waits for its initialization.
// Returns an error if initialization cause error
func SetNamedProviderAndWait(clientName string, provider FeatureProvider) error {
return api.setNamedProvider(clientName, provider, false)
}

// SetEvaluationContext sets the global evaluation context.
Expand Down
Loading

0 comments on commit 6f71fe4

Please sign in to comment.