Skip to content

Commit

Permalink
Support multinamespace informer filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
jkhelil committed Jul 2, 2024
1 parent 4929c41 commit a8479b9
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 88 deletions.
21 changes: 17 additions & 4 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ package main

import (
"flag"
"fmt"
"regexp"
"strings"

"github.com/tektoncd/chains/pkg/reconciler/pipelinerun"
"github.com/tektoncd/chains/pkg/reconciler/taskrun"
Expand All @@ -41,7 +44,18 @@ import (

func main() {
flag.IntVar(&controller.DefaultThreadsPerController, "threads-per-controller", controller.DefaultThreadsPerController, "Threads (goroutines) to create per controller")
namespace := flag.String("namespace", "", "Namespace to restrict informer to. Optional, defaults to all namespaces.")
namespaceList := flag.String("namespaces", "", "Comma-separated list of namespaces to restrict informer to. Optional, if empty defaults to all namespaces.")
flag.Parse()

var namespaces []string
if *namespaceList != "" {
// Remove any whitespace from the namespaces string
space := regexp.MustCompile(`\s+`)
namespaces = strings.Split(space.ReplaceAllString(*namespaceList, ""), ",")
fmt.Printf("controller is scopped to the following namespaces: %s\n", namespaces)
} else {
namespaces = nil // Default to all namespaces if the list is empty
}

// This also calls flag.Parse().
cfg := injection.ParseAndGetRESTConfigOrDie()
Expand All @@ -57,8 +71,7 @@ func main() {
cfg.QPS = 2 * cfg.QPS
cfg.Burst = 2 * cfg.Burst

flag.Parse()
ctx := injection.WithNamespaceScope(signals.NewContext(), *namespace)
ctx := injection.WithNamespaceScope(signals.NewContext(), "")

sharedmain.MainWithConfig(ctx, "watcher", cfg, taskrun.NewController, pipelinerun.NewController)
sharedmain.MainWithConfig(ctx, "watcher", cfg, taskrun.NewNamespacesScoppedController(namespaces), pipelinerun.NewNamespacesScoppedController(namespaces))
}
80 changes: 80 additions & 0 deletions pkg/reconciler/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
Copyright 2024 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reconciler

import (
v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
"knative.dev/pkg/controller"
)

// PipelineRunInformerFilterFunc returns a filter function
// for PipelineRuns ensuring PipelineRuns are filtered by list of namespaces membership
func PipelineRunInformerFilterFunc(namespaces []string) func(obj interface{}) bool {
return func(obj interface{}) bool {
// Namespace filter
if len(namespaces) == 0 {
return true
}
if pr, ok := obj.(*v1.PipelineRun); ok {
for _, ns := range namespaces {
if pr.Namespace == ns {
return true
}
}
}
return false
}
}

// TaskRunInformerFilterFunc returns a filter function
// for TaskRuns ensuring TaskRuns are filtered by list of namespaces membership
func TaskRunInformerFilterFunc(namespaces []string) func(obj interface{}) bool {
return func(obj interface{}) bool {
// Namespace filter
if len(namespaces) == 0 {
return true
}
if tr, ok := obj.(*v1.TaskRun); ok {
for _, ns := range namespaces {
if tr.Namespace == ns {
return true
}
}
}
return false
}
}

// TaskRunInformerFilterFunc returns a filter function

Check warning on line 59 in pkg/reconciler/filter.go

View workflow job for this annotation

GitHub Actions / lint

exported: comment on exported function TaskRunInformerFilterFuncWithOwnership should be of the form "TaskRunInformerFilterFuncWithOwnership ..." (revive)
// for TaskRuns ensuring Ownership by a PipelineRun and filtered by list of namespaces membership and
func TaskRunInformerFilterFuncWithOwnership(namespaces []string) func(obj interface{}) bool {
return func(obj interface{}) bool {
// Ownership filter
if !controller.FilterController(&v1.PipelineRun{})(obj) {
return false
}
// Namespace filter
if len(namespaces) == 0 {
return true
}
if tr, ok := obj.(*v1.TaskRun); ok {
for _, ns := range namespaces {
if tr.Namespace == ns {
return true
}
}
}
return false
}
}
99 changes: 53 additions & 46 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/tektoncd/chains/pkg/chains/storage"
"github.com/tektoncd/chains/pkg/config"
"github.com/tektoncd/chains/pkg/pipelinerunmetrics"
v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
"github.com/tektoncd/chains/pkg/reconciler"
pipelineclient "github.com/tektoncd/pipeline/pkg/client/injection/client"
pipelineruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1/pipelinerun"
taskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1/taskrun"
Expand All @@ -34,61 +34,68 @@ import (
_ "github.com/tektoncd/chains/pkg/chains/formats/all"
)

func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
pipelineRunInformer := pipelineruninformer.Get(ctx)
taskRunInformer := taskruninformer.Get(ctx)
// NewNamespacesScoppedController returns a new controller implementation where informer is filtered
// given a list of namespaces
func NewNamespacesScoppedController(namespaces []string) func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
pipelineRunInformer := pipelineruninformer.Get(ctx)
taskRunInformer := taskruninformer.Get(ctx)

kubeClient := kubeclient.Get(ctx)
pipelineClient := pipelineclient.Get(ctx)
kubeClient := kubeclient.Get(ctx)
pipelineClient := pipelineclient.Get(ctx)

psSigner := &chains.ObjectSigner{
SecretPath: SecretPath,
Pipelineclientset: pipelineClient,
Recorder: pipelinerunmetrics.Get(ctx),
}
psSigner := &chains.ObjectSigner{
SecretPath: SecretPath,
Pipelineclientset: pipelineClient,
Recorder: pipelinerunmetrics.Get(ctx),
}

c := &Reconciler{
PipelineRunSigner: psSigner,
Pipelineclientset: pipelineClient,
TaskRunLister: taskRunInformer.Lister(),
}
impl := pipelinerunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
cfgStore := config.NewConfigStore(logger, func(name string, value interface{}) {
// get updated config
cfg := *value.(*config.Config)
c := &Reconciler{
PipelineRunSigner: psSigner,
Pipelineclientset: pipelineClient,
TaskRunLister: taskRunInformer.Lister(),
}
impl := pipelinerunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {

Check warning on line 59 in pkg/reconciler/pipelinerun/controller.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'impl' seems to be unused, consider removing or renaming it as _ (revive)
cfgStore := config.NewConfigStore(logger, func(name string, value interface{}) {

Check warning on line 60 in pkg/reconciler/pipelinerun/controller.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'name' seems to be unused, consider removing or renaming it as _ (revive)
// get updated config
cfg := *value.(*config.Config)

// get all backends for storing provenance
backends, err := storage.InitializeBackends(ctx, pipelineClient, kubeClient, cfg)
if err != nil {
logger.Error(err)
}
psSigner.Backends = backends
})

// get all backends for storing provenance
backends, err := storage.InitializeBackends(ctx, pipelineClient, kubeClient, cfg)
if err != nil {
logger.Error(err)
// setup watches for the config names provided by client
cfgStore.WatchConfigs(cmw)

return controller.Options{
// The chains reconciler shouldn't mutate the pipelinerun's status.
SkipStatusUpdates: true,
ConfigStore: cfgStore,
FinalizerName: "chains.tekton.dev/pipelinerun", // TODO: unique name required?
}
psSigner.Backends = backends
})

// setup watches for the config names provided by client
cfgStore.WatchConfigs(cmw)
c.Tracker = impl.Tracker

return controller.Options{
// The chains reconciler shouldn't mutate the pipelinerun's status.
SkipStatusUpdates: true,
ConfigStore: cfgStore,
FinalizerName: "chains.tekton.dev/pipelinerun", // TODO: unique name required?
if _, err := pipelineRunInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: reconciler.PipelineRunInformerFilterFunc(namespaces),
Handler: controller.HandleAll(impl.Enqueue),
}); err != nil {
logger.Errorf("adding event handler for pipelinerun controller's pipelinerun informer encountered error: %v", err)
}
})

c.Tracker = impl.Tracker

if _, err := pipelineRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)); err != nil {
logger.Errorf("adding event handler for pipelinerun controller's pipelinerun informer encountered error: %w", err)
}
if _, err := taskRunInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: reconciler.TaskRunInformerFilterFuncWithOwnership(namespaces),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
}); err != nil {
logger.Errorf("adding event handler for pipelinerun controller's taskrun informer encountered error: %v", err)
}

if _, err := taskRunInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&v1.PipelineRun{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
}); err != nil {
logger.Errorf("adding event handler for pipelinerun controller's taskrun informer encountered error: %w", err)
return impl
}

return impl
}
3 changes: 2 additions & 1 deletion pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ func TestReconciler_Reconcile(t *testing.T) {
Name: config.ChainsConfig,
},
})
ctl := NewController(ctx, configMapWatcher)
namespacedScopedController := NewNamespacesScoppedController(nil)
ctl := namespacedScopedController(ctx, configMapWatcher)

if la, ok := ctl.Reconciler.(pkgreconciler.LeaderAware); ok {
if err := la.Promote(pkgreconciler.UniversalBucket(), func(pkgreconciler.Bucket, types.NamespacedName) {}); err != nil {
Expand Down
81 changes: 45 additions & 36 deletions pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import (
"github.com/tektoncd/chains/pkg/chains"
"github.com/tektoncd/chains/pkg/chains/storage"
"github.com/tektoncd/chains/pkg/config"
"github.com/tektoncd/chains/pkg/reconciler"
"github.com/tektoncd/chains/pkg/taskrunmetrics"
pipelineclient "github.com/tektoncd/pipeline/pkg/client/injection/client"
taskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1/taskrun"
taskrunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1/taskrun"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand All @@ -31,50 +33,57 @@ import (
_ "github.com/tektoncd/chains/pkg/chains/formats/all"
)

func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
taskRunInformer := taskruninformer.Get(ctx)
// NewNamespacesScoppedController returns a new controller implementation where informer is filtered
// given a list of namespaces
func NewNamespacesScoppedController(namespaces []string) func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
taskRunInformer := taskruninformer.Get(ctx)

kubeClient := kubeclient.Get(ctx)
pipelineClient := pipelineclient.Get(ctx)
kubeClient := kubeclient.Get(ctx)
pipelineClient := pipelineclient.Get(ctx)

tsSigner := &chains.ObjectSigner{
SecretPath: SecretPath,
Pipelineclientset: pipelineClient,
Recorder: taskrunmetrics.Get(ctx),
}
tsSigner := &chains.ObjectSigner{
SecretPath: SecretPath,
Pipelineclientset: pipelineClient,
Recorder: taskrunmetrics.Get(ctx),
}

c := &Reconciler{
TaskRunSigner: tsSigner,
Pipelineclientset: pipelineClient,
}
impl := taskrunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
cfgStore := config.NewConfigStore(logger, func(name string, value interface{}) {
// get updated config
cfg := *value.(*config.Config)
c := &Reconciler{
TaskRunSigner: tsSigner,
Pipelineclientset: pipelineClient,
}
impl := taskrunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {

Check warning on line 56 in pkg/reconciler/taskrun/controller.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'impl' seems to be unused, consider removing or renaming it as _ (revive)
cfgStore := config.NewConfigStore(logger, func(name string, value interface{}) {

Check warning on line 57 in pkg/reconciler/taskrun/controller.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'name' seems to be unused, consider removing or renaming it as _ (revive)
// get updated config
cfg := *value.(*config.Config)

// get all backends for storing provenance
backends, err := storage.InitializeBackends(ctx, pipelineClient, kubeClient, cfg)
if err != nil {
logger.Error(err)
// get all backends for storing provenance
backends, err := storage.InitializeBackends(ctx, pipelineClient, kubeClient, cfg)
if err != nil {
logger.Error(err)
}
tsSigner.Backends = backends
})

// setup watches for the config names provided by client
cfgStore.WatchConfigs(cmw)

return controller.Options{
// The chains reconciler shouldn't mutate the taskrun's status.
SkipStatusUpdates: true,
ConfigStore: cfgStore,
FinalizerName: "chains.tekton.dev",
}
tsSigner.Backends = backends
})

// setup watches for the config names provided by client
cfgStore.WatchConfigs(cmw)

return controller.Options{
// The chains reconciler shouldn't mutate the taskrun's status.
SkipStatusUpdates: true,
ConfigStore: cfgStore,
FinalizerName: "chains.tekton.dev",
if _, err := taskRunInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: reconciler.TaskRunInformerFilterFunc(namespaces),
Handler: controller.HandleAll(impl.Enqueue),
}); err != nil {
logger.Errorf("adding event handler for taskrun controller's taskrun informer encountered error: %v", err)
}
})

if _, err := taskRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)); err != nil {
logger.Errorf("adding event handler for taskrun controller's taskrun informer encountered error: %w", err)
return impl
}

return impl
}
4 changes: 3 additions & 1 deletion pkg/reconciler/taskrun/taskrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ func TestReconciler_Reconcile(t *testing.T) {
Name: config.ChainsConfig,
},
})
ctl := NewController(ctx, configMapWatcher)

namespacedScopedController := NewNamespacesScoppedController(nil)
ctl := namespacedScopedController(ctx, configMapWatcher)

if la, ok := ctl.Reconciler.(pkgreconciler.LeaderAware); ok {
if err := la.Promote(pkgreconciler.UniversalBucket(), func(pkgreconciler.Bucket, types.NamespacedName) {}); err != nil {
Expand Down

0 comments on commit a8479b9

Please sign in to comment.