Create a cache per {cluster, type}
This simplifies the picture: each cache has a single event handler, so
I don't have to keep track of event handlers individually. If I want
to stop watching a type in a cluster, I can stop that cache.

Caches are the unit that can be started and stopped, so it's natural
to garbage-collect one cache at a time. A cache uses resources -- an
HTTP client, goroutines -- so there is a balance between having fine
control over how much is watched, and how many caches need looking
after.
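
As a rough illustration of that lifecycle point, here is a minimal sketch of how a
per-{cluster, type} cache could be given its own cancellable context so it can be
stopped and garbage-collected individually. This is not what the commit does (it hands
each cache to the controller manager via `manager.Add`); the `stoppableCache` type and
the `startCache`/`stop` names are hypothetical.

```go
package leveltriggered

import (
	"context"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/rest"
	"sigs.k8s.io/controller-runtime/pkg/cache"
)

// stoppableCache pairs a cache with the cancel function for the context it was
// started with, so its informers and goroutines can be shut down later.
type stoppableCache struct {
	cache.Cache
	cancel context.CancelFunc
}

// startCache constructs a cache for one {cluster, type} and starts it in its own
// goroutine, rather than registering it with the manager.
func startCache(ctx context.Context, cfg *rest.Config, scheme *runtime.Scheme) (*stoppableCache, error) {
	c, err := cache.New(cfg, cache.Options{Scheme: scheme})
	if err != nil {
		return nil, err
	}
	cacheCtx, cancel := context.WithCancel(ctx)
	go func() {
		// Start blocks until cacheCtx is cancelled; a real implementation would log the error.
		_ = c.Start(cacheCtx)
	}()
	return &stoppableCache{Cache: c, cancel: cancel}, nil
}

// stop cancels the cache's context, releasing its watch connections and goroutines.
func (s *stoppableCache) stop() {
	s.cancel()
}
```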

I could go further and create a cache per {cluster, type,
namespace}. This would mean I don't need to watch any namespaces that
don't have pipeline targets in them -- so a narrower set of
permissions is needed, and the event handler doesn't see as many
spurious events. However, it would mean `O(clusters x types x
namespaces)` resources. I don't think it would be difficult to adapt
the code here to do it, if experience shows it would be better.
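
For that {cluster, type, namespace} variant, the cache key might simply grow a
namespace field, along these lines (a hypothetical sketch, not code from this
repository; it mirrors the `clusterAndGVK` key added below):

```go
package leveltriggered

import (
	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// Hypothetical key for a cache per {cluster, type, namespace}. All fields are
// comparable, so the struct can still be used as a map key.
type clusterGVKAndNamespace struct {
	client.ObjectKey        // which cluster, as in clusterAndGVK
	schema.GroupVersionKind // which type, as in clusterAndGVK
	TargetNamespace string  // which namespace within that cluster
}
```

Each such cache could then also be restricted to its namespace via the
namespace-scoping option in `cache.Options` (the exact field name depends on the
controller-runtime version), which is what would narrow the permissions required.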
squaremo committed Oct 18, 2023
1 parent e0d3e22 commit 13b12bd
Showing 1 changed file with 43 additions and 45 deletions.
88 changes: 43 additions & 45 deletions controllers/leveltriggered/controller.go
@@ -32,26 +32,25 @@ import (
"github.com/weaveworks/pipeline-controller/pkg/conditions"
)

// clusterAndGVK is used as the key for caches
type clusterAndGVK struct {
client.ObjectKey
schema.GroupVersionKind
}

// PipelineReconciler reconciles a Pipeline object
type PipelineReconciler struct {
client.Client
Scheme *runtime.Scheme
targetScheme *runtime.Scheme
ControllerName string
recorder record.EventRecorder
clusters map[client.ObjectKey]*clusterWatcher
clustersMu *sync.Mutex
caches map[clusterAndGVK]cache.Cache
cachesMu *sync.Mutex
manager ctrl.Manager
appEvents chan event.GenericEvent
}

// clusterWatcher keeps track of a cluster cache and client (as embodied by cache.Cache), and which types have had eventhandlers installed.
type clusterWatcher struct {
cache cache.Cache
handlers map[schema.GroupVersionKind]toolscache.ResourceEventHandlerRegistration
mu *sync.Mutex
}

func NewPipelineReconciler(c client.Client, s *runtime.Scheme, eventRecorder record.EventRecorder, controllerName string) *PipelineReconciler {
targetScheme := runtime.NewScheme()

@@ -61,8 +60,8 @@ func NewPipelineReconciler(c client.Client, s *runtime.Scheme, eventRecorder rec
targetScheme: targetScheme,
recorder: eventRecorder,
ControllerName: controllerName,
clusters: map[client.ObjectKey]*clusterWatcher{},
clustersMu: &sync.Mutex{},
caches: make(map[clusterAndGVK]cache.Cache),
cachesMu: &sync.Mutex{},
appEvents: make(chan event.GenericEvent),
}
}
@@ -220,9 +219,10 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}

// watchTargetAndGetReader ensures that the target in the cluster given is being watched, and returns a client.Reader.
// watchTargetAndGetReader ensures that the type (GroupVersionKind) of the target in the cluster given is being watched, and
// returns a client.Reader.
// A nil for the `clusterObject` argument indicates the local cluster. It returns an error if there was a problem, and otherwise
// a client.Reader and a bool which is true if the object was already watched and syncing.
// a client.Reader and a bool which is true if the type was already watched and syncing.
//
// NB: the target object should be the object that is used as the `client.Object` argument for client.Get(...); the cache has different
// stores for {typed values, partial values, unstructured values}, and we want to install the watcher in the same one as we will later query.
@@ -231,17 +231,27 @@ func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, cluste
if clusterObject != nil {
clusterKey = client.ObjectKeyFromObject(clusterObject)
}
logger := log.FromContext(ctx).WithValues("target cluster", clusterKey, "target name", client.ObjectKeyFromObject(target))

r.clustersMu.Lock()
clus, clusterFound := r.clusters[clusterKey]
r.clustersMu.Unlock()
// To construct a client, ultimately we need a *rest.Config. There's two ways to get one:
targetGVK, err := apiutil.GVKForObject(target, r.targetScheme)
if err != nil {
return nil, false, err
}
cacheKey := clusterAndGVK{
ObjectKey: clusterKey,
GroupVersionKind: targetGVK,
}

logger := log.FromContext(ctx).WithValues("component", "target cache", "cluster", clusterKey, "name", client.ObjectKeyFromObject(target), "type", targetGVK)

r.cachesMu.Lock()
typeCache, cacheFound := r.caches[cacheKey]
r.cachesMu.Unlock()
// To construct a cache, we need a *rest.Config. There's two ways to get one:
// - for the local cluster, we can just get the already prepared one from the Manager;
// - for a remote cluster, we can construct one given a kubeconfig; and the kubeconfig will be stored in a Secret,
// associated with the object representing the cluster.
if !clusterFound {
logger.Info("creating client for cluster", "cluster", clusterKey)
if !cacheFound {
logger.Info("creating cache for cluster and type", "cluster", clusterKey, "type", targetGVK)
var cfg *rest.Config

if clusterObject == nil {
@@ -282,48 +292,37 @@ func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, cluste
}

// having done all that, did we really need it?
r.clustersMu.Lock()
if _, ok := r.clusters[clusterKey]; !ok {
r.cachesMu.Lock()
if typeCache, cacheFound = r.caches[cacheKey]; !cacheFound {
c, err := cache.New(cfg, cache.Options{
Scheme: r.targetScheme,
})
if err != nil {
r.clustersMu.Unlock()
r.cachesMu.Unlock()
return nil, false, err
}

// this must be done with the lock held, because if it fails, we can't use the cache and shouldn't add it to the map.
if err := r.manager.Add(c); err != nil { // this will start it asynchronously
r.clustersMu.Unlock()
r.cachesMu.Unlock()
return nil, false, err
}

clus = &clusterWatcher{
cache: c,
handlers: map[schema.GroupVersionKind]toolscache.ResourceEventHandlerRegistration{},
mu: &sync.Mutex{},
}
r.clusters[clusterKey] = clus
typeCache = c
r.caches[cacheKey] = typeCache
}
r.clustersMu.Unlock()
r.cachesMu.Unlock()
}

// Now we have a clusterWatcher record; make sure this kind of object is watched

// The informer is retrieved every time, because we want to know if it's synced and thus ready to be queried.
inf, err := clus.cache.GetInformer(ctx, target) // NB not InformerForKind(...), because that uses the typed value cache specifically (see the method comment).
if err != nil {
return nil, false, err
}
// Now we have a cache; make sure the object type in question is being watched, so we can query it and get updates.

targetGVK, err := apiutil.GVKForObject(target, r.targetScheme)
// The informer is retrieved whether we created the cache or not, because we want to know if it's synced and thus ready to be queried.
inf, err := typeCache.GetInformer(ctx, target) // NB not InformerForKind(...), because that uses the typed value cache specifically (see the method comment).
if err != nil {
return nil, false, err
}

clus.mu.Lock()
defer clus.mu.Unlock()
if _, handlerFound := clus.handlers[targetGVK]; !handlerFound {
if !cacheFound { // meaning: we created the cache, this time around
maybeEnqueuePipeline := func(obj interface{}) {
eventObj, ok := obj.(client.Object)
if !ok {
@@ -341,7 +340,7 @@ func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, cluste
}
}

reg, err := inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
_, err := inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
maybeEnqueuePipeline(obj)
},
@@ -357,10 +356,9 @@ func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, cluste
if err != nil {
return nil, false, err
}
clus.handlers[targetGVK] = reg
}

return clus.cache, inf.HasSynced(), nil
return typeCache, inf.HasSynced(), nil
}

// targetObject returns a target object for a target spec, ready to be queried. The Pipeline is passed in as well as the Target, since the definition can be spread between these specs.
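To make the contract of `watchTargetAndGetReader` concrete, a hypothetical caller-side
sketch follows; the real call sites are not shown in this diff, and the helper name,
variables, and error handling here are illustrative only.

```go
// Hypothetical helper showing how the return values of watchTargetAndGetReader
// might be consumed by a caller in the same package.
func (r *PipelineReconciler) getTargetStatus(ctx context.Context, clusterObject, target client.Object) (bool, error) {
	reader, synced, err := r.watchTargetAndGetReader(ctx, clusterObject, target)
	if err != nil {
		return false, err
	}
	if !synced {
		// The cache for this {cluster, type} hasn't finished its initial sync;
		// report "not ready yet" rather than reading stale or missing data.
		return false, nil
	}
	// Query the same cache that the watch was installed in.
	if err := reader.Get(ctx, client.ObjectKeyFromObject(target), target); err != nil {
		return false, err
	}
	return true, nil
}
```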
