From ba872b6d0dbefb975f8574a841ce7dd185cefd35 Mon Sep 17 00:00:00 2001 From: "robin.hubbig" Date: Fri, 5 Apr 2024 10:16:02 +0200 Subject: [PATCH] Extend remote config watch with context and callback --- viper.go | 52 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/viper.go b/viper.go index 20eb4da17..78b3ce662 100644 --- a/viper.go +++ b/viper.go @@ -21,6 +21,7 @@ package viper import ( "bytes" + "context" "encoding/csv" "errors" "fmt" @@ -143,6 +144,16 @@ func DecodeHook(hook mapstructure.DecodeHookFunc) DecoderConfigOption { } } +type RemoteConfigEvent uint + +const ( + RemoteConfigEvent_Unknown RemoteConfigEvent = iota + // Remote config was updated + RemoteConfigEvent_Updated + // Remote config watch routine stopped + RemoteConfigEvent_Stopped +) + // Viper is a prioritized configuration registry. It // maintains a set of configuration sources, fetches // values to populate those, and provides them according @@ -217,7 +228,8 @@ type Viper struct { aliases map[string]string typeByDefValue bool - onConfigChange func(fsnotify.Event) + onConfigChange func(fsnotify.Event) + onRemoteConfigChange func(RemoteConfigEvent) logger *slog.Logger @@ -432,6 +444,14 @@ func (v *Viper) OnConfigChange(run func(in fsnotify.Event)) { v.onConfigChange = run } +// OnRemoteConfigChange sets the event handler that is called when a remote config file changes. +func OnRemoteConfigChange(run func(RemoteConfigEvent)) { v.OnRemoteConfigChange(run) } + +// OnRemoteConfigChange sets the event handler that is called when a remote config file changes. +func (v *Viper) OnRemoteConfigChange(run func(RemoteConfigEvent)) { + v.onRemoteConfigChange = run +} + // WatchConfig starts watching a config file for changes. func WatchConfig() { v.WatchConfig() } @@ -1973,7 +1993,11 @@ func (v *Viper) WatchRemoteConfig() error { } func (v *Viper) WatchRemoteConfigOnChannel() error { - return v.watchKeyValueConfigOnChannel() + return v.watchKeyValueConfigOnChannelWithContext(context.TODO()) +} + +func (v *Viper) WatchRemoteConfigOnChannelWithContext(ctx context.Context) error { + return v.watchKeyValueConfigOnChannelWithContext(ctx) } // Retrieve the first found remote configuration. @@ -2010,20 +2034,32 @@ func (v *Viper) getRemoteConfig(provider RemoteProvider) (map[string]any, error) return v.kvstore, err } -// Retrieve the first found remote configuration. -func (v *Viper) watchKeyValueConfigOnChannel() error { +// Watch the first found remote configuration. +func (v *Viper) watchKeyValueConfigOnChannelWithContext(ctx context.Context) error { if len(v.remoteProviders) == 0 { return RemoteConfigError("No Remote Providers") } for _, rp := range v.remoteProviders { respc, _ := RemoteConfig.WatchChannel(rp) - // Todo: Add quit channel go func(rc <-chan *RemoteResponse) { for { - b := <-rc - reader := bytes.NewReader(b.Value) - v.unmarshalReader(reader, v.kvstore) + select { + case b := <-rc: + reader := bytes.NewReader(b.Value) + if err := v.unmarshalReader(reader, v.kvstore); err != nil { + v.logger.Error(fmt.Errorf("unmarshal remote config update: %w", err).Error()) + continue + } + if v.onRemoteConfigChange != nil { + v.onRemoteConfigChange(RemoteConfigEvent_Updated) + } + case <-ctx.Done(): + if v.onRemoteConfigChange != nil { + v.onRemoteConfigChange(RemoteConfigEvent_Stopped) + } + return + } } }(respc) return nil