Support remote clusters and arbitrary types
This commit adds the infrastructure for watching and querying
arbitrarily-typed pipeline targets, in remote clusters as well as the
local cluster.

The basic shape is this: for each target that needs to be examined,
the reconciler uses `watchTargetAndGetReader(..., target)`. This
procedure encapsulates the details of making sure there's a cache for
the target's cluster and type, and supplies the `client.Reader` needed
for fetching the target object.
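
In outline, the per-target loop body reads roughly like this -- a
fragment using the names from the diff below, not new code:

```go
// Build an object of the target's declared type (unstructured under the hood).
targetObj, err := targetObject(&pipeline, &target)
if err != nil {
	targetStatus.Error = err.Error()
	continue
}

// Make sure the {cluster, type} is watched, and get a reader backed by that cache.
reader, synced, err := r.watchTargetAndGetReader(ctx, clusterObject, targetObj)
if err != nil {
	return ctrl.Result{}, err
}
if !synced {
	// the cache exists but hasn't synced yet; an event will requeue the pipeline later
	continue
}

// Query via the same object value, so the Get hits the same cache store as the watch.
if err := reader.Get(ctx, client.ObjectKeyFromObject(targetObj), targetObj); err != nil {
	targetStatus.Error = err.Error()
	continue
}
setTargetStatus(targetStatus, targetObj)
```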

A `cache.Cache` is kept for each {cluster, type}. `cache.Cache` is the
smallest piece of machinery that can be torn down, because the next
layer down, `Informer` objects, can't be removed once created. This is
important for being able to stop watching targets when they are no
longer targets.
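
For reference, creating one of these caches and handing it to the
manager looks roughly as it does in `watchTargetAndGetReader` in the
diff below:

```go
// Key for the per-{cluster, type} caches; an empty ObjectKey means the local cluster.
type clusterAndGVK struct {
	client.ObjectKey
	schema.GroupVersionKind
}

// cfg is the *rest.Config for the target cluster.
c, err := cache.New(cfg, cache.Options{Scheme: r.targetScheme})
if err != nil {
	return nil, false, err
}
// The manager starts the cache asynchronously and stops it with its own lifecycle.
if err := r.manager.Add(c); err != nil {
	return nil, false, err
}
r.caches[clusterAndGVK{ObjectKey: clusterKey, GroupVersionKind: targetGVK}] = c
```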

Target object updates will come from all the caches, which come and
(in principle) go; but the handler must be statically installed in
`SetupWithManager()`. So, when an event arrives for a target object,
the object is looked up in an index to find the corresponding
pipeline (if there is one), and that pipeline is put into a
`source.Channel`. The channel source multiplexes the dynamic event
handlers into a static pipeline requeue handler.
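
The wiring looks roughly like this (a sketch using the types from the
diff below; the SetupWithManager chain is abridged):

```go
// Dynamic side: each cache's informer handler finds the pipelines that target the
// changed object and pushes them into a channel shared by all caches.
for i := range pipelines {
	r.appEvents <- event.GenericEvent{Object: &pipelines[i]}
}

// Static side: installed once, in SetupWithManager; anything arriving on the
// channel requeues the pipeline it names.
return ctrl.NewControllerManagedBy(mgr).
	For(&v1alpha1.Pipeline{}).
	WatchesRawSource(&source.Channel{Source: r.appEvents}, &handler.EnqueueRequestForObject{}).
	Complete(r)
```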

NB:

 * I've put the remote cluster test in its own Test* wrapper, because
   it needs to start another testenv to be the remote cluster.

 * Supporting arbitrary types means using `unstructured.Unstructured`
   when querying for target objects, and this complicates checking
   their status (see the status-reading sketch after this list). Since
   the caches are per-type, in theory there could be code for querying
   known types (HelmRelease and Kustomization), with `Unstructured` as
   a fallback. So long as the object passed to
   `watchTargetAndGetReader(...)` is the same one used with
   `client.Get(...)`, it should all work.

 * A cache per {cluster, type} is not the only possible scheme. The
   watching could be more precise -- meaning fewer spurious events,
   and narrower permissions needed -- by having a cache per {cluster,
   namespace, type}, with the trade-off being managing more
   goroutines, and other overheads. I've chosen the chunkier scheme
   based on an informed guess that it'll be more efficient for low
   numbers of clusters and targets.
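
For the status-checking point above: reading a Flux-style status from
an `unstructured.Unstructured` value amounts to the following, which
mirrors `setTargetStatus` in the diff below:

```go
// Assumes a Flux-like object: a Ready condition and .status.lastAppliedRevision.
conds, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
if err != nil || !found {
	status.Ready = false
	return
}
status.Ready = conditions.IsReadyUnstructured(conds)

if rev, found, err := unstructured.NestedString(obj.Object, "status", "lastAppliedRevision"); err == nil && found {
	status.Revision = rev
}
```
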
squaremo committed Oct 18, 2023
1 parent 8360d98 commit 5a5c11a
Showing 10 changed files with 2,320 additions and 120 deletions.
1,629 changes: 1,629 additions & 0 deletions config/testdata/crds/kustomization.yaml

Large diffs are not rendered by default.

260 changes: 231 additions & 29 deletions controllers/leveltriggered/controller.go
@@ -3,32 +3,52 @@ package leveltriggered
import (
"context"
"fmt"
"sync"

helmv2 "github.com/fluxcd/helm-controller/api/v2beta1"
clusterctrlv1alpha1 "github.com/weaveworks/cluster-controller/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
capicfg "sigs.k8s.io/cluster-api/util/kubeconfig"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/weaveworks/pipeline-controller/api/v1alpha1"
"github.com/weaveworks/pipeline-controller/pkg/conditions"
)

// clusterAndGVK is used as the key for caches
type clusterAndGVK struct {
client.ObjectKey
schema.GroupVersionKind
}

// PipelineReconciler reconciles a Pipeline object
type PipelineReconciler struct {
client.Client
Scheme *runtime.Scheme
targetScheme *runtime.Scheme
ControllerName string
recorder record.EventRecorder
caches map[clusterAndGVK]cache.Cache
cachesMu *sync.Mutex
manager ctrl.Manager
appEvents chan event.GenericEvent
}

func NewPipelineReconciler(c client.Client, s *runtime.Scheme, eventRecorder record.EventRecorder, controllerName string) *PipelineReconciler {
@@ -40,6 +60,9 @@ func NewPipelineReconciler(c client.Client, s *runtime.Scheme, eventRecorder rec
targetScheme: targetScheme,
recorder: eventRecorder,
ControllerName: controllerName,
caches: make(map[clusterAndGVK]cache.Cache),
cachesMu: &sync.Mutex{},
appEvents: make(chan event.GenericEvent),
}
}

@@ -117,22 +140,33 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
}

// it's OK if this is `nil` -- that represents the local cluster.
clusterClient, err := r.getClusterClient(clusterObject)
targetObj, err := targetObject(&pipeline, &target)
if err != nil {
targetStatus.Error = fmt.Sprintf("target spec could not be interpreted as an object: %s", err.Error())
unready = true
continue
}

// it's OK if clusterObject is still `nil` -- that represents the local cluster.
clusterClient, ok, err := r.watchTargetAndGetReader(ctx, clusterObject, targetObj)
if err != nil {
return ctrl.Result{}, err
}
if !ok {
targetStatus.Error = "Target cluster client is not synced"
unready = true
continue
}

targetKey := client.ObjectKeyFromObject(targetObj)
// look up the actual application
var app helmv2.HelmRelease // FIXME this can be other kinds!
appKey := targetObjectKey(&pipeline, &target)
err = clusterClient.Get(ctx, appKey, &app)
err = clusterClient.Get(ctx, targetKey, targetObj)
if err != nil {
r.emitEventf(
&pipeline,
corev1.EventTypeWarning,
"GetAppError", "Failed to get application object %s%s/%s for pipeline %s/%s: %s",
clusterPrefix(target.ClusterRef), appKey.Namespace, appKey.Name,
clusterPrefix(target.ClusterRef), targetKey.Namespace, targetKey.Name,
pipeline.GetNamespace(), pipeline.GetName(),
err,
)
Expand All @@ -141,7 +175,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
unready = true
continue
}
setTargetStatus(targetStatus, &app)
setTargetStatus(targetStatus, targetObj)
}
}

@@ -185,35 +219,204 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}

// getClusterClient retrieves or creates a client for the cluster in question. A `nil` value for the argument indicates the local cluster.
func (r *PipelineReconciler) getClusterClient(cluster *clusterctrlv1alpha1.GitopsCluster) (client.Client, error) {
if cluster == nil {
return r.Client, nil
// watchTargetAndGetReader ensures that the type (GroupVersionKind) of the target in the cluster given is being watched, and
// returns a client.Reader.
// A nil for the `clusterObject` argument indicates the local cluster. It returns an error if there was a problem, and otherwise
// a client.Reader and a bool which is true if the type was already watched and syncing.
//
// NB: the target object should be the object that is used as the `client.Object` argument for client.Get(...); the cache has different
// stores for {typed values, partial values, unstructured values}, and we want to install the watcher in the same one as we will later query.
func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, clusterObject *clusterctrlv1alpha1.GitopsCluster, target client.Object) (client.Reader, bool, error) {
var clusterKey client.ObjectKey
if clusterObject != nil {
clusterKey = client.ObjectKeyFromObject(clusterObject)
}

targetGVK, err := apiutil.GVKForObject(target, r.targetScheme)
if err != nil {
return nil, false, err
}
cacheKey := clusterAndGVK{
ObjectKey: clusterKey,
GroupVersionKind: targetGVK,
}

logger := log.FromContext(ctx).WithValues("component", "target cache", "cluster", clusterKey, "name", client.ObjectKeyFromObject(target), "type", targetGVK)

r.cachesMu.Lock()
typeCache, cacheFound := r.caches[cacheKey]
r.cachesMu.Unlock()
// To construct a cache, we need a *rest.Config. There are two ways to get one:
// - for the local cluster, we can just get the already prepared one from the Manager;
// - for a remote cluster, we can construct one given a kubeconfig; and the kubeconfig will be stored in a Secret,
// associated with the object representing the cluster.
if !cacheFound {
logger.Info("creating cache for cluster and type", "cluster", clusterKey, "type", targetGVK)
var cfg *rest.Config

if clusterObject == nil {
cfg = r.manager.GetConfig()
} else {
var kubeconfig []byte
var err error
switch {
case clusterObject.Spec.CAPIClusterRef != nil:
var capiKey client.ObjectKey
capiKey.Name = clusterObject.Spec.CAPIClusterRef.Name
capiKey.Namespace = clusterObject.GetNamespace()
kubeconfig, err = capicfg.FromSecret(ctx, r.Client, capiKey)
if err != nil {
return nil, false, err
}
case clusterObject.Spec.SecretRef != nil:
var secretKey client.ObjectKey
secretKey.Name = clusterObject.Spec.SecretRef.Name
secretKey.Namespace = clusterObject.GetNamespace()

var sec corev1.Secret
if err := r.Get(ctx, secretKey, &sec); err != nil {
return nil, false, err
}
var ok bool
kubeconfig, ok = sec.Data["kubeconfig"]
if !ok {
return nil, false, fmt.Errorf("referenced Secret does not have data key %s", "kubeconfig")
}
default:
return nil, false, fmt.Errorf("GitopsCluster object has neither .secretRef nor .capiClusterRef populated, unable to get remote cluster config")
}
cfg, err = clientcmd.RESTConfigFromKubeConfig(kubeconfig)
if err != nil {
return nil, false, err
}
}

// having done all that, did we really need it?
r.cachesMu.Lock()
if typeCache, cacheFound = r.caches[cacheKey]; !cacheFound {
c, err := cache.New(cfg, cache.Options{
Scheme: r.targetScheme,
})
if err != nil {
r.cachesMu.Unlock()
return nil, false, err
}

// this must be done with the lock held, because if it fails, we can't use the cache and shouldn't add it to the map.
if err := r.manager.Add(c); err != nil { // this will start it asynchronously
r.cachesMu.Unlock()
return nil, false, err
}

typeCache = c
r.caches[cacheKey] = typeCache
}
r.cachesMu.Unlock()
}

// Now we have a cache; make sure the object type in question is being watched, so we can query it and get updates.

// The informer is retrieved whether we created the cache or not, because we want to know if it's synced and thus ready to be queried.
inf, err := typeCache.GetInformer(ctx, target) // NB not InformerForKind(...), because that uses the typed value cache specifically (see the method comment).
if err != nil {
return nil, false, err
}
// TODO future: get the secret via the cluster object, connect to remote cluster
return nil, fmt.Errorf("remote clusters not supported yet")
}

// targetObjectKey returns the object key (namespaced name) for a target. The Pipeline is passed in as well as the Target, since the definition can be spread between these specs.
func targetObjectKey(pipeline *v1alpha1.Pipeline, target *v1alpha1.Target) client.ObjectKey {
key := client.ObjectKey{}
key.Name = pipeline.Spec.AppRef.Name
key.Namespace = target.Namespace
return key
if !cacheFound { // meaning: we created the cache, this time around
maybeEnqueuePipeline := func(obj interface{}) {
eventObj, ok := obj.(client.Object)
if !ok {
logger.Info("value to look up in index was not a client.Object", "object", eventObj)
return
}
pipelines, err := r.pipelinesForApplication(clusterKey, eventObj)
if err != nil {
logger.Error(err, "failed to look up pipelines in index of applications")
return
}
// TODO is passing pointers here dangerous? (do they get copied, I think they might do). Alternative is to pass the whole list in the channel.
for i := range pipelines {
r.appEvents <- event.GenericEvent{Object: &pipelines[i]}
}
}

_, err := inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
maybeEnqueuePipeline(obj)
},
DeleteFunc: func(obj interface{}) {
maybeEnqueuePipeline(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
// We're just looking up the name in the index; it'll be the same for the old object as for the new one. However, this might change, so to be defensive, run both. The queue will deduplicate, though we do a bit more lookup work.
maybeEnqueuePipeline(oldObj)
maybeEnqueuePipeline(newObj)
},
})
if err != nil {
return nil, false, err
}
}

return typeCache, inf.HasSynced(), nil
}

// clusterPrefix returns a string naming the cluster containing an app, to prepend to the usual namespace/name format of the app object itself. So that it can be empty, the separator is include in the return value.
// clusterPrefix returns a string naming the cluster containing an app, to prepend to the usual namespace/name format of the app object itself.
// So that it can be empty, the separator is included in the return value.
func clusterPrefix(ref *v1alpha1.CrossNamespaceClusterReference) string {
if ref == nil {
return ""
}
return fmt.Sprintf("%s/%s:", ref.Namespace, ref.Name)
}

// targetObject returns a target object for a target spec, ready to be queried. The Pipeline is passed in as well as the Target,
// since the definition can be spread between these specs. This is coupled with setTargetStatus because the concrete types returned here
// must be handled by setTargetStatus.
func targetObject(pipeline *v1alpha1.Pipeline, target *v1alpha1.Target) (client.Object, error) {
var obj unstructured.Unstructured
gv, err := schema.ParseGroupVersion(pipeline.Spec.AppRef.APIVersion)
if err != nil {
return nil, err
}
gvk := gv.WithKind(pipeline.Spec.AppRef.Kind)
obj.GetObjectKind().SetGroupVersionKind(gvk)
obj.SetName(pipeline.Spec.AppRef.Name)
obj.SetNamespace(target.Namespace)
return &obj, nil
}

// setTargetStatus gets the relevant status from the app object given, and records it in the TargetStatus.
func setTargetStatus(status *v1alpha1.TargetStatus, target *helmv2.HelmRelease) {
status.Revision = target.Status.LastAppliedRevision
status.Ready = conditions.IsReady(target.Status.Conditions)
func setTargetStatus(status *v1alpha1.TargetStatus, targetObject client.Object) {
switch obj := targetObject.(type) {
case *unstructured.Unstructured:
// this assumes it's a Flux-like object; specifically with
// - a Ready condition
// - a .status.lastAppliedRevision
conds, ok, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
if !ok || err != nil {
status.Ready = false
return
}
status.Ready = conditions.IsReadyUnstructured(conds)

lastAppliedRev, ok, err := unstructured.NestedString(obj.Object, "status", "lastAppliedRevision")
if !ok || err != nil {
// It's not an error to lack a Ready condition (new objects will lack any conditions), and it's not an error to lack a lastAppliedRevision
// (maybe it hasn't got that far yet); but it is an error to have a ready condition of true and lack a lastAppliedRevision, since that means
// the object is not a usable target.
if status.Ready {
status.Error = "unable to find .status.lastAppliedRevision in ready target object"
status.Ready = false
return
}
} else {
status.Revision = lastAppliedRev
}
default:
status.Error = "unable to determine ready status for object"
status.Ready = false
}
}

func (r *PipelineReconciler) patchStatus(ctx context.Context, n types.NamespacedName, newStatus v1alpha1.PipelineStatus) error {
@@ -241,6 +444,8 @@ func (r *PipelineReconciler) getCluster(ctx context.Context, p v1alpha1.Pipeline

// SetupWithManager sets up the controller with the Manager.
func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.manager = mgr

const (
gitopsClusterIndexKey string = ".spec.environment.ClusterRef" // this is arbitrary, but let's make it suggest what it's indexing.
)
@@ -265,10 +470,7 @@ func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error {
&clusterctrlv1alpha1.GitopsCluster{},
handler.EnqueueRequestsFromMapFunc(r.requestsForCluster(gitopsClusterIndexKey)),
).
Watches(
&helmv2.HelmRelease{},
handler.EnqueueRequestsFromMapFunc(r.requestsForApplication),
).
WatchesRawSource(&source.Channel{Source: r.appEvents}, &handler.EnqueueRequestForObject{}).
Complete(r)
}

