From 2b82a3a38ce60695ebd5e01b8ab911b67fa9ee61 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Mon, 16 Nov 2020 15:37:31 -0500 Subject: [PATCH 1/6] Allow global resync & reconcilerImpl informers to be filtered via filterfunc --- .../v1/customresourcedefinition/controller.go | 12 +++-- .../customresourcedefinition/controller.go | 12 +++-- .../apps/v1/deployment/controller.go | 12 +++-- .../core/v1/namespace/controller.go | 12 +++-- .../generators/reconciler_controller.go | 16 ++++-- controller/controller.go | 53 ++++++++++++++----- 6 files changed, 79 insertions(+), 38 deletions(-) diff --git a/client/injection/apiextensions/reconciler/apiextensions/v1/customresourcedefinition/controller.go b/client/injection/apiextensions/reconciler/apiextensions/v1/customresourcedefinition/controller.go index fc4badc925..6720008e0e 100644 --- a/client/injection/apiextensions/reconciler/apiextensions/v1/customresourcedefinition/controller.go +++ b/client/injection/apiextensions/reconciler/apiextensions/v1/customresourcedefinition/controller.go @@ -61,6 +61,7 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF lister := customresourcedefinitionInformer.Lister() + filterFunc := controller.GetFilterFunc(ctx) rec := &reconcilerImpl{ LeaderAwareFuncs: reconciler.LeaderAwareFuncs{ PromoteFunc: func(bkt reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error { @@ -69,11 +70,12 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF return err } for _, elt := range all { - // TODO: Consider letting users specify a filter in options. - enq(bkt, types.NamespacedName{ - Namespace: elt.GetNamespace(), - Name: elt.GetName(), - }) + if ok := filterFunc(elt); ok { + enq(bkt, types.NamespacedName{ + Namespace: elt.GetNamespace(), + Name: elt.GetName(), + }) + } } return nil }, diff --git a/client/injection/apiextensions/reconciler/apiextensions/v1beta1/customresourcedefinition/controller.go b/client/injection/apiextensions/reconciler/apiextensions/v1beta1/customresourcedefinition/controller.go index 53bd40bb5e..b7891d14bc 100644 --- a/client/injection/apiextensions/reconciler/apiextensions/v1beta1/customresourcedefinition/controller.go +++ b/client/injection/apiextensions/reconciler/apiextensions/v1beta1/customresourcedefinition/controller.go @@ -61,6 +61,7 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF lister := customresourcedefinitionInformer.Lister() + filterFunc := controller.GetFilterFunc(ctx) rec := &reconcilerImpl{ LeaderAwareFuncs: reconciler.LeaderAwareFuncs{ PromoteFunc: func(bkt reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error { @@ -69,11 +70,12 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF return err } for _, elt := range all { - // TODO: Consider letting users specify a filter in options. - enq(bkt, types.NamespacedName{ - Namespace: elt.GetNamespace(), - Name: elt.GetName(), - }) + if ok := filterFunc(elt); ok { + enq(bkt, types.NamespacedName{ + Namespace: elt.GetNamespace(), + Name: elt.GetName(), + }) + } } return nil }, diff --git a/client/injection/kube/reconciler/apps/v1/deployment/controller.go b/client/injection/kube/reconciler/apps/v1/deployment/controller.go index dff3972620..a2296d4640 100644 --- a/client/injection/kube/reconciler/apps/v1/deployment/controller.go +++ b/client/injection/kube/reconciler/apps/v1/deployment/controller.go @@ -59,6 +59,7 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF lister := deploymentInformer.Lister() + filterFunc := controller.GetFilterFunc(ctx) rec := &reconcilerImpl{ LeaderAwareFuncs: reconciler.LeaderAwareFuncs{ PromoteFunc: func(bkt reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error { @@ -67,11 +68,12 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF return err } for _, elt := range all { - // TODO: Consider letting users specify a filter in options. - enq(bkt, types.NamespacedName{ - Namespace: elt.GetNamespace(), - Name: elt.GetName(), - }) + if ok := filterFunc(elt); ok { + enq(bkt, types.NamespacedName{ + Namespace: elt.GetNamespace(), + Name: elt.GetName(), + }) + } } return nil }, diff --git a/client/injection/kube/reconciler/core/v1/namespace/controller.go b/client/injection/kube/reconciler/core/v1/namespace/controller.go index 065f146341..71a531aa6a 100644 --- a/client/injection/kube/reconciler/core/v1/namespace/controller.go +++ b/client/injection/kube/reconciler/core/v1/namespace/controller.go @@ -59,6 +59,7 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF lister := namespaceInformer.Lister() + filterFunc := controller.GetFilterFunc(ctx) rec := &reconcilerImpl{ LeaderAwareFuncs: reconciler.LeaderAwareFuncs{ PromoteFunc: func(bkt reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error { @@ -67,11 +68,12 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF return err } for _, elt := range all { - // TODO: Consider letting users specify a filter in options. - enq(bkt, types.NamespacedName{ - Namespace: elt.GetNamespace(), - Name: elt.GetName(), - }) + if ok := filterFunc(elt); ok { + enq(bkt, types.NamespacedName{ + Namespace: elt.GetNamespace(), + Name: elt.GetName(), + }) + } } return nil }, diff --git a/codegen/cmd/injection-gen/generators/reconciler_controller.go b/codegen/cmd/injection-gen/generators/reconciler_controller.go index 9260fe87fa..9aac780a21 100644 --- a/codegen/cmd/injection-gen/generators/reconciler_controller.go +++ b/codegen/cmd/injection-gen/generators/reconciler_controller.go @@ -138,6 +138,10 @@ func (g *reconcilerControllerGenerator) GenerateType(c *generator.Context, t *ty Package: "knative.dev/pkg/controller", Name: "OptionsFn", }), + "controllerGetFilterFunc": c.Universe.Type(types.Name{ + Package: "knative.dev/pkg/controller", + Name: "GetFilterFunc", + }), "contextContext": c.Universe.Type(types.Name{ Package: "context", Name: "Context", @@ -203,6 +207,7 @@ func NewImpl(ctx {{.contextContext|raw}}, r Interface{{if .hasClass}}, classValu lister := {{.type|lowercaseSingular}}Informer.Lister() + filterFunc := {{.controllerGetFilterFunc|raw}}(ctx) rec := &reconcilerImpl{ LeaderAwareFuncs: {{.reconcilerLeaderAwareFuncs|raw}}{ PromoteFunc: func(bkt {{.reconcilerBucket|raw}}, enq func({{.reconcilerBucket|raw}}, {{.typesNamespacedName|raw}})) error { @@ -211,11 +216,12 @@ func NewImpl(ctx {{.contextContext|raw}}, r Interface{{if .hasClass}}, classValu return err } for _, elt := range all { - // TODO: Consider letting users specify a filter in options. - enq(bkt, {{.typesNamespacedName|raw}}{ - Namespace: elt.GetNamespace(), - Name: elt.GetName(), - }) + if ok := filterFunc(elt); ok { + enq(bkt, {{.typesNamespacedName|raw}}{ + Namespace: elt.GetNamespace(), + Name: elt.GetName(), + }) + } } return nil }, diff --git a/controller/controller.go b/controller/controller.go index b0ecb2e9be..b5db729e97 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -174,6 +174,8 @@ func FilterWithNameAndNamespace(namespace, name string) func(obj interface{}) bo } } +type filterFunc func(obj interface{}) bool + // Impl is our core controller implementation. It handles queuing and feeding work // from the queue to an implementation of Reconciler. type Impl struct { @@ -203,15 +205,22 @@ type Impl struct { // StatsReporter is used to send common controller metrics. statsReporter StatsReporter + + // GlobalResyncFilterFunc is used to filter our objects from + // the shared cache on a global resync, be default and if not + // set by the controller implemenation, it will default to + // allowing every object in the cache + GlobalResyncFilterFunc filterFunc } // ControllerOptions encapsulates options for creating a new controller, // including throttling and stats behavior. type ControllerOptions struct { //nolint // for backcompat. - WorkQueueName string - Logger *zap.SugaredLogger - Reporter StatsReporter - RateLimiter workqueue.RateLimiter + WorkQueueName string + Logger *zap.SugaredLogger + Reporter StatsReporter + RateLimiter workqueue.RateLimiter + GlobalResyncFilterFunc filterFunc } // NewImpl instantiates an instance of our controller that will feed work to the @@ -236,12 +245,16 @@ func NewImplFull(r Reconciler, options ControllerOptions) *Impl { if options.Reporter == nil { options.Reporter = MustNewStatsReporter(options.WorkQueueName, options.Logger) } + if options.GlobalResyncFilterFunc == nil { + options.GlobalResyncFilterFunc = func(obj interface{}) bool { return true } + } return &Impl{ - Name: options.WorkQueueName, - Reconciler: r, - workQueue: newTwoLaneWorkQueue(options.WorkQueueName, options.RateLimiter), - logger: logger, - statsReporter: options.Reporter, + Name: options.WorkQueueName, + Reconciler: r, + workQueue: newTwoLaneWorkQueue(options.WorkQueueName, options.RateLimiter), + logger: logger, + statsReporter: options.Reporter, + GlobalResyncFilterFunc: options.GlobalResyncFilterFunc, } } @@ -542,19 +555,18 @@ func (c *Impl) handleErr(err error, key types.NamespacedName) { // GlobalResync enqueues into the slow lane all objects from the passed SharedInformer func (c *Impl) GlobalResync(si cache.SharedInformer) { - alwaysTrue := func(interface{}) bool { return true } - c.FilteredGlobalResync(alwaysTrue, si) + c.FilteredGlobalResync(c.GlobalResyncFilterFunc, si) } // FilteredGlobalResync enqueues all objects from the // SharedInformer that pass the filter function in to the slow queue. -func (c *Impl) FilteredGlobalResync(f func(interface{}) bool, si cache.SharedInformer) { +func (c *Impl) FilteredGlobalResync(_ func(interface{}) bool, si cache.SharedInformer) { if c.workQueue.ShuttingDown() { return } list := si.GetStore().List() for _, obj := range list { - if f(obj) { + if c.GlobalResyncFilterFunc(obj) { c.EnqueueSlow(obj) } } @@ -712,3 +724,18 @@ func safeKey(key types.NamespacedName) string { } return key.String() } + +// This is a filterFunc for leaderelection informer and listers +type filterFuncKey struct{} + +func WithFilterFunc(ctx context.Context, filter filterFunc) context.Context { + return context.WithValue(ctx, filterFuncKey{}, filter) +} + +func GetFilterFunc(ctx context.Context) filterFunc { + value := ctx.Value(filterFuncKey{}) + if value == nil { + return func(interface{}) bool { return true } + } + return value.(filterFunc) +} From 99fd07578d8ec7f74fef3ae9421d38e4eb7b7e99 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Tue, 24 Nov 2020 11:18:22 -0500 Subject: [PATCH 2/6] feedback: change lambda to a fixed function --- controller/controller.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index b5db729e97..d92f59d035 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -56,6 +56,11 @@ var ( // may adjust this process-wide default. For finer control, invoke // Run on the controller directly. DefaultThreadsPerController = 2 + + // alwaysTrue is the default filterFunc, which allows all objects + // passed through this is used in the default case for passing all + // objects to the slowlane from the passed sharedInformer + alwaysTrue = func(interface{}) bool { return true } ) // Reconciler is the interface that controller implementations are expected @@ -246,7 +251,7 @@ func NewImplFull(r Reconciler, options ControllerOptions) *Impl { options.Reporter = MustNewStatsReporter(options.WorkQueueName, options.Logger) } if options.GlobalResyncFilterFunc == nil { - options.GlobalResyncFilterFunc = func(obj interface{}) bool { return true } + options.GlobalResyncFilterFunc = alwaysTrue } return &Impl{ Name: options.WorkQueueName, @@ -735,7 +740,7 @@ func WithFilterFunc(ctx context.Context, filter filterFunc) context.Context { func GetFilterFunc(ctx context.Context) filterFunc { value := ctx.Value(filterFuncKey{}) if value == nil { - return func(interface{}) bool { return true } + return alwaysTrue } return value.(filterFunc) } From 7d31cd35495e247ad1274aadb61c4c76d7f0f435 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Tue, 24 Nov 2020 11:19:13 -0500 Subject: [PATCH 3/6] feedback: don't export variable, fix GlobalResync Impl.globalResyncFilterFunc -> do not export (as is set by controller) Ensure FilteredGlobalResync actually runs filterFunc over passed objects Fix descriptions/comments Change passing func(interfcae{}) bool, to filterFunc --- controller/controller.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index d92f59d035..d155f17ae5 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -215,7 +215,7 @@ type Impl struct { // the shared cache on a global resync, be default and if not // set by the controller implemenation, it will default to // allowing every object in the cache - GlobalResyncFilterFunc filterFunc + globalResyncFilterFunc filterFunc } // ControllerOptions encapsulates options for creating a new controller, @@ -259,7 +259,7 @@ func NewImplFull(r Reconciler, options ControllerOptions) *Impl { workQueue: newTwoLaneWorkQueue(options.WorkQueueName, options.RateLimiter), logger: logger, statsReporter: options.Reporter, - GlobalResyncFilterFunc: options.GlobalResyncFilterFunc, + globalResyncFilterFunc: options.GlobalResyncFilterFunc, } } @@ -558,20 +558,21 @@ func (c *Impl) handleErr(err error, key types.NamespacedName) { c.workQueue.Forget(key) } -// GlobalResync enqueues into the slow lane all objects from the passed SharedInformer +// GlobalResync enqueues (as allowed by the globalResyncFilterFunc) into +// the slow lane objects from the passed SharedInformer func (c *Impl) GlobalResync(si cache.SharedInformer) { - c.FilteredGlobalResync(c.GlobalResyncFilterFunc, si) + c.FilteredGlobalResync(c.globalResyncFilterFunc, si) } -// FilteredGlobalResync enqueues all objects from the +// FilteredGlobalResync enqueues objects from the // SharedInformer that pass the filter function in to the slow queue. -func (c *Impl) FilteredGlobalResync(_ func(interface{}) bool, si cache.SharedInformer) { +func (c *Impl) FilteredGlobalResync(f filterFunc, si cache.SharedInformer) { if c.workQueue.ShuttingDown() { return } list := si.GetStore().List() for _, obj := range list { - if c.GlobalResyncFilterFunc(obj) { + if f(obj) { c.EnqueueSlow(obj) } } From 1439ca57f0ce7749b8b1c4d0c612508378f9b45b Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Tue, 24 Nov 2020 14:34:59 -0500 Subject: [PATCH 4/6] Add TestFilteredGlobalResync --- controller/controller_test.go | 54 +++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/controller/controller_test.go b/controller/controller_test.go index 222d7821fb..0d13b4e858 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -1617,3 +1617,57 @@ func TestGetEventRecorder(t *testing.T) { t.Error("GetEventRecorder() = nil, wanted non-nil") } } + +func TestFilteredGlobalResync(t *testing.T) { + tests := []struct { + name string + filterFunc filterFunc + wantQueue []types.NamespacedName + }{{ + name: "do nothing", + filterFunc: func(interface{}) bool { return false }, + }, { + name: "always true", + filterFunc: alwaysTrue, + wantQueue: []types.NamespacedName{{Namespace: "foo", Name: "bar"}, {Namespace: "bar", Name: "foo"}, {Namespace: "fizz", Name: "buzz"}}, + }, { + name: "filter namespace foo", + filterFunc: func(obj interface{}) bool { + if mo, ok := obj.(metav1.Object); ok { + if mo.GetNamespace() == "foo" { + return true + } else { + return false + } + } + return false + }, + wantQueue: []types.NamespacedName{{Namespace: "foo", Name: "bar"}}, + }, { + name: "filter object named foo", + filterFunc: func(obj interface{}) bool { + if mo, ok := obj.(metav1.Object); ok { + if mo.GetName() == "foo" { + return true + } else { + return false + } + } + return false + }, + wantQueue: []types.NamespacedName{{Namespace: "bar", Name: "foo"}}, + }} + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + impl := NewImplFull(&nopReconciler{}, ControllerOptions{WorkQueueName: "FilteredTesting", Logger: TestLogger(t), GlobalResyncFilterFunc: test.filterFunc}) + impl.GlobalResync(&fakeInformer{}) + + impl.WorkQueue().ShutDown() + gotQueue := drainWorkQueue(impl.WorkQueue()) + + if diff := cmp.Diff(test.wantQueue, gotQueue); diff != "" { + t.Error("unexpected queue (-want +got):", diff) + } + }) + } +} From 9e9bd064b0999df59ba9ab195dbbee9553e10efd Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Tue, 24 Nov 2020 15:14:12 -0500 Subject: [PATCH 5/6] golint fixes --- controller/controller.go | 16 ++++++++-------- controller/controller_test.go | 6 +----- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index d155f17ae5..76a875d582 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -57,7 +57,7 @@ var ( // Run on the controller directly. DefaultThreadsPerController = 2 - // alwaysTrue is the default filterFunc, which allows all objects + // alwaysTrue is the default FilterFunc, which allows all objects // passed through this is used in the default case for passing all // objects to the slowlane from the passed sharedInformer alwaysTrue = func(interface{}) bool { return true } @@ -179,7 +179,7 @@ func FilterWithNameAndNamespace(namespace, name string) func(obj interface{}) bo } } -type filterFunc func(obj interface{}) bool +type FilterFunc func(obj interface{}) bool // Impl is our core controller implementation. It handles queuing and feeding work // from the queue to an implementation of Reconciler. @@ -215,7 +215,7 @@ type Impl struct { // the shared cache on a global resync, be default and if not // set by the controller implemenation, it will default to // allowing every object in the cache - globalResyncFilterFunc filterFunc + globalResyncFilterFunc FilterFunc } // ControllerOptions encapsulates options for creating a new controller, @@ -225,7 +225,7 @@ type ControllerOptions struct { //nolint // for backcompat. Logger *zap.SugaredLogger Reporter StatsReporter RateLimiter workqueue.RateLimiter - GlobalResyncFilterFunc filterFunc + GlobalResyncFilterFunc FilterFunc } // NewImpl instantiates an instance of our controller that will feed work to the @@ -566,7 +566,7 @@ func (c *Impl) GlobalResync(si cache.SharedInformer) { // FilteredGlobalResync enqueues objects from the // SharedInformer that pass the filter function in to the slow queue. -func (c *Impl) FilteredGlobalResync(f filterFunc, si cache.SharedInformer) { +func (c *Impl) FilteredGlobalResync(f FilterFunc, si cache.SharedInformer) { if c.workQueue.ShuttingDown() { return } @@ -734,14 +734,14 @@ func safeKey(key types.NamespacedName) string { // This is a filterFunc for leaderelection informer and listers type filterFuncKey struct{} -func WithFilterFunc(ctx context.Context, filter filterFunc) context.Context { +func WithFilterFunc(ctx context.Context, filter FilterFunc) context.Context { return context.WithValue(ctx, filterFuncKey{}, filter) } -func GetFilterFunc(ctx context.Context) filterFunc { +func GetFilterFunc(ctx context.Context) FilterFunc { value := ctx.Value(filterFuncKey{}) if value == nil { return alwaysTrue } - return value.(filterFunc) + return value.(FilterFunc) } diff --git a/controller/controller_test.go b/controller/controller_test.go index 0d13b4e858..2d03af403a 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -1621,7 +1621,7 @@ func TestGetEventRecorder(t *testing.T) { func TestFilteredGlobalResync(t *testing.T) { tests := []struct { name string - filterFunc filterFunc + filterFunc FilterFunc wantQueue []types.NamespacedName }{{ name: "do nothing", @@ -1636,8 +1636,6 @@ func TestFilteredGlobalResync(t *testing.T) { if mo, ok := obj.(metav1.Object); ok { if mo.GetNamespace() == "foo" { return true - } else { - return false } } return false @@ -1649,8 +1647,6 @@ func TestFilteredGlobalResync(t *testing.T) { if mo, ok := obj.(metav1.Object); ok { if mo.GetName() == "foo" { return true - } else { - return false } } return false From ca3d2a95b82fc6a734f55d819537aca09013ce7d Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Wed, 25 Nov 2020 07:43:54 -0500 Subject: [PATCH 6/6] Fix Typos --- controller/controller.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index 76a875d582..0dd94f67e0 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -58,7 +58,7 @@ var ( DefaultThreadsPerController = 2 // alwaysTrue is the default FilterFunc, which allows all objects - // passed through this is used in the default case for passing all + // passed through. This is used in the default case for passing all // objects to the slowlane from the passed sharedInformer alwaysTrue = func(interface{}) bool { return true } ) @@ -211,10 +211,10 @@ type Impl struct { // StatsReporter is used to send common controller metrics. statsReporter StatsReporter - // GlobalResyncFilterFunc is used to filter our objects from - // the shared cache on a global resync, be default and if not + // GlobalResyncFilterFunc is used to filter out objects from + // the shared cache on a global resync. By default and if not // set by the controller implemenation, it will default to - // allowing every object in the cache + // allowing every object in the cache. globalResyncFilterFunc FilterFunc }