diff --git a/cmd/botkube-agent/main.go b/cmd/botkube-agent/main.go index 5023fb30b..a9d44d3db 100644 --- a/cmd/botkube-agent/main.go +++ b/cmd/botkube-agent/main.go @@ -396,7 +396,7 @@ func run(ctx context.Context) (err error) { scheduler := source.NewScheduler(ctx, logger, conf, sourcePluginDispatcher, schedulerChan) err = scheduler.Start(ctx) if err != nil { - return reportFatalError("while starting source plugin event dispatcher: %w", err) + return reportFatalError("while starting source plugin event dispatcher", err) } if conf.Plugins.IncomingWebhook.Enabled { diff --git a/internal/source/dispatcher.go b/internal/source/dispatcher.go index 790c5e535..bc64f0312 100644 --- a/internal/source/dispatcher.go +++ b/internal/source/dispatcher.go @@ -88,8 +88,6 @@ func NewDispatcher(log logrus.FieldLogger, clusterName string, notifiers map[str } // Dispatch starts a given plugin, watches for incoming events and calling all notifiers to dispatch received event. -// Once we will have the gRPC contract established with proper Cloud Event schema, we should move also this logic here: -// https://github.com/kubeshop/botkube/blob/525c737956ff820a09321879284037da8bf5d647/pkg/controller/controller.go#L200-L253 func (d *Dispatcher) Dispatch(dispatch PluginDispatch) error { log := d.log.WithFields(logrus.Fields{ "pluginName": dispatch.pluginName, diff --git a/internal/source/kubernetes/bg_processor.go b/internal/source/kubernetes/bg_processor.go new file mode 100644 index 000000000..4d5b4a06e --- /dev/null +++ b/internal/source/kubernetes/bg_processor.go @@ -0,0 +1,70 @@ +package kubernetes + +import ( + "context" + "sync" + "time" + + "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" +) + +// backgroundProcessor is responsible for running background processes. +type backgroundProcessor struct { + mu sync.RWMutex + cancelCtxFn func() + startTime time.Time + + errGroup *errgroup.Group +} + +// newBackgroundProcessor creates new background processor. +func newBackgroundProcessor() *backgroundProcessor { + return &backgroundProcessor{} +} + +// StartTime returns the start time of the background processor. +func (b *backgroundProcessor) StartTime() time.Time { + b.mu.RLock() + defer b.mu.RUnlock() + return b.startTime +} + +// Run starts the background processes. +func (b *backgroundProcessor) Run(parentCtx context.Context, fns []func(ctx context.Context)) { + b.mu.Lock() + defer b.mu.Unlock() + + b.startTime = time.Now() + ctx, cancelFn := context.WithCancel(parentCtx) + b.cancelCtxFn = cancelFn + + errGroup, errGroupCtx := errgroup.WithContext(ctx) + b.errGroup = errGroup + + for _, fn := range fns { + fn := fn + errGroup.Go(func() error { + fn(errGroupCtx) + return nil + }) + } +} + +// StopAndWait stops the background processes and waits for them to finish. +func (b *backgroundProcessor) StopAndWait(log logrus.FieldLogger) error { + b.mu.Lock() + defer b.mu.Unlock() + + if b.cancelCtxFn != nil { + log.Debug("Cancelling context of the background processor...") + b.cancelCtxFn() + } + + if b.errGroup == nil { + return nil + } + + log.Debug("Waiting for background processor to finish...") + return b.errGroup.Wait() +} diff --git a/internal/source/kubernetes/configuration_store.go b/internal/source/kubernetes/configuration_store.go new file mode 100644 index 000000000..938eb7527 --- /dev/null +++ b/internal/source/kubernetes/configuration_store.go @@ -0,0 +1,93 @@ +package kubernetes + +import ( + "fmt" + "sync" + + "github.com/kubeshop/botkube/pkg/maputil" +) + +// configurationStore stores all source configurations in a thread-safe way. +type configurationStore struct { + store map[string]SourceConfig + storeByKubeconfig map[string]map[string]struct{} + + lock sync.RWMutex +} + +// newConfigurations creates new empty configurationStore instance. +func newConfigurations() *configurationStore { + return &configurationStore{ + store: make(map[string]SourceConfig), + storeByKubeconfig: make(map[string]map[string]struct{}), + } +} + +// Store stores SourceConfig in a thread-safe way. +func (c *configurationStore) Store(sourceName string, cfg SourceConfig) { + c.lock.Lock() + defer c.lock.Unlock() + + key := c.keyForStore(sourceName, cfg.isInteractivitySupported) + + c.store[key] = cfg + + kubeConfigKey := string(cfg.kubeConfig) + if _, ok := c.storeByKubeconfig[kubeConfigKey]; !ok { + c.storeByKubeconfig[kubeConfigKey] = make(map[string]struct{}) + } + c.storeByKubeconfig[kubeConfigKey][key] = struct{}{} +} + +// Get returns SourceConfig by a key. +func (c *configurationStore) Get(sourceKey string) (SourceConfig, bool) { + c.lock.RLock() + defer c.lock.RUnlock() + val, ok := c.store[sourceKey] + return val, ok +} + +// GetSystemConfig returns system Source Config. +// The system config is used for getting system (plugin-wide) logger and informer resync period. +func (c *configurationStore) GetSystemConfig() (SourceConfig, bool) { + c.lock.RLock() + defer c.lock.RUnlock() + + sortedKeys := maputil.SortKeys(c.store) + if len(sortedKeys) == 0 { + return SourceConfig{}, false + } + + return c.store[sortedKeys[0]], true +} + +// Len returns number of stored SourceConfigs. +func (c *configurationStore) Len() int { + c.lock.RLock() + defer c.lock.RUnlock() + return len(c.store) +} + +// CloneByKubeconfig returns a copy of the underlying map of source configurations grouped by kubeconfigs. +func (c *configurationStore) CloneByKubeconfig() map[string]map[string]SourceConfig { + c.lock.RLock() + defer c.lock.RUnlock() + + var out = make(map[string]map[string]SourceConfig) + for kubeConfig, srcIndex := range c.storeByKubeconfig { + if out[kubeConfig] == nil { + out[kubeConfig] = make(map[string]SourceConfig) + } + + for srcKey := range srcIndex { + out[kubeConfig][srcKey] = c.store[srcKey] + } + } + + return out +} + +// keyForStore returns a key for storing configuration in the store. +func (c *configurationStore) keyForStore(sourceName string, isInteractivitySupported bool) string { + return fmt.Sprintf("%s/%t", sourceName, isInteractivitySupported) +} diff --git a/internal/source/kubernetes/filterengine/filterengine.go b/internal/source/kubernetes/filterengine/filterengine.go index 31c99dc78..352e6b87a 100644 --- a/internal/source/kubernetes/filterengine/filterengine.go +++ b/internal/source/kubernetes/filterengine/filterengine.go @@ -50,7 +50,6 @@ func New(log logrus.FieldLogger) *DefaultFilterEngine { func (f *DefaultFilterEngine) Run(ctx context.Context, event event.Event) event.Event { f.log.Debug("Running registered filters") filters := f.RegisteredFilters() - f.log.Debugf("registered filters: %+v", filters) for _, filter := range filters { if !filter.Enabled { @@ -59,7 +58,7 @@ func (f *DefaultFilterEngine) Run(ctx context.Context, event event.Event) event. err := filter.Run(ctx, &event) if err != nil { - f.log.Errorf("while running filter %q: %w", filter.Name(), err) + f.log.Errorf("while running filter %q: %s", filter.Name(), err.Error()) } f.log.Debugf("ran filter name: %q, event was skipped: %t", filter.Name(), event.Skip) } diff --git a/internal/source/kubernetes/registration.go b/internal/source/kubernetes/registration.go index e72107ca3..a57f89cc0 100644 --- a/internal/source/kubernetes/registration.go +++ b/internal/source/kubernetes/registration.go @@ -31,7 +31,7 @@ type registration struct { mappedEvent config.EventType } -func (r registration) handleEvent(ctx context.Context, s Source, resource string, eventType config.EventType, routes []route, fn eventHandler) { +func (r registration) handleEvent(ctx context.Context, resource string, eventType config.EventType, routes []route, fn eventHandler) { handleFunc := func(oldObj, newObj interface{}) { logger := r.log.WithFields(logrus.Fields{ "eventHandler": eventType, @@ -45,15 +45,15 @@ func (r registration) handleEvent(ctx context.Context, s Source, resource string return } - ok, diffs, err := r.qualifyEvent(event, newObj, oldObj, routes) + sources, diffs, err := r.qualifyEvent(event, newObj, oldObj, routes) if err != nil { logger.Errorf("while getting sources for event: %s", err.Error()) // continue anyway, there could be still some sources to handle } - if !ok { + if len(sources) == 0 { return } - fn(ctx, s, event, diffs) + fn(ctx, event, sources, diffs) } var resourceEventHandlerFuncs cache.ResourceEventHandlerFuncs @@ -69,7 +69,7 @@ func (r registration) handleEvent(ctx context.Context, s Source, resource string _, _ = r.informer.AddEventHandler(resourceEventHandlerFuncs) } -func (r registration) handleMapped(ctx context.Context, s Source, eventType config.EventType, routeTable map[string][]entry, fn eventHandler) { +func (r registration) handleMapped(ctx context.Context, eventType config.EventType, routeTable map[string][]entry, fn eventHandler) { _, _ = r.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { var eventObj coreV1.Event @@ -107,15 +107,15 @@ func (r registration) handleMapped(ctx context.Context, s Source, eventType conf } routes := eventRoutes(routeTable, gvrString, eventType) - ok, err := r.matchEvent(routes, event) + sources, err := r.matchEvent(routes, event) if err != nil { r.log.Errorf("cannot calculate event for observed mapped resource event: %q in Add event handler: %s", eventType, err.Error()) // continue anyway, there could be still some sources to handle } - if !ok { + if len(sources) == 0 { return } - fn(ctx, s, event, nil) + fn(ctx, event, sources, nil) }, }) } @@ -138,18 +138,21 @@ func (r registration) includesSrcResource(resource string) bool { return false } -func (r registration) matchEvent(routes []route, event event.Event) (bool, error) { +func (r registration) matchEvent(routes []route, event event.Event) ([]string, error) { + var out []string + errs := multierror.New() for _, rt := range routes { // event reason if rt.Event != nil && rt.Event.Reason.AreConstraintsDefined() { match, err := rt.Event.Reason.IsAllowed(event.Reason) if err != nil { - return false, err + errs = multierror.Append(errs, err) + continue } if !match { r.log.Debugf("Ignoring as reason %q doesn't match constraints %+v", event.Reason, rt.Event.Reason) - return false, nil + continue } } @@ -166,7 +169,8 @@ func (r registration) matchEvent(routes []route, event event.Event) (bool, error for _, msg := range eventMsgs { match, err := rt.Event.Message.IsAllowed(msg) if err != nil { - return false, err + errs = multierror.Append(errs, err) + continue } if match { anyMsgMatches = true @@ -175,7 +179,7 @@ func (r registration) matchEvent(routes []route, event event.Event) (bool, error } if !anyMsgMatches { r.log.Debugf("Ignoring as any event message from %q doesn't match constraints %+v", strings.Join(event.Messages, ";"), rt.Event.Message) - return false, nil + continue } } @@ -183,11 +187,12 @@ func (r registration) matchEvent(routes []route, event event.Event) (bool, error if rt.ResourceName.AreConstraintsDefined() { allowed, err := rt.ResourceName.IsAllowed(event.Name) if err != nil { - return false, err + errs = multierror.Append(errs, err) + continue } if !allowed { r.log.Debugf("Ignoring as resource name %q doesn't match constraints %+v", event.Name, rt.ResourceName) - return false, nil + continue } } @@ -195,11 +200,12 @@ func (r registration) matchEvent(routes []route, event event.Event) (bool, error if rt.Namespaces != nil && rt.Namespaces.AreConstraintsDefined() { match, err := rt.Namespaces.IsAllowed(event.Namespace) if err != nil { - return false, err + errs = multierror.Append(errs, err) + continue } if !match { r.log.Debugf("Ignoring as namespace %q doesn't match constraints %+v", event.Namespace, rt.Namespaces) - return false, nil + continue } } @@ -212,10 +218,11 @@ func (r registration) matchEvent(routes []route, event event.Event) (bool, error if !kvsSatisfiedForMap(rt.Labels, event.ObjectMeta.Labels) { continue } - return true, nil + + out = append(out, rt.Source) } - return false, errs.ErrorOrNil() + return out, errs.ErrorOrNil() } func kvsSatisfiedForMap(expectedKV *map[string]string, obj map[string]string) bool { @@ -259,26 +266,24 @@ func (r registration) qualifyEvent( event event.Event, newObj, oldObj interface{}, routes []route, -) (bool, []string, error) { - ok, err := r.matchEvent(routes, event) +) ([]string, []string, error) { + candidates, err := r.matchEvent(routes, event) if err != nil { - return false, nil, fmt.Errorf("while matching event: %w", err) - } - if !ok { - return false, nil, nil + return nil, nil, fmt.Errorf("while matching event: %w", err) } if event.Type == config.UpdateEvent { - return r.qualifyEventForUpdate(newObj, oldObj, routes) + return r.qualifyEventForUpdate(newObj, oldObj, routes, candidates) } - return true, nil, nil + return candidates, nil, nil } func (r registration) qualifyEventForUpdate( newObj, oldObj interface{}, routes []route, -) (bool, []string, error) { + candidates []string, +) ([]string, []string, error) { var diffs []string var oldUnstruct, newUnstruct *unstructured.Unstructured @@ -301,35 +306,37 @@ func (r registration) qualifyEventForUpdate( newUnstruct = &unstructured.Unstructured{} } - var result bool + var result []string - for _, route := range routes { - if !route.hasActionableUpdateSetting() { - r.log.Debugf("Qualified for update: route: %v, with no updateSettings set", route) - result = true - continue - } + for _, source := range candidates { + for _, route := range routes { + if !route.hasActionableUpdateSetting() { + r.log.Debugf("Qualified for update: route: %v, with no updateSettings set", route) + result = append(result, source) + continue + } - if route.UpdateSetting == nil { - // in theory this should never happen, as we check if it is not nil in `route.hasActionableUpdateSetting()`, but just in case - r.log.Debugf("Nil updateSetting but hasActionableUpdateSetting returned true for route: %v. This looks like a bug...", route) - continue - } + if route.UpdateSetting == nil { + // in theory this should never happen, as we check if it is not nil in `route.hasActionableUpdateSetting()`, but just in case + r.log.Debugf("Nil updateSetting but hasActionableUpdateSetting returned true for route: %v. This looks like a bug...", route) + continue + } - r.log.WithFields(logrus.Fields{"old": oldUnstruct.Object, "new": newUnstruct.Object}).Debug("Getting diff for objects...") - diff, err := k8sutil.Diff(oldUnstruct.Object, newUnstruct.Object, *route.UpdateSetting) - if err != nil { - r.log.Errorf("while getting diff: %w", err) - } - r.log.Debugf("About to qualify event for route: %v for update, diff: %s, updateSetting: %+v", route, diff, route.UpdateSetting) + r.log.WithFields(logrus.Fields{"old": oldUnstruct.Object, "new": newUnstruct.Object}).Debug("Getting diff for objects...") + diff, err := k8sutil.Diff(oldUnstruct.Object, newUnstruct.Object, *route.UpdateSetting) + if err != nil { + r.log.Errorf("while getting diff: %w", err) + } + r.log.Debugf("About to qualify event for route: %v for update, diff: %s, updateSetting: %+v", route, diff, route.UpdateSetting) - if route.UpdateSetting.IncludeDiff { - diffs = append(diffs, diff) - } + if route.UpdateSetting.IncludeDiff { + diffs = append(diffs, diff) + } - if len(diff) > 0 { - result = true - r.log.Debugf("Qualified for update: route: %v for update, diff: %s, updateSetting: %+v", route, diff, route.UpdateSetting) + if len(diff) > 0 { + result = append(result, source) + r.log.Debugf("Qualified for update: route: %v for update, diff: %s, updateSetting: %+v", route, diff, route.UpdateSetting) + } } } diff --git a/internal/source/kubernetes/router.go b/internal/source/kubernetes/router.go index 22fedd5d5..af74af1e1 100644 --- a/internal/source/kubernetes/router.go +++ b/internal/source/kubernetes/router.go @@ -11,15 +11,18 @@ import ( "github.com/kubeshop/botkube/internal/source/kubernetes/config" "github.com/kubeshop/botkube/internal/source/kubernetes/event" "github.com/kubeshop/botkube/internal/source/kubernetes/recommendation" + "github.com/kubeshop/botkube/pkg/formatx" ) const eventsResource = "v1/events" type mergedEvents map[string]map[config.EventType]struct{} type registrationHandler func(resource string) (cache.SharedIndexInformer, error) -type eventHandler func(ctx context.Context, source Source, event event.Event, updateDiffs []string) +type eventHandler func(ctx context.Context, event event.Event, sources []string, updateDiffs []string) type route struct { + Source string + ResourceName config.RegexConstraints Labels *map[string]string Annotations *map[string]string @@ -60,16 +63,15 @@ func NewRouter(mapper meta.RESTMapper, dynamicCli dynamic.Interface, log logrus. // BuildTable builds the routers routing table marking it ready // to register, map and handle informer events. -func (r *Router) BuildTable(cfg *config.Config) *Router { - mergedEvents := mergeResourceEvents(cfg) - +func (r *Router) BuildTable(cfgs map[string]SourceConfig) *Router { + mergedEvents := mergeResourceEvents(cfgs) for resource, resourceEvents := range mergedEvents { - eventRoutes := r.mergeEventRoutes(resource, cfg) + eventRoutes := r.mergeEventRoutes(resource, cfgs) for evt := range resourceEvents { r.table[resource] = append(r.table[resource], entry{Event: evt, Routes: eventRoutes[evt]}) } } - r.log.Debugf("routing table: %+v", r.table) + r.log.Debug("routing table:", formatx.StructDumper().Sdump(r.table)) return r } @@ -123,21 +125,21 @@ func (r *Router) MapWithEventsInformer(srcEvent config.EventType, dstEvent confi // RegisterEventHandler allows router clients to create handlers that are // triggered for a target event. -func (r *Router) RegisterEventHandler(ctx context.Context, s Source, eventType config.EventType, handlerFn func(ctx context.Context, s Source, e event.Event, updateDiffs []string)) { +func (r *Router) RegisterEventHandler(ctx context.Context, eventType config.EventType, handlerFn eventHandler) { for resource, reg := range r.registrations { if !reg.canHandleEvent(eventType.String()) { continue } sourceRoutes := r.getSourceRoutes(resource, eventType) - reg.handleEvent(ctx, s, resource, eventType, sourceRoutes, handlerFn) + reg.handleEvent(ctx, resource, eventType, sourceRoutes, handlerFn) } } // HandleMappedEvent allows router clients to create handlers that are // triggered for a target mapped event. -func (r *Router) HandleMappedEvent(ctx context.Context, s Source, targetEvent config.EventType, handlerFn eventHandler) { +func (r *Router) HandleMappedEvent(ctx context.Context, targetEvent config.EventType, handlerFn eventHandler) { if informer, ok := r.mappedInformer(targetEvent); ok { - informer.handleMapped(ctx, s, targetEvent, r.table, handlerFn) + informer.handleMapped(ctx, targetEvent, r.table, handlerFn) } } @@ -146,61 +148,67 @@ func (r *Router) getSourceRoutes(resource string, targetEvent config.EventType) return eventRoutes(r.table, resource, targetEvent) } -func mergeResourceEvents(cfg *config.Config) mergedEvents { +func mergeResourceEvents(cfgs map[string]SourceConfig) mergedEvents { out := map[string]map[config.EventType]struct{}{} - for _, resource := range cfg.Resources { - if _, ok := out[resource.Type]; !ok { - out[resource.Type] = make(map[config.EventType]struct{}) - } - for _, e := range flattenEventTypes(cfg.Event.Types, resource.Event.Types) { - out[resource.Type][e] = struct{}{} + for _, srcGroupCfg := range cfgs { + cfg := srcGroupCfg.cfg + for _, resource := range cfg.Resources { + if _, ok := out[resource.Type]; !ok { + out[resource.Type] = make(map[config.EventType]struct{}) + } + for _, e := range flattenEventTypes(cfg.Event.Types, resource.Event.Types) { + out[resource.Type][e] = struct{}{} + } } - } - resForRecomms := recommendation.ResourceEventsForConfig(cfg.Recommendations) - for resourceType, eventType := range resForRecomms { - if _, ok := out[resourceType]; !ok { - out[resourceType] = make(map[config.EventType]struct{}) + resForRecomms := recommendation.ResourceEventsForConfig(cfg.Recommendations) + for resourceType, eventType := range resForRecomms { + if _, ok := out[resourceType]; !ok { + out[resourceType] = make(map[config.EventType]struct{}) + } + out[resourceType][eventType] = struct{}{} } - out[resourceType][eventType] = struct{}{} } return out } -func (r *Router) mergeEventRoutes(resource string, cfg *config.Config) map[config.EventType][]route { +func (r *Router) mergeEventRoutes(resource string, cfgs map[string]SourceConfig) map[config.EventType][]route { out := make(map[config.EventType][]route) - for idx := range cfg.Resources { - r := cfg.Resources[idx] // make sure that we work on a copy - for _, e := range flattenEventTypes(cfg.Event.Types, r.Event.Types) { - if resource != r.Type { - continue - } - route := route{ - Namespaces: resourceNamespaces(cfg.Namespaces, &r.Namespaces), - Annotations: resourceStringMap(cfg.Annotations, r.Annotations), - Labels: resourceStringMap(cfg.Labels, r.Labels), - ResourceName: r.Name, - Event: resourceEvent(*cfg.Event, r.Event), - } - if e == config.UpdateEvent { - route.UpdateSetting = &config.UpdateSetting{ - Fields: r.UpdateSetting.Fields, - IncludeDiff: r.UpdateSetting.IncludeDiff, + for srcGroupName, srcCfg := range cfgs { + cfg := srcCfg.cfg + for idx := range cfg.Resources { + r := cfg.Resources[idx] // make sure that we work on a copy + for _, e := range flattenEventTypes(cfg.Event.Types, r.Event.Types) { + if resource != r.Type { + continue + } + route := route{ + Source: srcGroupName, + Namespaces: resourceNamespaces(cfg.Namespaces, &r.Namespaces), + Annotations: resourceStringMap(cfg.Annotations, r.Annotations), + Labels: resourceStringMap(cfg.Labels, r.Labels), + ResourceName: r.Name, + Event: resourceEvent(*cfg.Event, r.Event), + } + if e == config.UpdateEvent { + route.UpdateSetting = &config.UpdateSetting{ + Fields: r.UpdateSetting.Fields, + IncludeDiff: r.UpdateSetting.IncludeDiff, + } } - } - out[e] = append(out[e], route) + out[e] = append(out[e], route) + } } + // add routes related to recommendations + resForRecomms := recommendation.ResourceEventsForConfig(cfg.Recommendations) + r.setEventRouteForRecommendationsIfShould(&out, resForRecomms, srcGroupName, resource, &cfg) } - // add routes related to recommendations - resForRecomms := recommendation.ResourceEventsForConfig(cfg.Recommendations) - r.setEventRouteForRecommendationsIfShould(&out, resForRecomms, resource, cfg) - return out } -func (r *Router) setEventRouteForRecommendationsIfShould(routeMap *map[config.EventType][]route, resForRecomms map[string]config.EventType, resourceType string, cfg *config.Config) { +func (r *Router) setEventRouteForRecommendationsIfShould(routeMap *map[config.EventType][]route, resForRecomms map[string]config.EventType, srcGroupName, resourceType string, cfg *config.Config) { if routeMap == nil { r.log.Debug("Skipping setting event route for recommendations as the routeMap is nil") return @@ -212,6 +220,7 @@ func (r *Router) setEventRouteForRecommendationsIfShould(routeMap *map[config.Ev } recommRoute := route{ + Source: srcGroupName, Namespaces: cfg.Namespaces, Event: &config.KubernetesEvent{ Reason: config.RegexConstraints{}, @@ -223,6 +232,10 @@ func (r *Router) setEventRouteForRecommendationsIfShould(routeMap *map[config.Ev // Override route and get all these events for all namespaces. // The events without recommendations will be filtered out when sending the event. for i, r := range (*routeMap)[eventType] { + if r.Source != srcGroupName { + continue + } + recommRoute.Namespaces = resourceNamespaces(cfg.Namespaces, r.Namespaces) (*routeMap)[eventType][i] = recommRoute return diff --git a/internal/source/kubernetes/router_test.go b/internal/source/kubernetes/router_test.go index 8c34f1ed0..7a26ca280 100644 --- a/internal/source/kubernetes/router_test.go +++ b/internal/source/kubernetes/router_test.go @@ -21,35 +21,39 @@ func TestRouter_BuildTable_CreatesRoutesWithProperEventsList(t *testing.T) { tests := []struct { name string - givenCfg config.Config + givenCfg map[string]SourceConfig }{ { name: "Events defined on top-level but override by resource once", - givenCfg: config.Config{ - - Event: &config.KubernetesEvent{ - Types: []config.EventType{ - config.CreateEvent, - config.ErrorEvent, - }, - }, - Resources: []config.Resource{ - { - Type: hasRoutes, - Namespaces: config.RegexConstraints{ - Include: []string{"default"}, - }, - Event: config.KubernetesEvent{ + givenCfg: map[string]SourceConfig{ + "k8s-events": { + name: "k8s-events", + cfg: config.Config{ + Event: &config.KubernetesEvent{ Types: []config.EventType{ config.CreateEvent, - config.DeleteEvent, - config.UpdateEvent, config.ErrorEvent, }, }, - UpdateSetting: config.UpdateSetting{ - Fields: []string{"status.availableReplicas"}, - IncludeDiff: true, + Resources: []config.Resource{ + { + Type: hasRoutes, + Namespaces: config.RegexConstraints{ + Include: []string{"default"}, + }, + Event: config.KubernetesEvent{ + Types: []config.EventType{ + config.CreateEvent, + config.DeleteEvent, + config.UpdateEvent, + config.ErrorEvent, + }, + }, + UpdateSetting: config.UpdateSetting{ + Fields: []string{"status.availableReplicas"}, + IncludeDiff: true, + }, + }, }, }, }, @@ -61,7 +65,7 @@ func TestRouter_BuildTable_CreatesRoutesWithProperEventsList(t *testing.T) { t.Run(tc.name, func(t *testing.T) { router := NewRouter(nil, nil, loggerx.NewNoop()) - router = router.BuildTable(&tc.givenCfg) + router = router.BuildTable(tc.givenCfg) assert.Len(t, router.getSourceRoutes(hasRoutes, config.CreateEvent), 1) assert.Len(t, router.getSourceRoutes(hasRoutes, config.UpdateEvent), 1) assert.Len(t, router.getSourceRoutes(hasRoutes, config.DeleteEvent), 1) @@ -81,8 +85,15 @@ func TestRouterListMergingNestedFields(t *testing.T) { err = yaml.Unmarshal(fixConfig, &cfg) require.NoError(t, err) + srcCfgs := map[string]SourceConfig{ + "test": { + name: "test", + cfg: cfg, + }, + } + // when - router = router.BuildTable(&cfg) + router = router.BuildTable(srcCfgs) // then for key := range router.table { diff --git a/internal/source/kubernetes/source.go b/internal/source/kubernetes/source.go index 03711bda7..53834527c 100644 --- a/internal/source/kubernetes/source.go +++ b/internal/source/kubernetes/source.go @@ -6,9 +6,11 @@ import ( "fmt" "os" "strings" + "sync" "time" "github.com/sirupsen/logrus" + "golang.org/x/exp/maps" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic/dynamicinformer" @@ -24,6 +26,7 @@ import ( "github.com/kubeshop/botkube/pkg/api/source" pkgConfig "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/loggerx" + "github.com/kubeshop/botkube/pkg/multierror" "github.com/kubeshop/botkube/pkg/plugin" ) @@ -51,32 +54,63 @@ type RecommendationFactory interface { // Source Kubernetes source plugin data structure type Source struct { - pluginVersion string - config config.Config - logger logrus.FieldLogger + bgProcessor *backgroundProcessor + pluginVersion string + configStore *configurationStore + + mu sync.Mutex + + source.HandleExternalRequestUnimplemented +} + +type SourceConfig struct { + name string eventCh chan source.Event - startTime time.Time - recommFactory RecommendationFactory - commandGuard *command.CommandGuard - filterEngine filterengine.FilterEngine + cfg config.Config + isInteractivitySupported bool clusterName string kubeConfig []byte - messageBuilder *MessageBuilder - isInteractivitySupported bool - source.HandleExternalRequestUnimplemented + *ActiveSourceConfig +} + +type ActiveSourceConfig struct { + logger logrus.FieldLogger + messageBuilder *MessageBuilder + filterEngine *filterengine.DefaultFilterEngine + recommFactory *recommendation.Factory } // NewSource returns a new instance of Source. func NewSource(version string) *Source { return &Source{ pluginVersion: version, + configStore: newConfigurations(), + bgProcessor: newBackgroundProcessor(), } } -// Stream streams Kubernetes events -func (*Source) Stream(ctx context.Context, input source.StreamInput) (source.StreamOutput, error) { - if err := plugin.ValidateKubeConfigProvided(PluginName, input.Context.KubeConfig); err != nil { +// Metadata returns metadata of Kubernetes configuration. +func (s *Source) Metadata(_ context.Context) (api.MetadataOutput, error) { + return api.MetadataOutput{ + Version: s.pluginVersion, + Description: description, + DocumentationURL: "https://docs.botkube.io/configuration/source/kubernetes", + JSONSchema: api.JSONSchema{ + Value: configJSONSchema, + }, + Recommended: true, + }, nil +} + +// Stream streams Kubernetes events. +// WARNING: This method has to be thread-safe. +func (s *Source) Stream(ctx context.Context, input source.StreamInput) (source.StreamOutput, error) { + s.mu.Lock() + defer s.mu.Unlock() + + kubeConfig := input.Context.KubeConfig + if err := plugin.ValidateKubeConfigProvided(PluginName, kubeConfig); err != nil { return source.StreamOutput{}, err } @@ -85,54 +119,80 @@ func (*Source) Stream(ctx context.Context, input source.StreamInput) (source.Str return source.StreamOutput{}, fmt.Errorf("while merging input configs: %w", err) } - // In Kubernetes, we have an "info" level by default. We should aim to minimize info logging and consider using - // the debug level instead. This approach will prevent flooding the Agent logs with irrelevant information, - // as the Agent logs everything that plugin writes to stderr. - log := loggerx.NewStderr(pkgConfig.Logger{ - Level: cfg.Log.Level, + srcName := input.Context.SourceName + eventCh := make(chan source.Event) + s.configStore.Store(srcName, SourceConfig{ + name: srcName, + eventCh: eventCh, + cfg: cfg, + isInteractivitySupported: input.Context.IsInteractivitySupported, + clusterName: input.Context.ClusterName, + kubeConfig: kubeConfig, }) - s := Source{ - startTime: time.Now(), - eventCh: make(chan source.Event), - config: cfg, - logger: log, - clusterName: input.Context.ClusterName, - kubeConfig: input.Context.KubeConfig, - isInteractivitySupported: input.Context.IsInteractivitySupported, + systemSrcCfg, ok := s.configStore.GetSystemConfig() + if !ok { + exitOnError(err, loggerx.New(pkgConfig.Logger{}).WithField("error", "global source configuration not found")) } - go consumeEvents(ctx, s) + id := s.configStore.Len() + globalLogger := loggerx.NewStderr(pkgConfig.Logger{ + Level: systemSrcCfg.cfg.Log.Level, + }).WithField("id", id) + + err = s.bgProcessor.StopAndWait(globalLogger) + loggerx.ExitOnError(err, "While stopping background processor") // this should never happen + + cfgsByKubeConfig := s.configStore.CloneByKubeconfig() + globalLogger.Infof("Reconfiguring background process with %d different Kubeconfig(s)...", len(cfgsByKubeConfig)) + + var fns []func(context.Context) + for kubeConfig, srcCfgs := range cfgsByKubeConfig { + fn := s.genFnForKubeconfig(id, []byte(kubeConfig), globalLogger, systemSrcCfg.cfg.InformerResyncPeriod, srcCfgs) + fns = append(fns, fn) + } + + s.bgProcessor.Run(ctx, fns) + return source.StreamOutput{ - Event: s.eventCh, + Event: eventCh, }, nil } -// Metadata returns metadata of Kubernetes configuration -func (s *Source) Metadata(_ context.Context) (api.MetadataOutput, error) { - return api.MetadataOutput{ - Version: s.pluginVersion, - Description: description, - DocumentationURL: "https://docs.botkube.io/configuration/source/kubernetes", - JSONSchema: api.JSONSchema{ - Value: configJSONSchema, - }, - Recommended: true, - }, nil -} +func (s *Source) configureProcessForSources(ctx context.Context, id int, kubeConfig []byte, globalLogger logrus.FieldLogger, informerResyncPeriod time.Duration, srcCfgs map[string]SourceConfig) error { + client, err := NewClient(kubeConfig) + if err != nil { + return fmt.Errorf("while creating Kubernetes client: %w", err) + } + + for _, srcCfg := range srcCfgs { + cfg := srcCfg.cfg + logger := loggerx.NewStderr(pkgConfig.Logger{ + Level: cfg.Log.Level, + }).WithField("id", id) -func consumeEvents(ctx context.Context, s Source) { - client, err := NewClient(s.kubeConfig) - exitOnError(err, s.logger) + commandGuard := command.NewCommandGuard(logger.WithField(componentLogFieldKey, "Command Guard"), client.discoveryCli) + cmdr := commander.NewCommander(logger.WithField(componentLogFieldKey, "Commander"), commandGuard, cfg.Commands) - dynamicKubeInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client.dynamicCli, s.config.InformerResyncPeriod) - router := NewRouter(client.mapper, client.dynamicCli, s.logger) - router.BuildTable(&s.config) - s.recommFactory = recommendation.NewFactory(s.logger.WithField("component", "Recommendations"), client.dynamicCli) - s.commandGuard = command.NewCommandGuard(s.logger.WithField(componentLogFieldKey, "Command Guard"), client.discoveryCli) - cmdr := commander.NewCommander(s.logger.WithField(componentLogFieldKey, "Commander"), s.commandGuard, s.config.Commands) - s.messageBuilder = NewMessageBuilder(s.isInteractivitySupported, s.logger.WithField(componentLogFieldKey, "Message Builder"), cmdr) - s.filterEngine = filterengine.WithAllFilters(s.logger, client.dynamicCli, client.mapper, s.config.Filters) + recommFactory := recommendation.NewFactory(logger.WithField("component", "Recommendations"), client.dynamicCli) + filterEngine := filterengine.WithAllFilters(logger, client.dynamicCli, client.mapper, cfg.Filters) + messageBuilder := NewMessageBuilder(srcCfg.isInteractivitySupported, logger.WithField(componentLogFieldKey, "Message Builder"), cmdr) + + srcCfg.ActiveSourceConfig = &ActiveSourceConfig{ + logger: logger, + recommFactory: recommFactory, + filterEngine: filterEngine, + messageBuilder: messageBuilder, + } + + s.configStore.Store(srcCfg.name, srcCfg) + } + + router := NewRouter(client.mapper, client.dynamicCli, globalLogger) + router.BuildTable(srcCfgs) + + globalLogger.Info("Registering informers...") + dynamicKubeInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client.dynamicCli, informerResyncPeriod) err = router.RegisterInformers([]config.EventType{ config.CreateEvent, @@ -141,13 +201,13 @@ func consumeEvents(ctx context.Context, s Source) { }, func(resource string) (cache.SharedIndexInformer, error) { gvr, err := parseResourceArg(resource, client.mapper) if err != nil { - s.logger.Errorf("Unable to parse resource: %s to register with informer\n", resource) + globalLogger.WithError(err).Errorf("Unable to parse resource: %s to register with informer\n", resource) return nil, err } return dynamicKubeInformerFactory.ForResource(gvr).Informer(), nil }) if err != nil { - exitOnError(err, s.logger.WithFields(logrus.Fields{ + exitOnError(err, globalLogger.WithFields(logrus.Fields{ "events": []config.EventType{ config.CreateEvent, config.UpdateEvent, @@ -163,17 +223,13 @@ func consumeEvents(ctx context.Context, s Source) { func(resource string) (cache.SharedIndexInformer, error) { gvr, err := parseResourceArg(resource, client.mapper) if err != nil { - s.logger.Infof("Unable to parse resource: %s to register with informer\n", resource) + globalLogger.WithError(err).Errorf("Unable to parse resource: %s to register with informer\n", resource) return nil, err } return dynamicKubeInformerFactory.ForResource(gvr).Informer(), nil }) if err != nil { - exitOnError(err, s.logger.WithFields(logrus.Fields{ - "srcEvent": config.ErrorEvent, - "dstEvent": config.WarningEvent, - "error": err.Error(), - })) + return fmt.Errorf("while mapping with events informer: %w", err) } eventTypes := []config.EventType{ @@ -184,78 +240,112 @@ func consumeEvents(ctx context.Context, s Source) { for _, eventType := range eventTypes { router.RegisterEventHandler( ctx, - s, eventType, - handleEvent, + s.handleEventFn(globalLogger), ) } router.HandleMappedEvent( ctx, - s, config.ErrorEvent, - handleEvent, + s.handleEventFn(globalLogger), ) + globalLogger.Info("Starting background process...") stopCh := ctx.Done() dynamicKubeInformerFactory.Start(stopCh) + <-stopCh + dynamicKubeInformerFactory.Shutdown() + globalLogger.Info("Stopped background process...") + return nil } -func handleEvent(ctx context.Context, s Source, e event.Event, updateDiffs []string) { - s.logger.Debugf("Processing %s to %s/%v in %s namespace", e.Type, e.Resource, e.Name, e.Namespace) - enrichEventWithAdditionalMetadata(s, &e) +func (s *Source) handleEventFn(log logrus.FieldLogger) func(ctx context.Context, e event.Event, sources, updateDiffs []string) { + globalLogger := log - // Skip older events - if !e.TimeStamp.IsZero() && e.TimeStamp.Before(s.startTime) { - s.logger.Debug("Skipping older events") - return - } + return func(ctx context.Context, e event.Event, sources, updateDiffs []string) { + globalLogger.Debugf("Processing %s to %s/%v in %s namespace", e.Type, e.Resource, e.Name, e.Namespace) - // Check for significant Update Events in objects - if e.Type == config.UpdateEvent && len(updateDiffs) > 0 { - e.Messages = append(e.Messages, updateDiffs...) - } + // Skip older events + if !e.TimeStamp.IsZero() && e.TimeStamp.Before(s.bgProcessor.StartTime()) { + globalLogger.Debug("Skipping older event...") + return + } - // Filter events - e = s.filterEngine.Run(ctx, e) - if e.Skip { - s.logger.Debugf("Skipping event: %#v", e) - return - } + if e.Kind == "" { + globalLogger.Warn("Skipping event without Kind...") + return + } - if len(e.Kind) <= 0 { - s.logger.Warn("sendEvent received e with Kind nil. Hence skipping.") - return - } + // Check for significant Update Events in objects + if e.Type == config.UpdateEvent && len(updateDiffs) > 0 { + e.Messages = append(e.Messages, updateDiffs...) + } - recRunner, recCfg := s.recommFactory.New(s.config) - err := recRunner.Do(ctx, &e) - if err != nil { - s.logger.Errorf("while running recommendations: %w", err) - return - } + errs := multierror.New() + for _, sourceKey := range sources { + eventCopy := e - if recommendation.ShouldIgnoreEvent(&recCfg, e) { - s.logger.Debugf("Skipping event as it is related to recommendation informers and doesn't have any recommendations: %#v", e) - return - } + srcCfg, ok := s.configStore.Get(sourceKey) + if !ok { + errs = multierror.Append(errs, fmt.Errorf("source with key %q not found", sourceKey)) + continue + } - msg, err := s.messageBuilder.FromEvent(e, s.config.ExtraButtons) - if err != nil { - s.logger.Errorf("while rendering message from event: %w", err) - return - } + if srcCfg.ActiveSourceConfig == nil { + errs = multierror.Append(errs, fmt.Errorf("ActiveSourceConfig not found for source %s. This seems to be a bug", srcCfg.name)) + continue + } + + eventCopy.Cluster = srcCfg.clusterName - message := source.Event{ - Message: msg, - RawObject: e, - AnalyticsLabels: event.AnonymizedEventDetailsFrom(e), + // Filter events + e = srcCfg.filterEngine.Run(ctx, eventCopy) + if e.Skip { + srcCfg.logger.WithField("event", e).Debugf("Skipping event as skip flag is set to true") + continue + } + + recRunner, recCfg := srcCfg.recommFactory.New(srcCfg.cfg) + err := recRunner.Do(ctx, &eventCopy) + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("while running recommendation: %w", err)) + continue + } + + if recommendation.ShouldIgnoreEvent(&recCfg, eventCopy) { + srcCfg.logger.Debugf("Skipping event as it is related to recommendation informers and doesn't have any recommendations: %#v", e) + continue + } + + msg, err := srcCfg.messageBuilder.FromEvent(eventCopy, srcCfg.cfg.ExtraButtons) + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("while building message from event: %w", err)) + continue + } + + message := source.Event{ + Message: msg, + RawObject: eventCopy, + AnalyticsLabels: event.AnonymizedEventDetailsFrom(eventCopy), + } + + srcCfg.eventCh <- message + } + + if errs.ErrorOrNil() != nil { + globalLogger.WithError(errs).Errorf("failed to send event for '%s/%s' resource", e.Namespace, e.Name) + } } - s.eventCh <- message } -func enrichEventWithAdditionalMetadata(s Source, event *event.Event) { - event.Cluster = s.clusterName +func (s *Source) genFnForKubeconfig(id int, kubeConfig []byte, globalLogger logrus.FieldLogger, informerResyncPeriod time.Duration, srcCfgs map[string]SourceConfig) func(ctx context.Context) { + return func(ctx context.Context) { + err := s.configureProcessForSources(ctx, id, kubeConfig, globalLogger, informerResyncPeriod, srcCfgs) + if err != nil { + exitOnError(fmt.Errorf("while configuring process for sources"), globalLogger.WithError(err).WithField("srcCfgs", maps.Keys(srcCfgs))) + } + } } func parseResourceArg(arg string, mapper meta.RESTMapper) (schema.GroupVersionResource, error) { diff --git a/internal/source/kubernetes/testdata/TestRouterListMergingNestedFields/route-apps.v1.deployments.golden.yaml b/internal/source/kubernetes/testdata/TestRouterListMergingNestedFields/route-apps.v1.deployments.golden.yaml index 03fa6e6a2..ec96d0ee3 100644 --- a/internal/source/kubernetes/testdata/TestRouterListMergingNestedFields/route-apps.v1.deployments.golden.yaml +++ b/internal/source/kubernetes/testdata/TestRouterListMergingNestedFields/route-apps.v1.deployments.golden.yaml @@ -1,6 +1,7 @@ - event: delete routes: - - resourcename: + - source: test + resourcename: include: [] labels: test: label-1-level diff --git a/internal/source/kubernetes/testdata/TestRouterListMergingNestedFields/route-v1.configmaps.golden.yaml b/internal/source/kubernetes/testdata/TestRouterListMergingNestedFields/route-v1.configmaps.golden.yaml index e7abd838b..b244e16fc 100644 --- a/internal/source/kubernetes/testdata/TestRouterListMergingNestedFields/route-v1.configmaps.golden.yaml +++ b/internal/source/kubernetes/testdata/TestRouterListMergingNestedFields/route-v1.configmaps.golden.yaml @@ -1,6 +1,7 @@ - event: create routes: - - resourcename: + - source: test + resourcename: include: [] labels: test: label-2-level diff --git a/internal/source/kubernetes/testdata/TestRouterListMergingNestedFields/route-v1.pod.golden.yaml b/internal/source/kubernetes/testdata/TestRouterListMergingNestedFields/route-v1.pod.golden.yaml index f1bd85f1b..3543ed6c3 100644 --- a/internal/source/kubernetes/testdata/TestRouterListMergingNestedFields/route-v1.pod.golden.yaml +++ b/internal/source/kubernetes/testdata/TestRouterListMergingNestedFields/route-v1.pod.golden.yaml @@ -1,6 +1,7 @@ - event: delete routes: - - resourcename: + - source: test + resourcename: include: [] labels: test: label-1-level diff --git a/internal/source/scheduler.go b/internal/source/scheduler.go index 8f0188c9a..8923bb1ac 100644 --- a/internal/source/scheduler.go +++ b/internal/source/scheduler.go @@ -10,6 +10,7 @@ import ( "github.com/kubeshop/botkube/pkg/api/source" "github.com/kubeshop/botkube/pkg/config" + "github.com/kubeshop/botkube/pkg/maputil" ) const ( @@ -140,8 +141,11 @@ func (d *Scheduler) monitorHealth(ctx context.Context) { } func (d *Scheduler) schedule(pluginFilter string) error { - for configKey, sourceConfig := range d.dispatchConfig { - for pluginName, config := range sourceConfig { + sortedKeys := maputil.SortKeys(d.dispatchConfig) // ensure config iteration order is alphabetical + for _, configKey := range sortedKeys { + sourceConfig := d.dispatchConfig[configKey] + + for pluginName, srcCfg := range sourceConfig { if pluginFilter != emptyPluginFilter && pluginFilter != pluginName { d.log.Debugf("Not starting %q as it doesn't pass plugin filter.", pluginName) continue @@ -153,7 +157,7 @@ func (d *Scheduler) schedule(pluginFilter string) error { } d.log.Infof("Starting a new stream for plugin %q", pluginName) - if err := d.dispatcher.Dispatch(config); err != nil { + if err := d.dispatcher.Dispatch(srcCfg); err != nil { return fmt.Errorf("while starting plugin source %s: %w", pluginName, err) }