Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow global resync & reconcilerImpl informers to be filtered via filterfunc #1909

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions client/injection/kube/reconciler/core/v1/namespace/controller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 11 additions & 5 deletions codegen/cmd/injection-gen/generators/reconciler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ func (g *reconcilerControllerGenerator) GenerateType(c *generator.Context, t *ty
Package: "knative.dev/pkg/controller",
Name: "OptionsFn",
}),
"controllerGetFilterFunc": c.Universe.Type(types.Name{
Package: "knative.dev/pkg/controller",
Name: "GetFilterFunc",
}),
"contextContext": c.Universe.Type(types.Name{
Package: "context",
Name: "Context",
Expand Down Expand Up @@ -203,6 +207,7 @@ func NewImpl(ctx {{.contextContext|raw}}, r Interface{{if .hasClass}}, classValu

lister := {{.type|lowercaseSingular}}Informer.Lister()

filterFunc := {{.controllerGetFilterFunc|raw}}(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather see this come through the controller.Options, but the ordering and capture make this awkward.

I think that if you have filterFunc := noFilter up here, and optionally overwrite it below the capture should work out correctly though?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in fact the rest of the things already seem to be doing this, so you are pretty close 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what @benmoss did in #1909 too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#2170 rather 😄 . Just clicked your link and momentarily was very confused

rec := &reconcilerImpl{
LeaderAwareFuncs: {{.reconcilerLeaderAwareFuncs|raw}}{
PromoteFunc: func(bkt {{.reconcilerBucket|raw}}, enq func({{.reconcilerBucket|raw}}, {{.typesNamespacedName|raw}})) error {
Expand All @@ -211,11 +216,12 @@ func NewImpl(ctx {{.contextContext|raw}}, r Interface{{if .hasClass}}, classValu
return err
}
for _, elt := range all {
// TODO: Consider letting users specify a filter in options.
enq(bkt, {{.typesNamespacedName|raw}}{
Namespace: elt.GetNamespace(),
Name: elt.GetName(),
})
if ok := filterFunc(elt); ok {
enq(bkt, {{.typesNamespacedName|raw}}{
Namespace: elt.GetNamespace(),
Name: elt.GetName(),
})
}
}
return nil
},
Expand Down
61 changes: 47 additions & 14 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ var (
// may adjust this process-wide default. For finer control, invoke
// Run on the controller directly.
DefaultThreadsPerController = 2

// alwaysTrue is the default FilterFunc, which allows all objects
// passed through. This is used in the default case for passing all
// objects to the slowlane from the passed sharedInformer
alwaysTrue = func(interface{}) bool { return true }
)

// Reconciler is the interface that controller implementations are expected
Expand Down Expand Up @@ -174,6 +179,8 @@ func FilterWithNameAndNamespace(namespace, name string) func(obj interface{}) bo
}
}

type FilterFunc func(obj interface{}) bool

// Impl is our core controller implementation. It handles queuing and feeding work
// from the queue to an implementation of Reconciler.
type Impl struct {
Expand Down Expand Up @@ -203,15 +210,22 @@ type Impl struct {

// StatsReporter is used to send common controller metrics.
statsReporter StatsReporter

// GlobalResyncFilterFunc is used to filter out objects from
// the shared cache on a global resync. By default and if not
// set by the controller implemenation, it will default to
// allowing every object in the cache.
globalResyncFilterFunc FilterFunc
}

// ControllerOptions encapsulates options for creating a new controller,
// including throttling and stats behavior.
type ControllerOptions struct { //nolint // for backcompat.
WorkQueueName string
Logger *zap.SugaredLogger
Reporter StatsReporter
RateLimiter workqueue.RateLimiter
WorkQueueName string
Logger *zap.SugaredLogger
Reporter StatsReporter
RateLimiter workqueue.RateLimiter
GlobalResyncFilterFunc FilterFunc
}

// NewImpl instantiates an instance of our controller that will feed work to the
Expand All @@ -236,12 +250,16 @@ func NewImplFull(r Reconciler, options ControllerOptions) *Impl {
if options.Reporter == nil {
options.Reporter = MustNewStatsReporter(options.WorkQueueName, options.Logger)
}
if options.GlobalResyncFilterFunc == nil {
options.GlobalResyncFilterFunc = alwaysTrue
}
return &Impl{
Name: options.WorkQueueName,
Reconciler: r,
workQueue: newTwoLaneWorkQueue(options.WorkQueueName, options.RateLimiter),
logger: logger,
statsReporter: options.Reporter,
Name: options.WorkQueueName,
Reconciler: r,
workQueue: newTwoLaneWorkQueue(options.WorkQueueName, options.RateLimiter),
logger: logger,
statsReporter: options.Reporter,
globalResyncFilterFunc: options.GlobalResyncFilterFunc,
}
}

Expand Down Expand Up @@ -540,15 +558,15 @@ func (c *Impl) handleErr(err error, key types.NamespacedName) {
c.workQueue.Forget(key)
}

// GlobalResync enqueues into the slow lane all objects from the passed SharedInformer
// GlobalResync enqueues (as allowed by the globalResyncFilterFunc) into
// the slow lane objects from the passed SharedInformer
func (c *Impl) GlobalResync(si cache.SharedInformer) {
alwaysTrue := func(interface{}) bool { return true }
c.FilteredGlobalResync(alwaysTrue, si)
c.FilteredGlobalResync(c.globalResyncFilterFunc, si)
}

// FilteredGlobalResync enqueues all objects from the
// FilteredGlobalResync enqueues objects from the
// SharedInformer that pass the filter function in to the slow queue.
func (c *Impl) FilteredGlobalResync(f func(interface{}) bool, si cache.SharedInformer) {
func (c *Impl) FilteredGlobalResync(f FilterFunc, si cache.SharedInformer) {
if c.workQueue.ShuttingDown() {
return
}
Expand Down Expand Up @@ -712,3 +730,18 @@ func safeKey(key types.NamespacedName) string {
}
return key.String()
}

// This is a filterFunc for leaderelection informer and listers
type filterFuncKey struct{}

func WithFilterFunc(ctx context.Context, filter FilterFunc) context.Context {
return context.WithValue(ctx, filterFuncKey{}, filter)
}

func GetFilterFunc(ctx context.Context) FilterFunc {
value := ctx.Value(filterFuncKey{})
if value == nil {
return alwaysTrue
}
return value.(FilterFunc)
}
50 changes: 50 additions & 0 deletions controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1617,3 +1617,53 @@ func TestGetEventRecorder(t *testing.T) {
t.Error("GetEventRecorder() = nil, wanted non-nil")
}
}

func TestFilteredGlobalResync(t *testing.T) {
tests := []struct {
name string
filterFunc FilterFunc
wantQueue []types.NamespacedName
}{{
name: "do nothing",
filterFunc: func(interface{}) bool { return false },
}, {
name: "always true",
filterFunc: alwaysTrue,
wantQueue: []types.NamespacedName{{Namespace: "foo", Name: "bar"}, {Namespace: "bar", Name: "foo"}, {Namespace: "fizz", Name: "buzz"}},
}, {
name: "filter namespace foo",
filterFunc: func(obj interface{}) bool {
if mo, ok := obj.(metav1.Object); ok {
if mo.GetNamespace() == "foo" {
return true
}
}
return false
},
wantQueue: []types.NamespacedName{{Namespace: "foo", Name: "bar"}},
}, {
name: "filter object named foo",
filterFunc: func(obj interface{}) bool {
if mo, ok := obj.(metav1.Object); ok {
if mo.GetName() == "foo" {
return true
}
}
return false
},
wantQueue: []types.NamespacedName{{Namespace: "bar", Name: "foo"}},
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
impl := NewImplFull(&nopReconciler{}, ControllerOptions{WorkQueueName: "FilteredTesting", Logger: TestLogger(t), GlobalResyncFilterFunc: test.filterFunc})
impl.GlobalResync(&fakeInformer{})

impl.WorkQueue().ShutDown()
gotQueue := drainWorkQueue(impl.WorkQueue())

if diff := cmp.Diff(test.wantQueue, gotQueue); diff != "" {
t.Error("unexpected queue (-want +got):", diff)
}
})
}
}