Garbage collect caches that aren't needed any more
It's possible that, due to pipelines disappearing or being updated,
some caches will no longer be needed. If these are not shut down, the
number of caches will only grow, which constitutes a leak of resources
(though not necessarily a serious one, since it will max out at
`clusters x types`).

To be able to shut down caches that are no longer needed, we need to
be able to do a few things:

 1. detect when they aren't needed
 2. stop them running by cancelling the context they were started with
 3. have them stop when the controller is shutting down

To do the first, I index the cache keys used by each pipeline. When a
cache's event handler runs, it checks whether the cache's key has any
entries in the index; if not, the cache isn't used by any pipeline and
can be shut down.
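As a condensed sketch, in the context of controller.go below (the real
code is `isCacheUsed` and `removeCache`; `collectCacheIfUnused` is just
an illustrative name):

func (r *PipelineReconciler) collectCacheIfUnused(ctx context.Context, key clusterAndGVK) error {
	// a cache is garbage when no Pipeline is indexed against its key
	var list v1alpha1.PipelineList
	if err := r.List(ctx, &list, client.MatchingFields{cacheIndex: key.String()}); err != nil {
		return err
	}
	if len(list.Items) == 0 {
		r.removeCache(key) // cancel the cache's context and forget it
	}
	return nil
}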

The second and third things are slightly tricky to arrange together. I
have introduced `runner` (in runner.go), which can be Start()ed by the
manager and thus gain access to its context, and which can then
construct a context for each cache, derived from the manager's
context. Each cache gets its own cancel func that can be used to shut
it down individually, and every cache will also be shut down when the
manager itself shuts down.
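The property this leans on is ordinary context derivation: cancelling
the root context cancels everything derived from it. A minimal,
standalone illustration (not part of this commit):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	root, stop := context.WithCancel(context.Background()) // stands in for the manager's context

	startFakeCache := func(name string) context.CancelFunc {
		ctx, cancel := context.WithCancel(root)
		go func() {
			<-ctx.Done() // a real cache's Start(ctx) returns when its context is cancelled
			fmt.Println(name, "stopped")
		}()
		return cancel
	}

	cancelA := startFakeCache("cache-a")
	_ = startFakeCache("cache-b")

	cancelA()                          // garbage-collect just cache-a
	stop()                             // the manager shutting down stops cache-b too
	time.Sleep(100 * time.Millisecond) // let the goroutines print before exiting
}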

Signed-off-by: Michael Bridgen <[email protected]>
squaremo committed Oct 18, 2023
1 parent 4628a09 commit a4b3af3
Showing 3 changed files with 169 additions and 28 deletions.
103 changes: 90 additions & 13 deletions controllers/leveltriggered/controller.go
@@ -38,15 +38,25 @@ type clusterAndGVK struct {
schema.GroupVersionKind
}

func (key clusterAndGVK) String() string {
return key.ObjectKey.String() + ":" + key.GroupVersionKind.String()
}

type cacheAndCancel struct {
cache cache.Cache
cancel context.CancelFunc
}

// PipelineReconciler reconciles a Pipeline object
type PipelineReconciler struct {
client.Client
Scheme *runtime.Scheme
targetScheme *runtime.Scheme
ControllerName string
recorder record.EventRecorder
caches map[clusterAndGVK]cache.Cache
caches map[clusterAndGVK]cacheAndCancel
cachesMu *sync.Mutex
runner *runner
manager ctrl.Manager
appEvents chan event.GenericEvent
}
@@ -60,8 +70,9 @@ func NewPipelineReconciler(c client.Client, s *runtime.Scheme, eventRecorder rec
targetScheme: targetScheme,
recorder: eventRecorder,
ControllerName: controllerName,
caches: make(map[clusterAndGVK]cache.Cache),
caches: make(map[clusterAndGVK]cacheAndCancel),
cachesMu: &sync.Mutex{},
runner: newRunner(),
appEvents: make(chan event.GenericEvent),
}
}
@@ -244,7 +255,7 @@ func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, cluste
logger := log.FromContext(ctx).WithValues("component", "target cache", "cluster", clusterKey, "name", client.ObjectKeyFromObject(target), "type", targetGVK)

r.cachesMu.Lock()
typeCache, cacheFound := r.caches[cacheKey]
cacheEntry, cacheFound := r.caches[cacheKey]
r.cachesMu.Unlock()
// To construct a cache, we need a *rest.Config. There's two ways to get one:
// - for the local cluster, we can just get the already prepared one from the Manager;
@@ -293,7 +304,7 @@ func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, cluste

// having done all that, did we really need it?
r.cachesMu.Lock()
if typeCache, cacheFound = r.caches[cacheKey]; !cacheFound {
if cacheEntry, cacheFound = r.caches[cacheKey]; !cacheFound {
c, err := cache.New(cfg, cache.Options{
Scheme: r.targetScheme,
})
@@ -302,31 +313,35 @@ func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, cluste
return nil, false, err
}

// this must be done with the lock held, because if it fails, we can't use the cache and shouldn't add it to the map.
if err := r.manager.Add(c); err != nil { // this will start it asynchronously
r.cachesMu.Unlock()
return nil, false, err
cancel := r.runner.run(func(ctx context.Context) {
if err := c.Start(ctx); err != nil {
logger.Error(err, "cache exited with error")
}
})
cacheEntry = cacheAndCancel{
cache: c,
cancel: cancel,
}

typeCache = c
r.caches[cacheKey] = typeCache
r.caches[cacheKey] = cacheEntry
}
r.cachesMu.Unlock()

}

// Now we have a cache; make sure the object type in question is being watched, so we can query it and get updates.

// The informer is retrieved whether we created the cache or not, because we want to know if it's synced and thus ready to be queried.
typeCache := cacheEntry.cache
inf, err := typeCache.GetInformer(ctx, target) // NB not InformerForKind(...), because that uses the typed value cache specifically (see the method comment).
if err != nil {
return nil, false, err
}

if !cacheFound { // meaning: we created the cache, this time around
if !cacheFound { // meaning: we created the cache, this time around, so we'll need to install the event handler.
enqueuePipelinesForTarget := func(obj interface{}) {
eventObj, ok := obj.(client.Object)
if !ok {
logger.Info("value to look up in index was not a client.Object", "object", eventObj)
logger.Info("value to look up in index was not a client.Object", "object", obj)
return
}
pipelines, err := r.pipelinesForApplication(clusterKey, eventObj)
@@ -338,6 +353,17 @@ func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, cluste
for i := range pipelines {
r.appEvents <- event.GenericEvent{Object: &pipelines[i]}
}

// this sneaks in an opportunity for garbage collecting caches we don't need. If we look for pipelines that use this cache,
// and there aren't any, we can dismantle the cache.
ok, err = r.isCacheUsed(cacheKey)
if err != nil {
logger.Error(err, "failed to look up pipelines in index of caches")
return
}
if !ok {
r.removeCache(cacheKey)
}
}

_, err := inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
@@ -444,9 +470,55 @@ func (r *PipelineReconciler) getCluster(ctx context.Context, p v1alpha1.Pipeline
return cluster, nil
}

// == Garbage collection of caches ==

// cacheIndex names the index for keeping track of which pipelines use which caches.
const cacheIndex = "cache"

// This gives a string representation of a key into the caches map, so it can be used for indexing.
// It has the signature of `targetIndexerFunc` so it can be used with `indexTargets(...)`.
// isCacheUsed below relies on this implementation using clusterAndGVK.String(), so that
// the index keys it uses match what's actually indexed.
func clusterCacheKey(clusterKey client.ObjectKey, gvk schema.GroupVersionKind, _targetKey client.ObjectKey) string {
return (clusterAndGVK{
ObjectKey: clusterKey,
GroupVersionKind: gvk,
}).String()
}

// indexTargetCache is an IndexerFunc that returns a key representing the cache a target will come from. This is coupled to
// the scheme for keeping track of caches, as embodied in the `caches` map in the reconciler and the method `watchTargetAndGetReader`;
// changing how those work, for instance caching by {cluster, namespace, type} instead, will likely need a change here.
var indexTargetCache = indexTargets(clusterCacheKey)

// isCacheUsed looks up the given cache key, and returns true if a pipeline uses that cache, and false otherwise; or,
// an error if the query didn't succeed.
func (r *PipelineReconciler) isCacheUsed(key clusterAndGVK) (bool, error) {
var list v1alpha1.PipelineList
if err := r.List(context.TODO(), &list, client.MatchingFields{
cacheIndex: key.String(),
}); err != nil {
return false, err
}

return len(list.Items) > 0, nil
}

func (r *PipelineReconciler) removeCache(key clusterAndGVK) {
r.cachesMu.Lock()
defer r.cachesMu.Unlock()
if entry, ok := r.caches[key]; ok {
entry.cancel()
delete(r.caches, key)
}
}

// == Setup of indices and static watchers ==

// SetupWithManager sets up the controller with the Manager.
func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.manager = mgr
if err := mgr.Add(r.runner); err != nil {
return err
}

const (
gitopsClusterIndexKey string = ".spec.environment.ClusterRef" // this is arbitrary, but let's make it suggest what it's indexing.
@@ -462,6 +534,11 @@ func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error {
return fmt.Errorf("failed setting index fields: %w", err)
}

// Index the Pipelines by the cache key of each target, so we can tell which caches are still in use.
if err := mgr.GetCache().IndexField(context.TODO(), &v1alpha1.Pipeline{}, cacheIndex, indexTargetCache); err != nil {
return fmt.Errorf("failed setting cache index field: %w", err)
}

if r.recorder == nil {
r.recorder = mgr.GetEventRecorderFor(r.ControllerName)
}
34 changes: 19 additions & 15 deletions controllers/leveltriggered/indexing.go
@@ -67,7 +67,7 @@ func (r *PipelineReconciler) requestsForCluster(indexKey string) func(context.Co
const applicationKey = ".spec.environments[].targets[].appRef"

// targetKeyFunc is a type representing a way to get an index key from a target spec.
type targetKeyFunc func(clusterName, clusterNamespace string, targetKind schema.GroupVersionKind, targetName, targetNamespace string) string
type targetKeyFunc func(cluster client.ObjectKey, typ schema.GroupVersionKind, target client.ObjectKey) string

// indexTargets is given a func which returns a key for a target spec, and returns a `client.IndexerFunc` that will index
// each target from a Pipeline object.
@@ -92,34 +92,38 @@ func indexTargets(fn targetKeyFunc) func(client.Object) []string {
var res []string
for _, env := range p.Spec.Environments {
for _, target := range env.Targets {
var clusterName, clusterNamespace string
var clusterKey client.ObjectKey
if target.ClusterRef != nil {
clusterName = target.ClusterRef.Name
clusterNamespace = target.ClusterRef.Namespace
if clusterNamespace == "" {
clusterNamespace = p.GetNamespace()
clusterKey.Name = target.ClusterRef.Name
clusterKey.Namespace = target.ClusterRef.Namespace
if clusterKey.Namespace == "" {
clusterKey.Namespace = p.GetNamespace()
}
}

namespace := target.Namespace
if namespace == "" {
namespace = p.GetNamespace()
var targetKey client.ObjectKey
targetKey.Namespace = target.Namespace
targetKey.Name = name
if targetKey.Namespace == "" {
targetKey.Namespace = p.GetNamespace()
}
key := fn(clusterNamespace, clusterName, gvk, namespace, name)
key := fn(clusterKey, gvk, targetKey)
res = append(res, key)
}
}
return res
}
}

func targetIndexKey(clusterKey client.ObjectKey, gvk schema.GroupVersionKind, targetKey client.ObjectKey) string {
key := fmt.Sprintf("%s:%s:%s", clusterKey, gvk, targetKey)
return key
}

// indexApplication extracts all the application refs from a pipeline. The index keys are
//
// <cluster namespace>/<cluster name>:<group>/<kind>/<namespace>/<name>`.
var indexApplication = indexTargets(func(clusterNamespace, clusterName string, gvk schema.GroupVersionKind, targetNamespace, targetName string) string {
key := fmt.Sprintf("%s/%s:%s/%s/%s/%s", clusterNamespace, clusterName, gvk.Group, gvk.Kind, targetNamespace, targetName)
return key
})
var indexApplication = indexTargets(targetIndexKey)

// pipelinesForApplication is given an application object and its cluster, and looks up the pipeline(s) that use it as a target.
// It assumes applications are indexed using `indexApplication(...)` (or something using the same key format and index).
@@ -133,7 +137,7 @@ func (r *PipelineReconciler) pipelinesForApplication(clusterName client.ObjectKe
return nil, err
}

key := fmt.Sprintf("%s/%s:%s/%s/%s/%s", clusterName.Namespace, clusterName.Name, gvk.Group, gvk.Kind, obj.GetNamespace(), obj.GetName())
key := targetIndexKey(clusterName, gvk, client.ObjectKeyFromObject(obj))
if err := r.List(ctx, &list, client.MatchingFields{
applicationKey: key,
}); err != nil {
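For reference, the keys produced by targetIndexKey and clusterCacheKey
look like this, assuming the usual String() renderings of
client.ObjectKey ("namespace/name") and schema.GroupVersionKind
("group/version, Kind=Kind"); the cluster, kind, and target names below
are made up. An illustrative test, not part of the commit:

package leveltriggered

import (
	"fmt"
	"testing"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestIndexKeyShapes(t *testing.T) {
	clusterKey := client.ObjectKey{Namespace: "default", Name: "prod"}
	gvk := schema.GroupVersionKind{Group: "helm.toolkit.fluxcd.io", Version: "v2beta1", Kind: "HelmRelease"}
	targetKey := client.ObjectKey{Namespace: "podinfo", Name: "podinfo"}

	// application index key: <cluster>:<GVK>:<target>
	fmt.Println(targetIndexKey(clusterKey, gvk, targetKey))
	// default/prod:helm.toolkit.fluxcd.io/v2beta1, Kind=HelmRelease:podinfo/podinfo

	// cache index key: <cluster>:<GVK>
	fmt.Println(clusterCacheKey(clusterKey, gvk, targetKey))
	// default/prod:helm.toolkit.fluxcd.io/v2beta1, Kind=HelmRelease
}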
60 changes: 60 additions & 0 deletions controllers/leveltriggered/runner.go
@@ -0,0 +1,60 @@
package leveltriggered

import "context"

// This is a dead simple way to run things using a manager's context as a base, so that they will
// get shut down when the manager does. It must be constructed with `newRunner`, and added to a manager:
//
// r := newRunner()
// mgr.Add(r)
//
// then you can use it to run funcs:
//
// cancel := r.run(func(context.Context))
//
// The func will be run with its own context derived from the root context supplied by the manager,
// with the cancel func returned to the caller as shown. This way you can cancel the context yourself,
// or let it be canceled when the manager shuts down.
//
// It'll deadlock if you call `run` before adding it to a manager (or otherwise calling `Start`).

type runWithContext struct {
ctx context.Context
run func(context.Context)
}

type runner struct {
rootContext context.Context
tostart chan runWithContext
ready chan struct{}
}

func newRunner() *runner {
return &runner{
tostart: make(chan runWithContext),
ready: make(chan struct{}),
}
}

func (r *runner) run(fn func(ctx context.Context)) context.CancelFunc {
<-r.ready // wait until there's a root context
ctx, cancel := context.WithCancel(r.rootContext)
r.tostart <- runWithContext{
run: fn,
ctx: ctx,
}
return cancel
}

// Start makes this a manager.Runnable so it can be registered with
// the manager and use its root context.
func (r *runner) Start(ctx context.Context) error {
r.rootContext = ctx
close(r.ready) // broadcast that things can be run
for {
select {
case randc := <-r.tostart:
go randc.run(randc.ctx)
case <-r.rootContext.Done():
return nil
}
}
}
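A rough usage sketch of the runner outside a manager, illustrative only
and not part of the commit: Start is handed a cancellable root context
directly, standing in for what mgr.Add arranges.

package leveltriggered

import (
	"context"
	"testing"
)

func TestRunnerSketch(t *testing.T) {
	rootCtx, stopAll := context.WithCancel(context.Background())
	defer stopAll() // cancelling the root stops everything the runner started

	r := newRunner()
	go func() {
		_ = r.Start(rootCtx) // normally the manager calls Start, via mgr.Add(r)
	}()

	done := make(chan struct{})
	cancel := r.run(func(ctx context.Context) {
		<-ctx.Done() // stands in for a cache's Start(ctx)
		close(done)
	})

	cancel() // cancel just this func's context, as removeCache does for a cache
	<-done   // the func saw the cancellation and exited
}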
