Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow global resync & reconcilerImpl informers to be filtered via filterfunc #1909

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions client/injection/kube/reconciler/core/v1/namespace/controller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 11 additions & 5 deletions codegen/cmd/injection-gen/generators/reconciler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ func (g *reconcilerControllerGenerator) GenerateType(c *generator.Context, t *ty
Package: "knative.dev/pkg/controller",
Name: "OptionsFn",
}),
"controllerGetFilterFunc": c.Universe.Type(types.Name{
Package: "knative.dev/pkg/controller",
Name: "GetFilterFunc",
}),
"contextContext": c.Universe.Type(types.Name{
Package: "context",
Name: "Context",
Expand Down Expand Up @@ -203,6 +207,7 @@ func NewImpl(ctx {{.contextContext|raw}}, r Interface{{if .hasClass}}, classValu

lister := {{.type|lowercaseSingular}}Informer.Lister()

filterFunc := {{.controllerGetFilterFunc|raw}}(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather see this come through the controller.Options, but the ordering and capture make this awkward.

I think that if you have filterFunc := noFilter up here, and optionally overwrite it below the capture should work out correctly though?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in fact the rest of the things already seem to be doing this, so you are pretty close 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what @benmoss did in #1909 too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#2170 rather 😄 . Just clicked your link and momentarily was very confused

rec := &reconcilerImpl{
LeaderAwareFuncs: {{.reconcilerLeaderAwareFuncs|raw}}{
PromoteFunc: func(bkt {{.reconcilerBucket|raw}}, enq func({{.reconcilerBucket|raw}}, {{.typesNamespacedName|raw}})) error {
Expand All @@ -211,11 +216,12 @@ func NewImpl(ctx {{.contextContext|raw}}, r Interface{{if .hasClass}}, classValu
return err
}
for _, elt := range all {
// TODO: Consider letting users specify a filter in options.
enq(bkt, {{.typesNamespacedName|raw}}{
Namespace: elt.GetNamespace(),
Name: elt.GetName(),
})
if ok := filterFunc(elt); ok {
enq(bkt, {{.typesNamespacedName|raw}}{
Namespace: elt.GetNamespace(),
Name: elt.GetName(),
})
}
}
return nil
},
Expand Down
53 changes: 40 additions & 13 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ func FilterWithNameAndNamespace(namespace, name string) func(obj interface{}) bo
}
}

type filterFunc func(obj interface{}) bool

// Impl is our core controller implementation. It handles queuing and feeding work
// from the queue to an implementation of Reconciler.
type Impl struct {
Expand Down Expand Up @@ -203,15 +205,22 @@ type Impl struct {

// StatsReporter is used to send common controller metrics.
statsReporter StatsReporter

// GlobalResyncFilterFunc is used to filter our objects from
// the shared cache on a global resync, be default and if not
// set by the controller implemenation, it will default to
// allowing every object in the cache
lberk marked this conversation as resolved.
Show resolved Hide resolved
GlobalResyncFilterFunc filterFunc
lberk marked this conversation as resolved.
Show resolved Hide resolved
}

// ControllerOptions encapsulates options for creating a new controller,
// including throttling and stats behavior.
type ControllerOptions struct { //nolint // for backcompat.
WorkQueueName string
Logger *zap.SugaredLogger
Reporter StatsReporter
RateLimiter workqueue.RateLimiter
WorkQueueName string
Logger *zap.SugaredLogger
Reporter StatsReporter
RateLimiter workqueue.RateLimiter
GlobalResyncFilterFunc filterFunc
}

// NewImpl instantiates an instance of our controller that will feed work to the
Expand All @@ -236,12 +245,16 @@ func NewImplFull(r Reconciler, options ControllerOptions) *Impl {
if options.Reporter == nil {
options.Reporter = MustNewStatsReporter(options.WorkQueueName, options.Logger)
}
if options.GlobalResyncFilterFunc == nil {
options.GlobalResyncFilterFunc = func(obj interface{}) bool { return true }
lberk marked this conversation as resolved.
Show resolved Hide resolved
}
return &Impl{
Name: options.WorkQueueName,
Reconciler: r,
workQueue: newTwoLaneWorkQueue(options.WorkQueueName, options.RateLimiter),
logger: logger,
statsReporter: options.Reporter,
Name: options.WorkQueueName,
Reconciler: r,
workQueue: newTwoLaneWorkQueue(options.WorkQueueName, options.RateLimiter),
logger: logger,
statsReporter: options.Reporter,
GlobalResyncFilterFunc: options.GlobalResyncFilterFunc,
}
}

Expand Down Expand Up @@ -542,19 +555,18 @@ func (c *Impl) handleErr(err error, key types.NamespacedName) {

// GlobalResync enqueues into the slow lane all objects from the passed SharedInformer
func (c *Impl) GlobalResync(si cache.SharedInformer) {
alwaysTrue := func(interface{}) bool { return true }
c.FilteredGlobalResync(alwaysTrue, si)
c.FilteredGlobalResync(c.GlobalResyncFilterFunc, si)
lberk marked this conversation as resolved.
Show resolved Hide resolved
}

// FilteredGlobalResync enqueues all objects from the
// SharedInformer that pass the filter function in to the slow queue.
func (c *Impl) FilteredGlobalResync(f func(interface{}) bool, si cache.SharedInformer) {
func (c *Impl) FilteredGlobalResync(_ func(interface{}) bool, si cache.SharedInformer) {
lberk marked this conversation as resolved.
Show resolved Hide resolved
lberk marked this conversation as resolved.
Show resolved Hide resolved
if c.workQueue.ShuttingDown() {
return
}
list := si.GetStore().List()
for _, obj := range list {
if f(obj) {
if c.GlobalResyncFilterFunc(obj) {
c.EnqueueSlow(obj)
}
}
Expand Down Expand Up @@ -712,3 +724,18 @@ func safeKey(key types.NamespacedName) string {
}
return key.String()
}

// This is a filterFunc for leaderelection informer and listers
type filterFuncKey struct{}

func WithFilterFunc(ctx context.Context, filter filterFunc) context.Context {
return context.WithValue(ctx, filterFuncKey{}, filter)
}

func GetFilterFunc(ctx context.Context) filterFunc {
value := ctx.Value(filterFuncKey{})
if value == nil {
return func(interface{}) bool { return true }
}
return value.(filterFunc)
}