diff --git a/controllers/leveltriggered/controller.go b/controllers/leveltriggered/controller.go
index 810f019..950871e 100644
--- a/controllers/leveltriggered/controller.go
+++ b/controllers/leveltriggered/controller.go
@@ -32,6 +32,12 @@ import (
 	"github.com/weaveworks/pipeline-controller/pkg/conditions"
 )
 
+// clusterAndGVK is used as the key for caches
+type clusterAndGVK struct {
+	client.ObjectKey
+	schema.GroupVersionKind
+}
+
 // PipelineReconciler reconciles a Pipeline object
 type PipelineReconciler struct {
 	client.Client
@@ -39,19 +45,12 @@ type PipelineReconciler struct {
 	targetScheme   *runtime.Scheme
 	ControllerName string
 	recorder       record.EventRecorder
-	clusters       map[client.ObjectKey]*clusterWatcher
-	clustersMu     *sync.Mutex
+	caches         map[clusterAndGVK]cache.Cache
+	cachesMu       *sync.Mutex
 	manager        ctrl.Manager
 	appEvents      chan event.GenericEvent
 }
 
-// clusterWatcher keeps track of a cluster cache and client (as embodied by cache.Cache), and which types have had eventhandlers installed.
-type clusterWatcher struct {
-	cache    cache.Cache
-	handlers map[schema.GroupVersionKind]toolscache.ResourceEventHandlerRegistration
-	mu       *sync.Mutex
-}
-
 func NewPipelineReconciler(c client.Client, s *runtime.Scheme, eventRecorder record.EventRecorder, controllerName string) *PipelineReconciler {
 	targetScheme := runtime.NewScheme()
 
@@ -61,8 +60,8 @@ func NewPipelineReconciler(c client.Client, s *runtime.Scheme, eventRecorder rec
 		targetScheme:   targetScheme,
 		recorder:       eventRecorder,
 		ControllerName: controllerName,
-		clusters:       map[client.ObjectKey]*clusterWatcher{},
-		clustersMu:     &sync.Mutex{},
+		caches:         make(map[clusterAndGVK]cache.Cache),
+		cachesMu:       &sync.Mutex{},
 		appEvents:      make(chan event.GenericEvent),
 	}
 }
@@ -220,9 +219,10 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 	return ctrl.Result{}, nil
 }
 
-// watchTargetAndGetReader ensures that the target in the cluster given is being watched, and returns a client.Reader.
+// watchTargetAndGetReader ensures that the type (GroupVersionKind) of the target in the cluster given is being watched, and
+// returns a client.Reader.
 // A nil for the `clusterObject` argument indicates the local cluster. It returns an error if there was a problem, and otherwise
-// a client.Reader and a bool which is true if the object was already watched and syncing.
+// a client.Reader and a bool which is true if the type was already watched and syncing.
 //
 // NB: the target object should be the object that is used as the `client.Object` argument for client.Get(...); the cache has different
 // stores for {typed values, partial values, unstructured values}, and we want to install the watcher in the same one as we will later query.
@@ -231,17 +231,27 @@ func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, cluste
 	if clusterObject != nil {
 		clusterKey = client.ObjectKeyFromObject(clusterObject)
 	}
-	logger := log.FromContext(ctx).WithValues("target cluster", clusterKey, "target name", client.ObjectKeyFromObject(target))
-	r.clustersMu.Lock()
-	clus, clusterFound := r.clusters[clusterKey]
-	r.clustersMu.Unlock()
-	// To construct a client, ultimately we need a *rest.Config. There's two ways to get one:
+	targetGVK, err := apiutil.GVKForObject(target, r.targetScheme)
+	if err != nil {
+		return nil, false, err
+	}
+	cacheKey := clusterAndGVK{
+		ObjectKey:        clusterKey,
+		GroupVersionKind: targetGVK,
+	}
+
+	logger := log.FromContext(ctx).WithValues("component", "target cache", "cluster", clusterKey, "name", client.ObjectKeyFromObject(target), "type", targetGVK)
+
+	r.cachesMu.Lock()
+	typeCache, cacheFound := r.caches[cacheKey]
+	r.cachesMu.Unlock()
+	// To construct a cache, we need a *rest.Config. There's two ways to get one:
 	// - for the local cluster, we can just get the already prepared one from the Manager;
 	// - for a remote cluster, we can construct one given a kubeconfig; and the kubeconfig will be stored in a Secret,
 	// associated with the object representing the cluster.
 
-	if !clusterFound {
-		logger.Info("creating client for cluster", "cluster", clusterKey)
+	if !cacheFound {
+		logger.Info("creating cache for cluster and type", "cluster", clusterKey, "type", targetGVK)
 
 		var cfg *rest.Config
 		if clusterObject == nil {
@@ -282,48 +292,37 @@ func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, cluste
 		}
 
 		// having done all that, did we really need it?
-		r.clustersMu.Lock()
-		if _, ok := r.clusters[clusterKey]; !ok {
+		r.cachesMu.Lock()
+		if typeCache, cacheFound = r.caches[cacheKey]; !cacheFound {
 			c, err := cache.New(cfg, cache.Options{
 				Scheme: r.targetScheme,
 			})
 			if err != nil {
-				r.clustersMu.Unlock()
+				r.cachesMu.Unlock()
 				return nil, false, err
 			}
 
 			// this must be done with the lock held, because if it fails, we can't use the cache and shouldn't add it to the map.
 			if err := r.manager.Add(c); err != nil { // this will start it asynchronously
-				r.clustersMu.Unlock()
+				r.cachesMu.Unlock()
 				return nil, false, err
 			}
 
-			clus = &clusterWatcher{
-				cache:    c,
-				handlers: map[schema.GroupVersionKind]toolscache.ResourceEventHandlerRegistration{},
-				mu:       &sync.Mutex{},
-			}
-			r.clusters[clusterKey] = clus
+			typeCache = c
+			r.caches[cacheKey] = typeCache
 		}
-		r.clustersMu.Unlock()
+		r.cachesMu.Unlock()
 	}
 
-	// Now we have a clusterWatcher record; make sure this kind of object is watched
-
-	// The informer is retrieved every time, because we want to know if it's synced and thus ready to be queried.
-	inf, err := clus.cache.GetInformer(ctx, target) // NB not InformerForKind(...), because that uses the typed value cache specifically (see the method comment).
-	if err != nil {
-		return nil, false, err
-	}
+	// Now we have a cache; make sure the object type in question is being watched, so we can query it and get updates.
 
-	targetGVK, err := apiutil.GVKForObject(target, r.targetScheme)
+	// The informer is retrieved whether we created the cache or not, because we want to know if it's synced and thus ready to be queried.
+	inf, err := typeCache.GetInformer(ctx, target) // NB not InformerForKind(...), because that uses the typed value cache specifically (see the method comment).
 	if err != nil {
 		return nil, false, err
 	}
 
-	clus.mu.Lock()
-	defer clus.mu.Unlock()
-	if _, handlerFound := clus.handlers[targetGVK]; !handlerFound {
+	if !cacheFound { // meaning: we created the cache, this time around
 		maybeEnqueuePipeline := func(obj interface{}) {
 			eventObj, ok := obj.(client.Object)
 			if !ok {
@@ -341,7 +340,7 @@ func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, cluste
 			}
 		}
 
-		reg, err := inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
+		_, err := inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
 			AddFunc: func(obj interface{}) {
 				maybeEnqueuePipeline(obj)
 			},
@@ -357,10 +356,9 @@ func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, cluste
 		if err != nil {
 			return nil, false, err
 		}
-		clus.handlers[targetGVK] = reg
 	}
 
-	return clus.cache, inf.HasSynced(), nil
+	return typeCache, inf.HasSynced(), nil
 }
 
 // targetObject returns a target object for a target spec, ready to be queried. The Pipeline is passed in as well as the Target, since the definition can be spread between these specs.
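
Note (not part of the patch): the sketch below illustrates the keying scheme the diff introduces. Because client.ObjectKey and schema.GroupVersionKind are both comparable, the clusterAndGVK struct can be used directly as a map key, and the "was it already in the map?" result replaces the old per-cluster clusterWatcher with its per-GVK handler registry. The cacheRegistry type, lookupOrCreate helper, and the cluster name are illustrative inventions, and a string stands in for cache.Cache so the example runs without a live cluster.

// Illustrative only: a minimal, standalone sketch of a (cluster, GVK)-keyed map,
// assuming the controller-runtime and apimachinery modules are on the module path.
package main

import (
	"fmt"
	"sync"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

// clusterAndGVK mirrors the key type from the patch: both embedded types are
// comparable, so the struct works directly as a map key.
type clusterAndGVK struct {
	client.ObjectKey
	schema.GroupVersionKind
}

// cacheRegistry stands in for the reconciler's caches map; the value type is a
// string here purely so the example runs without a cluster.
type cacheRegistry struct {
	mu     sync.Mutex
	caches map[clusterAndGVK]string
}

// lookupOrCreate is a condensed stand-in for the look-up/create logic: it returns
// the entry for the key and whether it already existed. The real method drops the
// lock while building a *rest.Config and re-checks the map afterwards.
func (r *cacheRegistry) lookupOrCreate(key clusterAndGVK) (string, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if c, ok := r.caches[key]; ok {
		return c, true
	}
	c := fmt.Sprintf("cache for %s %s", key.ObjectKey, key.GroupVersionKind)
	r.caches[key] = c
	return c, false
}

func main() {
	scheme := runtime.NewScheme()
	_ = clientgoscheme.AddToScheme(scheme)

	// A ConfigMap stands in for the application type a pipeline target refers to.
	gvk, err := apiutil.GVKForObject(&corev1.ConfigMap{}, scheme)
	if err != nil {
		panic(err)
	}
	key := clusterAndGVK{
		ObjectKey:        client.ObjectKey{Namespace: "flux-system", Name: "prod-cluster"}, // hypothetical cluster reference
		GroupVersionKind: gvk,
	}

	reg := &cacheRegistry{caches: map[clusterAndGVK]string{}}
	_, found := reg.lookupOrCreate(key)
	fmt.Println("already cached:", found) // false on the first call
	_, found = reg.lookupOrCreate(key)
	fmt.Println("already cached:", found) // true afterwards
}

In the patch itself this boolean does double duty: when the cache was created on this call, the event handler is installed on the informer; when it was already present, the handler is assumed to be there too.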
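
Note (not part of the patch): the comment about the two ways to obtain a *rest.Config can also be shown standalone. The sketch below assumes controller-runtime and client-go; the function names and the Secret data key "value" are hypothetical, chosen only for illustration — the patch just says the kubeconfig is stored in a Secret associated with the cluster object.

// Illustrative only: obtaining a *rest.Config for the local cluster (from the Manager)
// versus a remote cluster (from kubeconfig bytes held in a Secret).
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// localConfig: for the local cluster, the Manager already holds a ready-made *rest.Config.
func localConfig(mgr manager.Manager) *rest.Config {
	return mgr.GetConfig()
}

// remoteConfig: for a remote cluster, build a *rest.Config from kubeconfig bytes
// stored in a Secret associated with the object representing the cluster.
func remoteConfig(kubeconfigSecret *corev1.Secret) (*rest.Config, error) {
	data, ok := kubeconfigSecret.Data["value"] // hypothetical key; the real Secret layout may differ
	if !ok {
		return nil, fmt.Errorf("secret has no kubeconfig data")
	}
	return clientcmd.RESTConfigFromKubeConfig(data)
}

func main() {
	// Outside a cluster (and without a kubeconfig) this will fail; it is only here
	// to make the sketch self-contained.
	cfg, err := ctrl.GetConfig()
	if err != nil {
		fmt.Println("no local config available:", err)
		return
	}
	fmt.Println("local API server:", cfg.Host)
}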