From faa331d39d9c90b687279dac9a61051391e5ad4b Mon Sep 17 00:00:00 2001 From: Michael Bridgen Date: Tue, 24 Oct 2023 16:09:20 +0100 Subject: [PATCH] Factor client caches into its own code All the bits to do with client caches can go together, since (after being constructed) it only interacts with the reconciler through `watchTargetAndGetReader` and via events. This is mainly a case of relocating the relevant fields and changing some variable names. Signed-off-by: Michael Bridgen --- controllers/leveltriggered/caching.go | 406 +++++++++++++++++++++++ controllers/leveltriggered/controller.go | 339 +------------------ controllers/leveltriggered/gc_test.go | 24 +- controllers/leveltriggered/indexing.go | 33 -- 4 files changed, 433 insertions(+), 369 deletions(-) create mode 100644 controllers/leveltriggered/caching.go diff --git a/controllers/leveltriggered/caching.go b/controllers/leveltriggered/caching.go new file mode 100644 index 0000000..b8f405d --- /dev/null +++ b/controllers/leveltriggered/caching.go @@ -0,0 +1,406 @@ +package leveltriggered + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/go-logr/logr" + clusterctrlv1alpha1 "github.com/weaveworks/cluster-controller/api/v1alpha1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" + toolscache "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/workqueue" + capicfg "sigs.k8s.io/cluster-api/util/kubeconfig" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "sigs.k8s.io/controller-runtime/pkg/event" + + "github.com/weaveworks/pipeline-controller/api/v1alpha1" +) + +// caches holds all the values needed for keeping track of client caches, used for 1. querying clusters for arbitrary app objects; and, +// 2. installing watches so that app object updates will trigger reconciliation of the pipelines that use them. +type caches struct { + // used to convey events to the reconciler + events chan event.GenericEvent + + // internal bookkeeping + targetScheme *runtime.Scheme + cachesMap map[clusterAndGVK]cacheAndCancel + cachesMu *sync.Mutex + + // these are constructed when this object is set up with a manager + baseLogger logr.Logger + reader client.Reader + localClusterConfig *rest.Config + cachesGC *gc + runner *runner +} + +func newCaches(events chan event.GenericEvent, targetScheme *runtime.Scheme) *caches { + return &caches{ + targetScheme: targetScheme, + events: events, + cachesMap: make(map[clusterAndGVK]cacheAndCancel), + cachesMu: &sync.Mutex{}, + } +} + +func (c *caches) setupWithManager(mgr ctrl.Manager) error { + c.localClusterConfig = mgr.GetConfig() + c.baseLogger = mgr.GetLogger().WithValues("component", "target-cache") + c.reader = mgr.GetClient() // this specifically gets the client that has the indexing installed below; i.e., these are coupled. + + c.runner = newRunner() + if err := mgr.Add(c.runner); err != nil { + return err + } + + c.cachesGC = newGC(c, mgr.GetLogger().WithValues("component", "target-cache-gc")) + if err := mgr.Add(c.cachesGC); err != nil { + return err + } + + // Index the Pipelines by the cache they require for their targets. + if err := mgr.GetCache().IndexField(context.TODO(), &v1alpha1.Pipeline{}, cacheIndex, indexTargetCache); err != nil { + return fmt.Errorf("failed setting index fields: %w", err) + } + + // Index the Pipelines by the application references they point at. + if err := mgr.GetCache().IndexField(context.TODO(), &v1alpha1.Pipeline{}, applicationKey, indexApplication /* <- both from indexing.go */); err != nil { + return fmt.Errorf("failed setting index fields: %w", err) + } + + return nil +} + +func (c *caches) getCacheKeys() (res []clusterAndGVK) { + c.cachesMu.Lock() + defer c.cachesMu.Unlock() + for k := range c.cachesMap { + res = append(res, k) + } + return res +} + +// clusterAndGVK is used as the key for caches +type clusterAndGVK struct { + client.ObjectKey + schema.GroupVersionKind +} + +func (key clusterAndGVK) String() string { + return key.ObjectKey.String() + ":" + key.GroupVersionKind.String() +} + +type cacheAndCancel struct { + cache cache.Cache + cancel context.CancelFunc +} + +// watchTargetAndGetReader ensures that the type (GroupVersionKind) of the target in the cluster given is being watched, and +// returns a client.Reader. +// A nil for the `clusterObject` argument indicates the local cluster. It returns an error if there was a problem, and otherwise +// a client.Reader and a bool which is true if the type was already watched and syncing. +// +// NB: the target object should be the object that is used as the `client.Object` argument for client.Get(...); the cache has different +// stores for {typed values, partial values, unstructured values}, and we want to install the watcher in the same one as we will later query. +func (c *caches) watchTargetAndGetReader(ctx context.Context, clusterObject *clusterctrlv1alpha1.GitopsCluster, target client.Object) (client.Reader, bool, error) { + var clusterKey client.ObjectKey + if clusterObject != nil { + clusterKey = client.ObjectKeyFromObject(clusterObject) + } + + targetGVK, err := apiutil.GVKForObject(target, c.targetScheme) + if err != nil { + return nil, false, err + } + cacheKey := clusterAndGVK{ + ObjectKey: clusterKey, + GroupVersionKind: targetGVK, + } + + logger := c.baseLogger.WithValues("cluster", clusterKey, "type", targetGVK) + + c.cachesMu.Lock() + cacheEntry, cacheFound := c.cachesMap[cacheKey] + c.cachesMu.Unlock() + // To construct a cache, we need a *rest.Config. There's two ways to get one: + // - for the local cluster, we can just get the already prepared one from the Manager; + // - for a remote cluster, we can construct one given a kubeconfig; and the kubeconfig will be stored in a Secret, + // associated with the object representing the cluster. + if !cacheFound { + logger.Info("creating cache for cluster and type") + var cfg *rest.Config + + if clusterObject == nil { + cfg = c.localClusterConfig + } else { + var kubeconfig []byte + var err error + switch { + case clusterObject.Spec.CAPIClusterRef != nil: + var capiKey client.ObjectKey + capiKey.Name = clusterObject.Spec.CAPIClusterRef.Name + capiKey.Namespace = clusterObject.GetNamespace() + kubeconfig, err = capicfg.FromSecret(ctx, c.reader, capiKey) + if err != nil { + return nil, false, err + } + case clusterObject.Spec.SecretRef != nil: + var secretKey client.ObjectKey + secretKey.Name = clusterObject.Spec.SecretRef.Name + secretKey.Namespace = clusterObject.GetNamespace() + + var sec corev1.Secret + if err := c.reader.Get(ctx, secretKey, &sec); err != nil { + return nil, false, err + } + var ok bool + kubeconfig, ok = sec.Data["kubeconfig"] + if !ok { + return nil, false, fmt.Errorf("referenced Secret does not have data key %s", "kubeconfig") + } + default: + return nil, false, fmt.Errorf("GitopsCluster object has neither .secretRef nor .capiClusterRef populated, unable to get remote cluster config") + } + cfg, err = clientcmd.RESTConfigFromKubeConfig(kubeconfig) + if err != nil { + return nil, false, err + } + } + + // having done all that, did we really need it? + c.cachesMu.Lock() + if cacheEntry, cacheFound = c.cachesMap[cacheKey]; !cacheFound { + ca, err := cache.New(cfg, cache.Options{ + Scheme: c.targetScheme, + }) + if err != nil { + c.cachesMu.Unlock() + return nil, false, err + } + + cancel := c.runner.run(func(ctx context.Context) { + if err := ca.Start(ctx); err != nil { + logger.Error(err, "cache exited with error") + } + }) + cacheEntry = cacheAndCancel{ + cache: ca, + cancel: cancel, + } + c.cachesMap[cacheKey] = cacheEntry + } + c.cachesMu.Unlock() + // Add it to the queue for GC consideration + c.cachesGC.register(cacheKey) + } + + // Now we have a cache; make sure the object type in question is being watched, so we can query it and get updates. + + // The informer is retrieved whether we created the cache or not, because we want to know if it's synced and thus ready to be queried. + typeCache := cacheEntry.cache + inf, err := typeCache.GetInformer(ctx, target) // NB not InformerForKind(...), because that uses the typed value cache specifically (see the method comment). + if err != nil { + return nil, false, err + } + + if !cacheFound { // meaning: we created the cache, this time around, so we'll need to install the event handler. + enqueuePipelinesForTarget := func(obj interface{}) { + eventObj, ok := obj.(client.Object) + if !ok { + logger.Info("value to look up in index was not a client.Object", "object", obj) + return + } + pipelines, err := c.pipelinesForApplication(clusterKey, eventObj) + if err != nil { + logger.Error(err, "failed to look up pipelines in index of applications") + return + } + // TODO is passing pointers here dangerous? (do they get copied, I think they might do). Alternative is to pass the whole list in the channel. + for i := range pipelines { + c.events <- event.GenericEvent{Object: &pipelines[i]} + } + } + + _, err := inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + enqueuePipelinesForTarget(obj) + }, + DeleteFunc: func(obj interface{}) { + enqueuePipelinesForTarget(obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + // We're just looking up the name in the index so it'll be the same for the old as for the new object. + // However, this might change elsewhere, so to be defensive, run both. The queue will deduplicate, + // though it means we do a bit more lookup work. + enqueuePipelinesForTarget(oldObj) + enqueuePipelinesForTarget(newObj) + }, + }) + if err != nil { + return nil, false, err + } + } + + return typeCache, inf.HasSynced(), nil +} + +// == target indexing == +// +// Each target in a pipeline is put in an index, so that given an event referring to that target, the pipeline(s) using it can +// be looked up and queued for reconciliation. + +// targetIndexKey produces an index key for the exact target +func targetIndexKey(clusterKey client.ObjectKey, gvk schema.GroupVersionKind, targetKey client.ObjectKey) string { + key := fmt.Sprintf("%s:%s:%s", clusterKey, gvk, targetKey) + return key +} + +// indexApplication extracts all the application refs from a pipeline. The index keys are +// +// /:///`. +var indexApplication = indexTargets(targetIndexKey) + +// pipelinesForApplication is given an application object and its cluster, and looks up the pipeline(s) that use it as a target. +// It assumes applications are indexed using `indexApplication(...)` (or something using the same key format and index). +// `clusterName` can be a zero value, but if it's not, both the namespace and name should be supplied (since the namespace will +// be give a value if there's only a name supplied, when indexing. See `indexApplication()`). +func (c *caches) pipelinesForApplication(clusterName client.ObjectKey, obj client.Object) ([]v1alpha1.Pipeline, error) { + ctx := context.Background() + var list v1alpha1.PipelineList + gvk, err := apiutil.GVKForObject(obj, c.targetScheme) + if err != nil { + return nil, err + } + + key := targetIndexKey(clusterName, gvk, client.ObjectKeyFromObject(obj)) + if err := c.reader.List(ctx, &list, client.MatchingFields{ + applicationKey: key, + }); err != nil { + return nil, err + } + + return list.Items, nil +} + +// == Garbage collection of caches == + +// cacheIndex names the index for keeping track of which pipelines use which caches. +const cacheIndex = "cache" + +// This gives a string representation of a key into the caches map, so it can be used for indexing. +// It has the signature of `targetIndexerFunc` so it can be used with `indexTargets(...)`. +// isCacheUsed below relies on this implementation using clusterAndGVK.String(), so that +// the index keys it uses match what's actually indexed. +func clusterCacheKey(clusterKey client.ObjectKey, gvk schema.GroupVersionKind, _targetKey client.ObjectKey) string { + return (clusterAndGVK{ + ObjectKey: clusterKey, + GroupVersionKind: gvk, + }).String() +} + +// indexTargetCache is an IndexerFunc that returns a key representing the cache a target will come from. This is coupled to +// the scheme for keeping track of caches, as embodied in the `caches` map in the reconciler the method `watchTargetAndGetReader`; +// changing how those work, for instance caching by {cluster, namespace, type} instead, will likely need a change here. +var indexTargetCache = indexTargets(clusterCacheKey) + +// isCacheUsed looks up the given cache key, and returns true if a pipeline uses that cache, and false otherwise; or, +// an error if the query didn't succeed. +func (c *caches) isCacheUsed(key clusterAndGVK) (bool, error) { + var list v1alpha1.PipelineList + if err := c.reader.List(context.TODO(), &list, client.MatchingFields{ + cacheIndex: key.String(), + }); err != nil { + return false, err + } + return len(list.Items) > 0, nil +} + +// removeCache stops the cache identified by `key` running. +func (c *caches) removeCache(key clusterAndGVK) { + c.cachesMu.Lock() + defer c.cachesMu.Unlock() + if entry, ok := c.cachesMap[key]; ok { + entry.cancel() + delete(c.cachesMap, key) + } +} + +type cachesInterface interface { + isCacheUsed(clusterAndGVK) (bool, error) + removeCache(clusterAndGVK) +} + +type gc struct { + caches cachesInterface + queue workqueue.RateLimitingInterface + log logr.Logger +} + +func newGC(caches cachesInterface, logger logr.Logger) *gc { + ratelimiter := workqueue.NewItemExponentialFailureRateLimiter(2*time.Second, 512*time.Second) + queue := workqueue.NewRateLimitingQueueWithConfig(ratelimiter, workqueue.RateLimitingQueueConfig{ + Name: "cache-garbage-collection", + }) + + return &gc{ + caches: caches, + queue: queue, + log: logger, + } +} + +func (gc *gc) register(key clusterAndGVK) { + gc.log.Info("cache key registered for GC", "key", key) + gc.queue.Add(key) // NB not rate limited. Though, this is called when the key is introduced, so it shouldn't matter one way or the other. +} + +func (gc *gc) loop() { + for { + item, shutdown := gc.queue.Get() + if shutdown { + return + } + key, ok := item.(clusterAndGVK) + if !ok { + gc.queue.Forget(item) + gc.queue.Done(item) + } + + if ok, err := gc.caches.isCacheUsed(key); err != nil { + gc.log.Error(err, "calling isCacheUsed", "key", key) + } else if ok { + // still used, requeue for consideration + gc.queue.Done(key) + gc.queue.AddRateLimited(key) + } else { + gc.log.Info("removing unused cache", "key", key, "requeues", gc.queue.NumRequeues(key)) + gc.queue.Forget(key) + gc.queue.Done(key) + gc.caches.removeCache(key) + } + } +} + +func (gc *gc) Start(ctx context.Context) error { + if gc.caches == nil || gc.queue == nil { + return fmt.Errorf("neither of caches nor queue can be nil") + } + // queue.Get blocks until either there's an item, or the queue is shutting down, so we can't put + // it in a select with <-ctx.Done(). Instead, do this bit in a goroutine, and rely on it exiting + // when the queue is shut down. + go gc.loop() + <-ctx.Done() + gc.log.Info("shutting down") + gc.queue.ShutDown() + return nil +} diff --git a/controllers/leveltriggered/controller.go b/controllers/leveltriggered/controller.go index df03a68..1b394d0 100644 --- a/controllers/leveltriggered/controller.go +++ b/controllers/leveltriggered/controller.go @@ -3,10 +3,7 @@ package leveltriggered import ( "context" "fmt" - "sync" - "time" - "github.com/go-logr/logr" clusterctrlv1alpha1 "github.com/weaveworks/cluster-controller/api/v1alpha1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -16,16 +13,9 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/rest" - toolscache "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" - capicfg "sigs.k8s.io/cluster-api/util/kubeconfig" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" @@ -36,64 +26,37 @@ import ( "github.com/weaveworks/pipeline-controller/server/strategy" ) -// clusterAndGVK is used as the key for caches -type clusterAndGVK struct { - client.ObjectKey - schema.GroupVersionKind -} - -func (key clusterAndGVK) String() string { - return key.ObjectKey.String() + ":" + key.GroupVersionKind.String() -} - -type cacheAndCancel struct { - cache cache.Cache - cancel context.CancelFunc -} - // PipelineReconciler reconciles a Pipeline object type PipelineReconciler struct { client.Client Scheme *runtime.Scheme - targetScheme *runtime.Scheme ControllerName string + caches *caches recorder record.EventRecorder stratReg strategy.StrategyRegistry - caches map[clusterAndGVK]cacheAndCancel - cachesMu *sync.Mutex - cachesGC *gc - runner *runner - manager ctrl.Manager appEvents chan event.GenericEvent } func NewPipelineReconciler(c client.Client, s *runtime.Scheme, controllerName string, eventRecorder record.EventRecorder, stratReg strategy.StrategyRegistry) *PipelineReconciler { + appEvents := make(chan event.GenericEvent) + + // this is empty because we're going to use unstructured.Unstructured objects to support arbitrary types. + // If something changed and we wanted typed objects, this scheme would need to have those registered. targetScheme := runtime.NewScheme() pc := &PipelineReconciler{ Client: c, Scheme: s, - targetScheme: targetScheme, recorder: eventRecorder, ControllerName: controllerName, stratReg: stratReg, - caches: make(map[clusterAndGVK]cacheAndCancel), - cachesMu: &sync.Mutex{}, - appEvents: make(chan event.GenericEvent), + caches: newCaches(appEvents, targetScheme), + appEvents: appEvents, } return pc } -func (r *PipelineReconciler) getCacheKeys() (res []clusterAndGVK) { - r.cachesMu.Lock() - defer r.cachesMu.Unlock() - for k := range r.caches { - res = append(res, k) - } - return res -} - //+kubebuilder:rbac:groups=pipelines.weave.works,resources=pipelines,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=pipelines.weave.works,resources=pipelines/status,verbs=get;update;patch //+kubebuilder:rbac:groups=pipelines.weave.works,resources=pipelines/finalizers,verbs=update @@ -176,7 +139,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c } // it's OK if clusterObject is still `nil` -- that represents the local cluster. - clusterClient, ok, err := r.watchTargetAndGetReader(ctx, clusterObject, targetObj) + clusterClient, ok, err := r.caches.watchTargetAndGetReader(ctx, clusterObject, targetObj) if err != nil { return ctrl.Result{}, err } @@ -390,155 +353,6 @@ func checkAllTargetsAreReady(env *v1alpha1.EnvironmentStatus) bool { return true } -// watchTargetAndGetReader ensures that the type (GroupVersionKind) of the target in the cluster given is being watched, and -// returns a client.Reader. -// A nil for the `clusterObject` argument indicates the local cluster. It returns an error if there was a problem, and otherwise -// a client.Reader and a bool which is true if the type was already watched and syncing. -// -// NB: the target object should be the object that is used as the `client.Object` argument for client.Get(...); the cache has different -// stores for {typed values, partial values, unstructured values}, and we want to install the watcher in the same one as we will later query. -func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, clusterObject *clusterctrlv1alpha1.GitopsCluster, target client.Object) (client.Reader, bool, error) { - var clusterKey client.ObjectKey - if clusterObject != nil { - clusterKey = client.ObjectKeyFromObject(clusterObject) - } - - targetGVK, err := apiutil.GVKForObject(target, r.targetScheme) - if err != nil { - return nil, false, err - } - cacheKey := clusterAndGVK{ - ObjectKey: clusterKey, - GroupVersionKind: targetGVK, - } - - logger := r.manager.GetLogger().WithValues("component", "target-cache", "cluster", clusterKey, "type", targetGVK) - - r.cachesMu.Lock() - cacheEntry, cacheFound := r.caches[cacheKey] - r.cachesMu.Unlock() - // To construct a cache, we need a *rest.Config. There's two ways to get one: - // - for the local cluster, we can just get the already prepared one from the Manager; - // - for a remote cluster, we can construct one given a kubeconfig; and the kubeconfig will be stored in a Secret, - // associated with the object representing the cluster. - if !cacheFound { - logger.Info("creating cache for cluster and type") - var cfg *rest.Config - - if clusterObject == nil { - cfg = r.manager.GetConfig() - } else { - var kubeconfig []byte - var err error - switch { - case clusterObject.Spec.CAPIClusterRef != nil: - var capiKey client.ObjectKey - capiKey.Name = clusterObject.Spec.CAPIClusterRef.Name - capiKey.Namespace = clusterObject.GetNamespace() - kubeconfig, err = capicfg.FromSecret(ctx, r.Client, capiKey) - if err != nil { - return nil, false, err - } - case clusterObject.Spec.SecretRef != nil: - var secretKey client.ObjectKey - secretKey.Name = clusterObject.Spec.SecretRef.Name - secretKey.Namespace = clusterObject.GetNamespace() - - var sec corev1.Secret - if err := r.Get(ctx, secretKey, &sec); err != nil { - return nil, false, err - } - var ok bool - kubeconfig, ok = sec.Data["kubeconfig"] - if !ok { - return nil, false, fmt.Errorf("referenced Secret does not have data key %s", "kubeconfig") - } - default: - return nil, false, fmt.Errorf("GitopsCluster object has neither .secretRef nor .capiClusterRef populated, unable to get remote cluster config") - } - cfg, err = clientcmd.RESTConfigFromKubeConfig(kubeconfig) - if err != nil { - return nil, false, err - } - } - - // having done all that, did we really need it? - r.cachesMu.Lock() - if cacheEntry, cacheFound = r.caches[cacheKey]; !cacheFound { - c, err := cache.New(cfg, cache.Options{ - Scheme: r.targetScheme, - }) - if err != nil { - r.cachesMu.Unlock() - return nil, false, err - } - - cancel := r.runner.run(func(ctx context.Context) { - if err := c.Start(ctx); err != nil { - logger.Error(err, "cache exited with error") - } - }) - cacheEntry = cacheAndCancel{ - cache: c, - cancel: cancel, - } - r.caches[cacheKey] = cacheEntry - } - r.cachesMu.Unlock() - // Add it to the queue for GC consideration - r.cachesGC.register(cacheKey) - } - - // Now we have a cache; make sure the object type in question is being watched, so we can query it and get updates. - - // The informer is retrieved whether we created the cache or not, because we want to know if it's synced and thus ready to be queried. - typeCache := cacheEntry.cache - inf, err := typeCache.GetInformer(ctx, target) // NB not InformerForKind(...), because that uses the typed value cache specifically (see the method comment). - if err != nil { - return nil, false, err - } - - if !cacheFound { // meaning: we created the cache, this time around, so we'll need to install the event handler. - enqueuePipelinesForTarget := func(obj interface{}) { - eventObj, ok := obj.(client.Object) - if !ok { - logger.Info("value to look up in index was not a client.Object", "object", obj) - return - } - pipelines, err := r.pipelinesForApplication(clusterKey, eventObj) - if err != nil { - logger.Error(err, "failed to look up pipelines in index of applications") - return - } - // TODO is passing pointers here dangerous? (do they get copied, I think they might do). Alternative is to pass the whole list in the channel. - for i := range pipelines { - r.appEvents <- event.GenericEvent{Object: &pipelines[i]} - } - } - - _, err := inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - enqueuePipelinesForTarget(obj) - }, - DeleteFunc: func(obj interface{}) { - enqueuePipelinesForTarget(obj) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - // We're just looking up the name in the index so it'll be the same for the old as for the new object. - // However, this might change elsewhere, so to be defensive, run both. The queue will deduplicate, - // though it means we do a bit more lookup work. - enqueuePipelinesForTarget(oldObj) - enqueuePipelinesForTarget(newObj) - }, - }) - if err != nil { - return nil, false, err - } - } - - return typeCache, inf.HasSynced(), nil -} - // clusterPrefix returns a string naming the cluster containing an app, to prepend to the usual namespace/name format of the app object itself. // So that it can be empty, the separator is include in the return value. func clusterPrefix(ref *v1alpha1.CrossNamespaceClusterReference) string { @@ -620,150 +434,21 @@ func (r *PipelineReconciler) getCluster(ctx context.Context, p v1alpha1.Pipeline return cluster, nil } -// == Garbage collection of caches == - -// cacheIndex names the index for keeping track of which pipelines use which caches. -const cacheIndex = "cache" - -// This gives a string representation of a key into the caches map, so it can be used for indexing. -// It has the signature of `targetIndexerFunc` so it can be used with `indexTargets(...)`. -// isCacheUsed below relies on this implementation using clusterAndGVK.String(), so that -// the index keys it uses match what's actually indexed. -func clusterCacheKey(clusterKey client.ObjectKey, gvk schema.GroupVersionKind, _targetKey client.ObjectKey) string { - return (clusterAndGVK{ - ObjectKey: clusterKey, - GroupVersionKind: gvk, - }).String() -} - -// indexTargetCache is an IndexerFunc that returns a key representing the cache a target will come from. This is coupled to -// the scheme for keeping track of caches, as embodied in the `caches` map in the reconciler the method `watchTargetAndGetReader`; -// changing how those work, for instance caching by {cluster, namespace, type} instead, will likely need a change here. -var indexTargetCache = indexTargets(clusterCacheKey) - -// isCacheUsed looks up the given cache key, and returns true if a pipeline uses that cache, and false otherwise; or, -// an error if the query didn't succeed. -func (r *PipelineReconciler) isCacheUsed(key clusterAndGVK) (bool, error) { - var list v1alpha1.PipelineList - if err := r.List(context.TODO(), &list, client.MatchingFields{ - cacheIndex: key.String(), - }); err != nil { - return false, err - } - return len(list.Items) > 0, nil -} - -func (r *PipelineReconciler) removeCache(key clusterAndGVK) { - r.cachesMu.Lock() - defer r.cachesMu.Unlock() - if entry, ok := r.caches[key]; ok { - entry.cancel() - delete(r.caches, key) - } -} - -type cachesInterface interface { - isCacheUsed(clusterAndGVK) (bool, error) - removeCache(clusterAndGVK) -} - -type gc struct { - caches cachesInterface - queue workqueue.RateLimitingInterface - log logr.Logger -} - -func newGC(caches cachesInterface, logger logr.Logger) *gc { - ratelimiter := workqueue.NewItemExponentialFailureRateLimiter(2*time.Second, 512*time.Second) - queue := workqueue.NewRateLimitingQueueWithConfig(ratelimiter, workqueue.RateLimitingQueueConfig{ - Name: "cache-garbage-collection", - }) - - return &gc{ - caches: caches, - queue: queue, - log: logger, - } -} - -func (gc *gc) register(key clusterAndGVK) { - gc.log.Info("cache key registered for GC", "key", key) - gc.queue.Add(key) // NB not rate limited. Though, this is called when the key is introduced, so it shouldn't matter one way or the other. -} - -func (gc *gc) loop() { - for { - item, shutdown := gc.queue.Get() - if shutdown { - return - } - key, ok := item.(clusterAndGVK) - if !ok { - gc.queue.Forget(item) - gc.queue.Done(item) - } - - if ok, err := gc.caches.isCacheUsed(key); err != nil { - gc.log.Error(err, "calling isCacheUsed", "key", key) - } else if ok { - // still used, requeue for consideration - gc.queue.Done(key) - gc.queue.AddRateLimited(key) - } else { - gc.log.Info("removing unused cache", "key", key, "requeues", gc.queue.NumRequeues(key)) - gc.queue.Forget(key) - gc.queue.Done(key) - gc.caches.removeCache(key) - } - } -} - -func (gc *gc) Start(ctx context.Context) error { - if gc.caches == nil || gc.queue == nil { - return fmt.Errorf("neither of caches nor queue can be nil") - } - // queue.Get blocks until either there's an item, or the queue is shutting down, so we can't put - // it in a select with <-ctx.Done(). Instead, do this bit in a goroutine, and rely on it exiting - // when the queue is shut down. - go gc.loop() - <-ctx.Done() - gc.log.Info("shutting down") - gc.queue.ShutDown() - return nil -} - // == Setup of indices and static watchers == // SetupWithManager sets up the controller with the Manager. func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error { - r.manager = mgr - - r.runner = newRunner() - if err := mgr.Add(r.runner); err != nil { - return err - } - r.cachesGC = newGC(r, mgr.GetLogger().WithValues("component", "target-cache-gc")) - if err := mgr.Add(r.cachesGC); err != nil { - return err + // let the `caches` object set up its own indexing etc. + if err := r.caches.setupWithManager(mgr); err != nil { + return nil } const ( gitopsClusterIndexKey string = ".spec.environment.ClusterRef" // this is arbitrary, but let's make it suggest what it's indexing. ) // Index the Pipelines by the GitopsCluster references they (may) point at. - if err := mgr.GetCache().IndexField(context.TODO(), &v1alpha1.Pipeline{}, gitopsClusterIndexKey, - r.indexClusterKind("GitopsCluster")); err != nil { - return fmt.Errorf("failed setting index fields: %w", err) - } - - // Index the Pipelines by the application references they point at. - if err := mgr.GetCache().IndexField(context.TODO(), &v1alpha1.Pipeline{}, applicationKey, indexApplication /* <- both from indexing.go */); err != nil { - return fmt.Errorf("failed setting index fields: %w", err) - } - - // Index the Pipelines by the cache they require for their targets. - if err := mgr.GetCache().IndexField(context.TODO(), &v1alpha1.Pipeline{}, cacheIndex, indexTargetCache); err != nil { + if err := mgr.GetCache().IndexField(context.TODO(), &v1alpha1.Pipeline{}, gitopsClusterIndexKey, r.indexClusterKind("GitopsCluster")); err != nil { return fmt.Errorf("failed setting index fields: %w", err) } diff --git a/controllers/leveltriggered/gc_test.go b/controllers/leveltriggered/gc_test.go index ed81b10..7ee6e8f 100644 --- a/controllers/leveltriggered/gc_test.go +++ b/controllers/leveltriggered/gc_test.go @@ -29,9 +29,15 @@ func TestCacheGC(t *testing.T) { }, } + // This is a minor cheat, since it looks into a private field of the controller. But, it should be + // relatively easy to replace if that detail changed. + getCacheKeys := func() []clusterAndGVK { + return pipelineReconciler.caches.getCacheKeys() + } + // First, let's check the initial state. Other tests may leave detritus, so this just checks we // don't have the fake application that's for this specific test. - g.Expect(pipelineReconciler.getCacheKeys()).ToNot(ContainElements(expectedCacheKey)) + g.Expect(getCacheKeys()).ToNot(ContainElements(expectedCacheKey)) ctx := context.TODO() ns := testingutils.NewNamespace(ctx, g, k8sClient) @@ -69,12 +75,12 @@ func TestCacheGC(t *testing.T) { g.Expect(k8sClient.Create(ctx, pipeline)).To(Succeed()) // Now there's a pipeline that mentions the fake kind -- did it create a cache for it? - g.Eventually(pipelineReconciler.getCacheKeys).Should(ContainElement(expectedCacheKey)) + g.Eventually(getCacheKeys).Should(ContainElement(expectedCacheKey)) g.Expect(k8sClient.Delete(ctx, pipeline)).To(Succeed()) // Now there's no pipeline using the fake kind, so it should get cleaned up. - g.Eventually(pipelineReconciler.getCacheKeys, "5s", "0.5s").ShouldNot(ContainElement(expectedCacheKey)) + g.Eventually(getCacheKeys, "5s", "0.5s").ShouldNot(ContainElement(expectedCacheKey)) // Second scenario: the target exists. This is more or less the happy path: you have a pipeline, it's working, you delete it. @@ -94,19 +100,19 @@ func TestCacheGC(t *testing.T) { g.Expect(k8sClient.Create(ctx, pipeline)).To(Succeed()) // Now there's a pipeline that mentions the fake kind -- did it create a cache for it? - g.Eventually(pipelineReconciler.getCacheKeys).Should(ContainElement(expectedCacheKey)) + g.Eventually(getCacheKeys).Should(ContainElement(expectedCacheKey)) // Delete the pipeline, leave the target there g.Expect(k8sClient.Delete(ctx, pipeline)).To(Succeed()) // Now there's no pipeline using the fake kind, so it should get cleaned up even though the target is still there. - g.Eventually(pipelineReconciler.getCacheKeys, "5s", "0.5s").ShouldNot(ContainElement(expectedCacheKey)) + g.Eventually(getCacheKeys, "5s", "0.5s").ShouldNot(ContainElement(expectedCacheKey)) g.Expect(k8sClient.Delete(ctx, target)).To(Succeed()) // Third scenario: two pipelines refer to the same {cluser, type}. Checks that the cache won't get deleted if // there's still something referring to it. - g.Expect(pipelineReconciler.getCacheKeys()).ToNot(ContainElement(expectedCacheKey)) + g.Expect(getCacheKeys()).ToNot(ContainElement(expectedCacheKey)) target = newFake(rand.String(5)) p1 := newPipeline(rand.String(5), target.GetName()) @@ -115,7 +121,7 @@ func TestCacheGC(t *testing.T) { g.Expect(k8sClient.Create(ctx, p2)).To(Succeed()) // Confirm there's a cache for the target type - g.Eventually(pipelineReconciler.getCacheKeys).Should(ContainElement(expectedCacheKey)) + g.Eventually(getCacheKeys).Should(ContainElement(expectedCacheKey)) // Delete one pipeline, there should still be a cache. g.Expect(k8sClient.Delete(ctx, p1)).To(Succeed()) @@ -126,6 +132,6 @@ func TestCacheGC(t *testing.T) { err := pipelineReconciler.Get(ctx, client.ObjectKeyFromObject(p1), &p) return err != nil && client.IgnoreNotFound(err) == nil }).Should(BeTrue()) - // This is a minor cheat, using an internal API, because I can't wait around for something to _not_ happen. - g.Expect(pipelineReconciler.isCacheUsed(expectedCacheKey)).To(BeTrue()) + // This is a minor cheat on top of a cheat, using an internal API, because I can't wait around for something to _not_ happen. + g.Expect(pipelineReconciler.caches.isCacheUsed(expectedCacheKey)).To(BeTrue()) } diff --git a/controllers/leveltriggered/indexing.go b/controllers/leveltriggered/indexing.go index ee69b1b..062c799 100644 --- a/controllers/leveltriggered/indexing.go +++ b/controllers/leveltriggered/indexing.go @@ -7,7 +7,6 @@ import ( "github.com/weaveworks/pipeline-controller/api/v1alpha1" "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -114,35 +113,3 @@ func indexTargets(fn targetKeyFunc) func(client.Object) []string { return res } } - -func targetIndexKey(clusterKey client.ObjectKey, gvk schema.GroupVersionKind, targetKey client.ObjectKey) string { - key := fmt.Sprintf("%s:%s:%s", clusterKey, gvk, targetKey) - return key -} - -// indexApplication extracts all the application refs from a pipeline. The index keys are -// -// /:///`. -var indexApplication = indexTargets(targetIndexKey) - -// pipelinesForApplication is given an application object and its cluster, and looks up the pipeline(s) that use it as a target. -// It assumes applications are indexed using `indexApplication(...)` (or something using the same key format and index). -// `clusterName` can be a zero value, but if it's not, both the namespace and name should be supplied (since the namespace will -// be give a value if there's only a name supplied, when indexing. See `indexApplication()`). -func (r *PipelineReconciler) pipelinesForApplication(clusterName client.ObjectKey, obj client.Object) ([]v1alpha1.Pipeline, error) { - ctx := context.Background() - var list v1alpha1.PipelineList - gvk, err := apiutil.GVKForObject(obj, r.Scheme) - if err != nil { - return nil, err - } - - key := targetIndexKey(clusterName, gvk, client.ObjectKeyFromObject(obj)) - if err := r.List(ctx, &list, client.MatchingFields{ - applicationKey: key, - }); err != nil { - return nil, err - } - - return list.Items, nil -}