Skip to content

Commit

Permalink
Support multinamespace informer filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
jkhelil committed Jul 1, 2024
1 parent 4929c41 commit 9ec2b69
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 88 deletions.
20 changes: 16 additions & 4 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ package main

import (
"flag"
"fmt"
"regexp"
"strings"

"github.com/tektoncd/chains/pkg/reconciler/pipelinerun"
"github.com/tektoncd/chains/pkg/reconciler/taskrun"
Expand All @@ -41,7 +44,17 @@ import (

func main() {
flag.IntVar(&controller.DefaultThreadsPerController, "threads-per-controller", controller.DefaultThreadsPerController, "Threads (goroutines) to create per controller")
namespace := flag.String("namespace", "", "Namespace to restrict informer to. Optional, defaults to all namespaces.")
namespaceList := flag.String("namespaces", "", "Comma-separated list of namespaces to restrict informer to. Optional, if empty defaults to all namespaces.")
flag.Parse()

var namespaces []string
if namespaceList != nil && *namespaceList != "" {
space := regexp.MustCompile(`\s+`)
namespaces = strings.Split(space.ReplaceAllString(*namespaceList, ""), ",")
fmt.Printf("controller is scopped to the following namespaces: %s\n", namespaces)
} else {
namespaces = nil // Default to all namespaces if the list is empty
}

// This also calls flag.Parse().
cfg := injection.ParseAndGetRESTConfigOrDie()
Expand All @@ -57,8 +70,7 @@ func main() {
cfg.QPS = 2 * cfg.QPS
cfg.Burst = 2 * cfg.Burst

flag.Parse()
ctx := injection.WithNamespaceScope(signals.NewContext(), *namespace)
ctx := injection.WithNamespaceScope(signals.NewContext(), "")

sharedmain.MainWithConfig(ctx, "watcher", cfg, taskrun.NewController, pipelinerun.NewController)
sharedmain.MainWithConfig(ctx, "watcher", cfg, taskrun.NewNamespacesScoppedController(namespaces), pipelinerun.NewNamespacesScoppedController(namespaces))
}
67 changes: 67 additions & 0 deletions pkg/reconciler/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package reconciler

import (
v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
"knative.dev/pkg/controller"
)

// PipelineRunInformerFilterFunc returns a filter function
// for PipelineRuns ensuring PipelineRuns are filtered by list of namespaces membership
func PipelineRunInformerFilterFunc(namespaces []string) func(obj interface{}) bool {
return func(obj interface{}) bool {
// Namespace filter
if len(namespaces) == 0 {
return true
}
if pr, ok := obj.(*v1.PipelineRun); ok {
for _, ns := range namespaces {
if pr.Namespace == ns {
return true
}
}
}
return false
}
}

// TaskRunInformerFilterFunc returns a filter function
// for TaskRuns ensuring TaskRuns are filtered by list of namespaces membership
func TaskRunInformerFilterFunc(namespaces []string) func(obj interface{}) bool {
return func(obj interface{}) bool {
// Namespace filter
if len(namespaces) == 0 {
return true
}
if tr, ok := obj.(*v1.TaskRun); ok {
for _, ns := range namespaces {
if tr.Namespace == ns {
return true
}
}
}
return false
}
}

// TaskRunInformerFilterFunc returns a filter function

Check warning on line 46 in pkg/reconciler/filter.go

View workflow job for this annotation

GitHub Actions / lint

exported: comment on exported function TaskRunInformerFilterFuncWithOwnership should be of the form "TaskRunInformerFilterFuncWithOwnership ..." (revive)
// for TaskRuns ensuring Ownership by a PipelineRun and filtered by list of namespaces membership and
func TaskRunInformerFilterFuncWithOwnership(namespaces []string) func(obj interface{}) bool {
return func(obj interface{}) bool {
// Ownership filter
if !controller.FilterController(&v1.PipelineRun{})(obj) {
return false
}
// Namespace filter
if len(namespaces) == 0 {
return true
}
if tr, ok := obj.(*v1.TaskRun); ok {
for _, ns := range namespaces {
if tr.Namespace == ns {
return true
}
}
}
return false
}
}
97 changes: 51 additions & 46 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/tektoncd/chains/pkg/chains/storage"
"github.com/tektoncd/chains/pkg/config"
"github.com/tektoncd/chains/pkg/pipelinerunmetrics"
v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
"github.com/tektoncd/chains/pkg/reconciler"
pipelineclient "github.com/tektoncd/pipeline/pkg/client/injection/client"
pipelineruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1/pipelinerun"
taskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1/taskrun"
Expand All @@ -34,61 +34,66 @@ import (
_ "github.com/tektoncd/chains/pkg/chains/formats/all"
)

func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
pipelineRunInformer := pipelineruninformer.Get(ctx)
taskRunInformer := taskruninformer.Get(ctx)
func NewNamespacesScoppedController(namespaces []string) func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {

Check warning on line 37 in pkg/reconciler/pipelinerun/controller.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported function NewNamespacesScoppedController should have comment or be unexported (revive)
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
pipelineRunInformer := pipelineruninformer.Get(ctx)
taskRunInformer := taskruninformer.Get(ctx)

kubeClient := kubeclient.Get(ctx)
pipelineClient := pipelineclient.Get(ctx)
kubeClient := kubeclient.Get(ctx)
pipelineClient := pipelineclient.Get(ctx)

psSigner := &chains.ObjectSigner{
SecretPath: SecretPath,
Pipelineclientset: pipelineClient,
Recorder: pipelinerunmetrics.Get(ctx),
}
psSigner := &chains.ObjectSigner{
SecretPath: SecretPath,
Pipelineclientset: pipelineClient,
Recorder: pipelinerunmetrics.Get(ctx),
}

c := &Reconciler{
PipelineRunSigner: psSigner,
Pipelineclientset: pipelineClient,
TaskRunLister: taskRunInformer.Lister(),
}
impl := pipelinerunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
cfgStore := config.NewConfigStore(logger, func(name string, value interface{}) {
// get updated config
cfg := *value.(*config.Config)
c := &Reconciler{
PipelineRunSigner: psSigner,
Pipelineclientset: pipelineClient,
TaskRunLister: taskRunInformer.Lister(),
}
impl := pipelinerunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {

Check warning on line 57 in pkg/reconciler/pipelinerun/controller.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'impl' seems to be unused, consider removing or renaming it as _ (revive)
cfgStore := config.NewConfigStore(logger, func(name string, value interface{}) {

Check warning on line 58 in pkg/reconciler/pipelinerun/controller.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'name' seems to be unused, consider removing or renaming it as _ (revive)
// get updated config
cfg := *value.(*config.Config)

// get all backends for storing provenance
backends, err := storage.InitializeBackends(ctx, pipelineClient, kubeClient, cfg)
if err != nil {
logger.Error(err)
}
psSigner.Backends = backends
})

// get all backends for storing provenance
backends, err := storage.InitializeBackends(ctx, pipelineClient, kubeClient, cfg)
if err != nil {
logger.Error(err)
// setup watches for the config names provided by client
cfgStore.WatchConfigs(cmw)

return controller.Options{
// The chains reconciler shouldn't mutate the pipelinerun's status.
SkipStatusUpdates: true,
ConfigStore: cfgStore,
FinalizerName: "chains.tekton.dev/pipelinerun", // TODO: unique name required?
}
psSigner.Backends = backends
})

// setup watches for the config names provided by client
cfgStore.WatchConfigs(cmw)
c.Tracker = impl.Tracker

return controller.Options{
// The chains reconciler shouldn't mutate the pipelinerun's status.
SkipStatusUpdates: true,
ConfigStore: cfgStore,
FinalizerName: "chains.tekton.dev/pipelinerun", // TODO: unique name required?
if _, err := pipelineRunInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: reconciler.PipelineRunInformerFilterFunc(namespaces),
Handler: controller.HandleAll(impl.Enqueue),
}); err != nil {
logger.Errorf("adding event handler for pipelinerun controller's pipelinerun informer encountered error: %v", err)
}
})

c.Tracker = impl.Tracker

if _, err := pipelineRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)); err != nil {
logger.Errorf("adding event handler for pipelinerun controller's pipelinerun informer encountered error: %w", err)
}
if _, err := taskRunInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: reconciler.TaskRunInformerFilterFuncWithOwnership(namespaces),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
}); err != nil {
logger.Errorf("adding event handler for pipelinerun controller's taskrun informer encountered error: %v", err)
}

if _, err := taskRunInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&v1.PipelineRun{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
}); err != nil {
logger.Errorf("adding event handler for pipelinerun controller's taskrun informer encountered error: %w", err)
return impl
}

return impl
}
3 changes: 2 additions & 1 deletion pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ func TestReconciler_Reconcile(t *testing.T) {
Name: config.ChainsConfig,
},
})
ctl := NewController(ctx, configMapWatcher)
namespacedScopedController := NewNamespacesScoppedController(nil)
ctl := namespacedScopedController(ctx, configMapWatcher)

if la, ok := ctl.Reconciler.(pkgreconciler.LeaderAware); ok {
if err := la.Promote(pkgreconciler.UniversalBucket(), func(pkgreconciler.Bucket, types.NamespacedName) {}); err != nil {
Expand Down
79 changes: 43 additions & 36 deletions pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import (
"github.com/tektoncd/chains/pkg/chains"
"github.com/tektoncd/chains/pkg/chains/storage"
"github.com/tektoncd/chains/pkg/config"
"github.com/tektoncd/chains/pkg/reconciler"
"github.com/tektoncd/chains/pkg/taskrunmetrics"
pipelineclient "github.com/tektoncd/pipeline/pkg/client/injection/client"
taskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1/taskrun"
taskrunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1/taskrun"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand All @@ -31,50 +33,55 @@ import (
_ "github.com/tektoncd/chains/pkg/chains/formats/all"
)

func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
taskRunInformer := taskruninformer.Get(ctx)
func NewNamespacesScoppedController(namespaces []string) func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {

Check warning on line 36 in pkg/reconciler/taskrun/controller.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported function NewNamespacesScoppedController should have comment or be unexported (revive)
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
taskRunInformer := taskruninformer.Get(ctx)

kubeClient := kubeclient.Get(ctx)
pipelineClient := pipelineclient.Get(ctx)
kubeClient := kubeclient.Get(ctx)
pipelineClient := pipelineclient.Get(ctx)

tsSigner := &chains.ObjectSigner{
SecretPath: SecretPath,
Pipelineclientset: pipelineClient,
Recorder: taskrunmetrics.Get(ctx),
}
tsSigner := &chains.ObjectSigner{
SecretPath: SecretPath,
Pipelineclientset: pipelineClient,
Recorder: taskrunmetrics.Get(ctx),
}

c := &Reconciler{
TaskRunSigner: tsSigner,
Pipelineclientset: pipelineClient,
}
impl := taskrunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
cfgStore := config.NewConfigStore(logger, func(name string, value interface{}) {
// get updated config
cfg := *value.(*config.Config)
c := &Reconciler{
TaskRunSigner: tsSigner,
Pipelineclientset: pipelineClient,
}
impl := taskrunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {

Check warning on line 54 in pkg/reconciler/taskrun/controller.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'impl' seems to be unused, consider removing or renaming it as _ (revive)
cfgStore := config.NewConfigStore(logger, func(name string, value interface{}) {

Check warning on line 55 in pkg/reconciler/taskrun/controller.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'name' seems to be unused, consider removing or renaming it as _ (revive)
// get updated config
cfg := *value.(*config.Config)

// get all backends for storing provenance
backends, err := storage.InitializeBackends(ctx, pipelineClient, kubeClient, cfg)
if err != nil {
logger.Error(err)
// get all backends for storing provenance
backends, err := storage.InitializeBackends(ctx, pipelineClient, kubeClient, cfg)
if err != nil {
logger.Error(err)
}
tsSigner.Backends = backends
})

// setup watches for the config names provided by client
cfgStore.WatchConfigs(cmw)

return controller.Options{
// The chains reconciler shouldn't mutate the taskrun's status.
SkipStatusUpdates: true,
ConfigStore: cfgStore,
FinalizerName: "chains.tekton.dev",
}
tsSigner.Backends = backends
})

// setup watches for the config names provided by client
cfgStore.WatchConfigs(cmw)

return controller.Options{
// The chains reconciler shouldn't mutate the taskrun's status.
SkipStatusUpdates: true,
ConfigStore: cfgStore,
FinalizerName: "chains.tekton.dev",
if _, err := taskRunInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: reconciler.TaskRunInformerFilterFunc(namespaces),
Handler: controller.HandleAll(impl.Enqueue),
}); err != nil {
logger.Errorf("adding event handler for taskrun controller's taskrun informer encountered error: %v", err)
}
})

if _, err := taskRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)); err != nil {
logger.Errorf("adding event handler for taskrun controller's taskrun informer encountered error: %w", err)
return impl
}

return impl
}
4 changes: 3 additions & 1 deletion pkg/reconciler/taskrun/taskrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ func TestReconciler_Reconcile(t *testing.T) {
Name: config.ChainsConfig,
},
})
ctl := NewController(ctx, configMapWatcher)

namespacedScopedController := NewNamespacesScoppedController(nil)
ctl := namespacedScopedController(ctx, configMapWatcher)

if la, ok := ctl.Reconciler.(pkgreconciler.LeaderAware); ok {
if err := la.Promote(pkgreconciler.UniversalBucket(), func(pkgreconciler.Bucket, types.NamespacedName) {}); err != nil {
Expand Down

0 comments on commit 9ec2b69

Please sign in to comment.