diff --git a/cmd/controller/main.go b/cmd/controller/main.go index a95618a1c1..5a13301300 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -15,6 +15,9 @@ package main import ( "flag" + "fmt" + "regexp" + "strings" "github.com/tektoncd/chains/pkg/reconciler/pipelinerun" "github.com/tektoncd/chains/pkg/reconciler/taskrun" @@ -41,7 +44,18 @@ import ( func main() { flag.IntVar(&controller.DefaultThreadsPerController, "threads-per-controller", controller.DefaultThreadsPerController, "Threads (goroutines) to create per controller") - namespace := flag.String("namespace", "", "Namespace to restrict informer to. Optional, defaults to all namespaces.") + namespaceList := flag.String("namespaces", "", "Comma-separated list of namespaces to restrict informer to. Optional, if empty defaults to all namespaces.") + flag.Parse() + + var namespaces []string + if *namespaceList != "" { + // Remove any whitespace from the namespaces string + space := regexp.MustCompile(`\s+`) + namespaces = strings.Split(space.ReplaceAllString(*namespaceList, ""), ",") + fmt.Printf("controller is scopped to the following namespaces: %s\n", namespaces) + } else { + namespaces = nil // Default to all namespaces if the list is empty + } // This also calls flag.Parse(). cfg := injection.ParseAndGetRESTConfigOrDie() @@ -57,8 +71,7 @@ func main() { cfg.QPS = 2 * cfg.QPS cfg.Burst = 2 * cfg.Burst - flag.Parse() - ctx := injection.WithNamespaceScope(signals.NewContext(), *namespace) + ctx := injection.WithNamespaceScope(signals.NewContext(), "") - sharedmain.MainWithConfig(ctx, "watcher", cfg, taskrun.NewController, pipelinerun.NewController) + sharedmain.MainWithConfig(ctx, "watcher", cfg, taskrun.NewNamespacesScoppedController(namespaces), pipelinerun.NewNamespacesScoppedController(namespaces)) } diff --git a/pkg/reconciler/filter.go b/pkg/reconciler/filter.go new file mode 100644 index 0000000000..6ede7e4b98 --- /dev/null +++ b/pkg/reconciler/filter.go @@ -0,0 +1,80 @@ +/* +Copyright 2024 The Tekton Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reconciler + +import ( + v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" + "knative.dev/pkg/controller" +) + +// PipelineRunInformerFilterFunc returns a filter function +// for PipelineRuns ensuring PipelineRuns are filtered by list of namespaces membership +func PipelineRunInformerFilterFunc(namespaces []string) func(obj interface{}) bool { + return func(obj interface{}) bool { + // Namespace filter + if len(namespaces) == 0 { + return true + } + if pr, ok := obj.(*v1.PipelineRun); ok { + for _, ns := range namespaces { + if pr.Namespace == ns { + return true + } + } + } + return false + } +} + +// TaskRunInformerFilterFunc returns a filter function +// for TaskRuns ensuring TaskRuns are filtered by list of namespaces membership +func TaskRunInformerFilterFunc(namespaces []string) func(obj interface{}) bool { + return func(obj interface{}) bool { + // Namespace filter + if len(namespaces) == 0 { + return true + } + if tr, ok := obj.(*v1.TaskRun); ok { + for _, ns := range namespaces { + if tr.Namespace == ns { + return true + } + } + } + return false + } +} + +// TaskRunInformerFilterFunc returns a filter function +// for TaskRuns ensuring Ownership by a PipelineRun and filtered by list of namespaces membership and +func TaskRunInformerFilterFuncWithOwnership(namespaces []string) func(obj interface{}) bool { + return func(obj interface{}) bool { + // Ownership filter + if !controller.FilterController(&v1.PipelineRun{})(obj) { + return false + } + // Namespace filter + if len(namespaces) == 0 { + return true + } + if tr, ok := obj.(*v1.TaskRun); ok { + for _, ns := range namespaces { + if tr.Namespace == ns { + return true + } + } + } + return false + } +} diff --git a/pkg/reconciler/pipelinerun/controller.go b/pkg/reconciler/pipelinerun/controller.go index 21beabbc47..956c4e1676 100644 --- a/pkg/reconciler/pipelinerun/controller.go +++ b/pkg/reconciler/pipelinerun/controller.go @@ -20,7 +20,7 @@ import ( "github.com/tektoncd/chains/pkg/chains/storage" "github.com/tektoncd/chains/pkg/config" "github.com/tektoncd/chains/pkg/pipelinerunmetrics" - v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" + "github.com/tektoncd/chains/pkg/reconciler" pipelineclient "github.com/tektoncd/pipeline/pkg/client/injection/client" pipelineruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1/pipelinerun" taskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1/taskrun" @@ -34,61 +34,68 @@ import ( _ "github.com/tektoncd/chains/pkg/chains/formats/all" ) -func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl { - logger := logging.FromContext(ctx) - pipelineRunInformer := pipelineruninformer.Get(ctx) - taskRunInformer := taskruninformer.Get(ctx) +// NewNamespacesScoppedController returns a new controller implementation where informer is filtered +// given a list of namespaces +func NewNamespacesScoppedController(namespaces []string) func(ctx context.Context, cmw configmap.Watcher) *controller.Impl { + return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl { + logger := logging.FromContext(ctx) + pipelineRunInformer := pipelineruninformer.Get(ctx) + taskRunInformer := taskruninformer.Get(ctx) - kubeClient := kubeclient.Get(ctx) - pipelineClient := pipelineclient.Get(ctx) + kubeClient := kubeclient.Get(ctx) + pipelineClient := pipelineclient.Get(ctx) - psSigner := &chains.ObjectSigner{ - SecretPath: SecretPath, - Pipelineclientset: pipelineClient, - Recorder: pipelinerunmetrics.Get(ctx), - } + psSigner := &chains.ObjectSigner{ + SecretPath: SecretPath, + Pipelineclientset: pipelineClient, + Recorder: pipelinerunmetrics.Get(ctx), + } - c := &Reconciler{ - PipelineRunSigner: psSigner, - Pipelineclientset: pipelineClient, - TaskRunLister: taskRunInformer.Lister(), - } - impl := pipelinerunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options { - cfgStore := config.NewConfigStore(logger, func(name string, value interface{}) { - // get updated config - cfg := *value.(*config.Config) + c := &Reconciler{ + PipelineRunSigner: psSigner, + Pipelineclientset: pipelineClient, + TaskRunLister: taskRunInformer.Lister(), + } + impl := pipelinerunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options { + cfgStore := config.NewConfigStore(logger, func(name string, value interface{}) { + // get updated config + cfg := *value.(*config.Config) + + // get all backends for storing provenance + backends, err := storage.InitializeBackends(ctx, pipelineClient, kubeClient, cfg) + if err != nil { + logger.Error(err) + } + psSigner.Backends = backends + }) - // get all backends for storing provenance - backends, err := storage.InitializeBackends(ctx, pipelineClient, kubeClient, cfg) - if err != nil { - logger.Error(err) + // setup watches for the config names provided by client + cfgStore.WatchConfigs(cmw) + + return controller.Options{ + // The chains reconciler shouldn't mutate the pipelinerun's status. + SkipStatusUpdates: true, + ConfigStore: cfgStore, + FinalizerName: "chains.tekton.dev/pipelinerun", // TODO: unique name required? } - psSigner.Backends = backends }) - // setup watches for the config names provided by client - cfgStore.WatchConfigs(cmw) + c.Tracker = impl.Tracker - return controller.Options{ - // The chains reconciler shouldn't mutate the pipelinerun's status. - SkipStatusUpdates: true, - ConfigStore: cfgStore, - FinalizerName: "chains.tekton.dev/pipelinerun", // TODO: unique name required? + if _, err := pipelineRunInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: reconciler.PipelineRunInformerFilterFunc(namespaces), + Handler: controller.HandleAll(impl.Enqueue), + }); err != nil { + logger.Errorf("adding event handler for pipelinerun controller's pipelinerun informer encountered error: %v", err) } - }) - - c.Tracker = impl.Tracker - if _, err := pipelineRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)); err != nil { - logger.Errorf("adding event handler for pipelinerun controller's pipelinerun informer encountered error: %w", err) - } + if _, err := taskRunInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: reconciler.TaskRunInformerFilterFuncWithOwnership(namespaces), + Handler: controller.HandleAll(impl.EnqueueControllerOf), + }); err != nil { + logger.Errorf("adding event handler for pipelinerun controller's taskrun informer encountered error: %v", err) + } - if _, err := taskRunInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: controller.FilterController(&v1.PipelineRun{}), - Handler: controller.HandleAll(impl.EnqueueControllerOf), - }); err != nil { - logger.Errorf("adding event handler for pipelinerun controller's taskrun informer encountered error: %w", err) + return impl } - - return impl } diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index 2d7cf60a05..6a35676e9d 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -78,7 +78,8 @@ func TestReconciler_Reconcile(t *testing.T) { Name: config.ChainsConfig, }, }) - ctl := NewController(ctx, configMapWatcher) + namespacedScopedController := NewNamespacesScoppedController(nil) + ctl := namespacedScopedController(ctx, configMapWatcher) if la, ok := ctl.Reconciler.(pkgreconciler.LeaderAware); ok { if err := la.Promote(pkgreconciler.UniversalBucket(), func(pkgreconciler.Bucket, types.NamespacedName) {}); err != nil { diff --git a/pkg/reconciler/taskrun/controller.go b/pkg/reconciler/taskrun/controller.go index 8695dcfdb3..a0bef4edae 100644 --- a/pkg/reconciler/taskrun/controller.go +++ b/pkg/reconciler/taskrun/controller.go @@ -19,10 +19,12 @@ import ( "github.com/tektoncd/chains/pkg/chains" "github.com/tektoncd/chains/pkg/chains/storage" "github.com/tektoncd/chains/pkg/config" + "github.com/tektoncd/chains/pkg/reconciler" "github.com/tektoncd/chains/pkg/taskrunmetrics" pipelineclient "github.com/tektoncd/pipeline/pkg/client/injection/client" taskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1/taskrun" taskrunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1/taskrun" + "k8s.io/client-go/tools/cache" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -31,50 +33,57 @@ import ( _ "github.com/tektoncd/chains/pkg/chains/formats/all" ) -func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl { - logger := logging.FromContext(ctx) - taskRunInformer := taskruninformer.Get(ctx) +// NewNamespacesScoppedController returns a new controller implementation where informer is filtered +// given a list of namespaces +func NewNamespacesScoppedController(namespaces []string) func(ctx context.Context, cmw configmap.Watcher) *controller.Impl { + return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl { + logger := logging.FromContext(ctx) + taskRunInformer := taskruninformer.Get(ctx) - kubeClient := kubeclient.Get(ctx) - pipelineClient := pipelineclient.Get(ctx) + kubeClient := kubeclient.Get(ctx) + pipelineClient := pipelineclient.Get(ctx) - tsSigner := &chains.ObjectSigner{ - SecretPath: SecretPath, - Pipelineclientset: pipelineClient, - Recorder: taskrunmetrics.Get(ctx), - } + tsSigner := &chains.ObjectSigner{ + SecretPath: SecretPath, + Pipelineclientset: pipelineClient, + Recorder: taskrunmetrics.Get(ctx), + } - c := &Reconciler{ - TaskRunSigner: tsSigner, - Pipelineclientset: pipelineClient, - } - impl := taskrunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options { - cfgStore := config.NewConfigStore(logger, func(name string, value interface{}) { - // get updated config - cfg := *value.(*config.Config) + c := &Reconciler{ + TaskRunSigner: tsSigner, + Pipelineclientset: pipelineClient, + } + impl := taskrunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options { + cfgStore := config.NewConfigStore(logger, func(name string, value interface{}) { + // get updated config + cfg := *value.(*config.Config) - // get all backends for storing provenance - backends, err := storage.InitializeBackends(ctx, pipelineClient, kubeClient, cfg) - if err != nil { - logger.Error(err) + // get all backends for storing provenance + backends, err := storage.InitializeBackends(ctx, pipelineClient, kubeClient, cfg) + if err != nil { + logger.Error(err) + } + tsSigner.Backends = backends + }) + + // setup watches for the config names provided by client + cfgStore.WatchConfigs(cmw) + + return controller.Options{ + // The chains reconciler shouldn't mutate the taskrun's status. + SkipStatusUpdates: true, + ConfigStore: cfgStore, + FinalizerName: "chains.tekton.dev", } - tsSigner.Backends = backends }) - // setup watches for the config names provided by client - cfgStore.WatchConfigs(cmw) - - return controller.Options{ - // The chains reconciler shouldn't mutate the taskrun's status. - SkipStatusUpdates: true, - ConfigStore: cfgStore, - FinalizerName: "chains.tekton.dev", + if _, err := taskRunInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: reconciler.TaskRunInformerFilterFunc(namespaces), + Handler: controller.HandleAll(impl.Enqueue), + }); err != nil { + logger.Errorf("adding event handler for taskrun controller's taskrun informer encountered error: %v", err) } - }) - if _, err := taskRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)); err != nil { - logger.Errorf("adding event handler for taskrun controller's taskrun informer encountered error: %w", err) + return impl } - - return impl } diff --git a/pkg/reconciler/taskrun/taskrun_test.go b/pkg/reconciler/taskrun/taskrun_test.go index 20ba45023f..5c86dab83f 100644 --- a/pkg/reconciler/taskrun/taskrun_test.go +++ b/pkg/reconciler/taskrun/taskrun_test.go @@ -75,7 +75,9 @@ func TestReconciler_Reconcile(t *testing.T) { Name: config.ChainsConfig, }, }) - ctl := NewController(ctx, configMapWatcher) + + namespacedScopedController := NewNamespacesScoppedController(nil) + ctl := namespacedScopedController(ctx, configMapWatcher) if la, ok := ctl.Reconciler.(pkgreconciler.LeaderAware); ok { if err := la.Promote(pkgreconciler.UniversalBucket(), func(pkgreconciler.Bucket, types.NamespacedName) {}); err != nil {