Support arbitrary kinds as targets
This replaces the typed-object (`HelmRelease`-specific) code with
`unstructured.Unstructured`. This has a few repercussions:

 - watches have to be particular to the kind;
 - the target status has to be calculated from an
   unstructured.Unstructured (arbitrary object), rather than from
   specific Flux API types (see the sketch below).
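
To illustrate the second point: calculating status from an arbitrary
object means digging the relevant fields out of the unstructured
content. A rough sketch follows (readTargetStatus is a hypothetical
name; the real code in controllers/leveltriggered/controller.go below
also treats a ready target without a revision as an error):

package leveltriggered

import (
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// readTargetStatus reads the fields the controller cares about from an
// arbitrary object, assuming it follows the Flux conventions of a Ready
// condition and a .status.lastAppliedRevision field.
func readTargetStatus(obj *unstructured.Unstructured) (revision string, ready bool) {
	conds, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
	if err != nil || !found {
		return "", false
	}
	for _, c := range conds {
		cond, ok := c.(map[string]interface{})
		if !ok {
			continue
		}
		if cond["type"] == "Ready" && cond["status"] == "True" {
			ready = true
			break
		}
	}
	revision, _, _ = unstructured.NestedString(obj.Object, "status", "lastAppliedRevision")
	return revision, ready
}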

The first one means I have to introduce some bookkeeping, so that
event handlers are only installed once. I've done this with a map of
GVK->handle per cluster. The handle will be useful later, for
unregistering event handlers when they are no longer needed.
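
In outline, the bookkeeping looks something like the following sketch
(watchKind is a hypothetical helper; the actual change inlines this
logic in watchTargetAndGetReader):

package leveltriggered

import (
	"context"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	toolscache "k8s.io/client-go/tools/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
	"sigs.k8s.io/controller-runtime/pkg/cluster"
)

// clusterWatcher pairs a cluster's cache and client with the kinds that
// already have an event handler installed.
type clusterWatcher struct {
	cluster  cluster.Cluster
	handlers map[schema.GroupVersionKind]toolscache.ResourceEventHandlerRegistration
}

// watchKind installs an event handler for obj's kind exactly once per
// cluster, keeping the registration handle so the handler can be
// removed later.
func (c *clusterWatcher) watchKind(ctx context.Context, obj client.Object, scheme *runtime.Scheme, funcs toolscache.ResourceEventHandlerFuncs) error {
	gvk, err := apiutil.GVKForObject(obj, scheme)
	if err != nil {
		return err
	}
	if _, ok := c.handlers[gvk]; ok {
		return nil // already watching this kind in this cluster
	}
	inf, err := c.cluster.GetCache().GetInformer(ctx, obj)
	if err != nil {
		return err
	}
	reg, err := inf.AddEventHandler(funcs)
	if err != nil {
		return err
	}
	c.handlers[gvk] = reg
	return nil
}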

The tests still need to register APIs in the scheme, so they can use
Go types for those APIs; but the dynamic client used in the machinery
doesn't need this registration.
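
For example (a hypothetical snippet, using the kustomize-controller
API as the sample kind): a typed Get only works with the API
registered in the client's scheme, whereas the unstructured path only
needs a GVK:

package leveltriggered

import (
	"context"

	kustomv1 "github.com/fluxcd/kustomize-controller/api/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// getBothWays fetches the same object twice: once as a Go type, which
// only works if kustomv1.AddToScheme has been called on the client's
// scheme, and once as unstructured, which needs no scheme registration.
func getBothWays(ctx context.Context, c client.Client, key client.ObjectKey) error {
	var typed kustomv1.Kustomization
	if err := c.Get(ctx, key, &typed); err != nil {
		return err
	}

	var un unstructured.Unstructured
	un.SetGroupVersionKind(schema.GroupVersionKind{
		Group:   "kustomize.toolkit.fluxcd.io",
		Version: "v1",
		Kind:    "Kustomization",
	})
	return c.Get(ctx, key, &un)
}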

Signed-off-by: Michael Bridgen <[email protected]>
squaremo committed Oct 11, 2023
1 parent 200d6b6 commit 58ca80d
Showing 9 changed files with 1,970 additions and 68 deletions.
1,629 changes: 1,629 additions & 0 deletions config/testdata/crds/kustomization.yaml


143 changes: 105 additions & 38 deletions controllers/leveltriggered/controller.go
@@ -4,13 +4,14 @@ import (
"context"
"fmt"

helmv2 "github.com/fluxcd/helm-controller/api/v2beta1"
clusterctrlv1alpha1 "github.com/weaveworks/cluster-controller/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
toolscache "k8s.io/client-go/tools/cache"
@@ -19,6 +20,7 @@ import (
capicfg "sigs.k8s.io/cluster-api/util/kubeconfig"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -36,11 +38,17 @@ type PipelineReconciler struct {
targetScheme *runtime.Scheme
ControllerName string
recorder record.EventRecorder
clusters map[client.ObjectKey]cluster.Cluster
clusters map[client.ObjectKey]*clusterWatcher
manager ctrl.Manager
appEvents chan event.GenericEvent
}

// clusterWatcher keeps track of a cluster cache and client (as embodied by a cluster.Cluster), and which types have had event handlers installed.
type clusterWatcher struct {
cluster cluster.Cluster
handlers map[schema.GroupVersionKind]toolscache.ResourceEventHandlerRegistration
}

func NewPipelineReconciler(c client.Client, s *runtime.Scheme, eventRecorder record.EventRecorder, controllerName string) *PipelineReconciler {
targetScheme := runtime.NewScheme()

@@ -50,7 +58,7 @@ func NewPipelineReconciler(c client.Client, s *runtime.Scheme, eventRecorder rec
targetScheme: targetScheme,
recorder: eventRecorder,
ControllerName: controllerName,
clusters: map[client.ObjectKey]cluster.Cluster{},
clusters: map[client.ObjectKey]*clusterWatcher{},
appEvents: make(chan event.GenericEvent),
}
}
@@ -129,9 +137,15 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
}

appKey := targetObjectKey(&pipeline, &target)
targetObj, err := targetObject(&pipeline, &target)
if err != nil {
targetStatus.Error = fmt.Sprintf("target spec could not be interpreted as an object: %s", err.Error())
unready = true
continue
}

// it's OK if clusterObject is still `nil` -- that represents the local cluster.
clusterClient, ok, err := r.watchTargetAndGetReader(ctx, clusterObject, appKey)
clusterClient, ok, err := r.watchTargetAndGetReader(ctx, clusterObject, targetObj)
if err != nil {
return ctrl.Result{}, err
}
@@ -141,15 +155,15 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
continue
}

targetKey := client.ObjectKeyFromObject(targetObj)
// look up the actual application
var app helmv2.HelmRelease // FIXME this can be other kinds!
err = clusterClient.Get(ctx, appKey, &app)
err = clusterClient.Get(ctx, targetKey, targetObj)
if err != nil {
r.emitEventf(
&pipeline,
corev1.EventTypeWarning,
"GetAppError", "Failed to get application object %s%s/%s for pipeline %s/%s: %s",
clusterPrefix(target.ClusterRef), appKey.Namespace, appKey.Name,
clusterPrefix(target.ClusterRef), targetKey.Namespace, targetKey.Name,
pipeline.GetNamespace(), pipeline.GetName(),
err,
)
@@ -158,7 +172,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
unready = true
continue
}
setTargetStatus(targetStatus, &app)
setTargetStatus(targetStatus, targetObj)
}
}

@@ -202,13 +216,18 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}

// watchTargetAndGetReader ensures that the target app in the cluster given is being watched, and returns a client.Reader. A nil for the `clusterObject` argument indicates the local cluster. It returns an error if there was a problem, and otherwise a client.Reader and a bool which is true if the object was already watched and syncing.
func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, clusterObject *clusterctrlv1alpha1.GitopsCluster, targetKey client.ObjectKey) (client.Reader, bool, error) {
// watchTargetAndGetReader ensures that the target in the cluster given is being watched, and returns a client.Reader.
// A nil for the `clusterObject` argument indicates the local cluster. It returns an error if there was a problem, and otherwise
// a client.Reader and a bool which is true if the object was already watched and syncing.
//
// NB: the target object should be the object that is used as the `client.Object` argument for client.Get(...); the cache has different
// stores for {typed values, partial values, unstructured values}, and we want to install the watcher in the same one as we will later query.
func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, clusterObject *clusterctrlv1alpha1.GitopsCluster, target client.Object) (client.Reader, bool, error) {
var clusterKey client.ObjectKey
if clusterObject != nil {
clusterKey = client.ObjectKeyFromObject(clusterObject)
}
logger := log.FromContext(ctx).WithValues("target cluster", clusterKey, "target name", targetKey)
logger := log.FromContext(ctx).WithValues("target cluster", clusterKey, "target name", client.ObjectKeyFromObject(target))

// FIXME: concurrency
clus, clusterFound := r.clusters[clusterKey]
@@ -256,32 +275,43 @@ func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, cluste
return nil, false, err
}
}
var err error
clus, err = cluster.New(cfg, func(opts *cluster.Options) {
opts.Scheme = r.Scheme // FIXME: from the name, this ought to be `targetScheme`; but, that is a "plain" scheme with none of the Flux types registered. So for now, we'll use the local scheme.

c, err := cluster.New(cfg, func(opts *cluster.Options) {
opts.Scheme = r.targetScheme
})
if err != nil {
return nil, false, err
}
r.manager.Add(clus) // this will start it asynchronously
r.manager.Add(c) // this will start it asynchronously

[GitHub Actions / lint: check failure on line 285 of controllers/leveltriggered/controller.go: Error return value of `r.manager.Add` is not checked (errcheck)]

clus = &clusterWatcher{
cluster: c,
handlers: map[schema.GroupVersionKind]toolscache.ResourceEventHandlerRegistration{},
}
r.clusters[clusterKey] = clus
}

// NB this assumes that only this type needs a watch; I'll have to do more bookkeeping when I deal with different types.
var targetObject helmv2.HelmRelease
// Now we have a clusterWatcher record; make sure this kind of object is watched

inf, err := clus.GetCache().GetInformer(ctx, &targetObject)
if err != nil {
return nil, false, err
}
// The informer is retrieved every time, because we want to know if it's synced and thus ready to be queried.
inf, err := clus.cluster.GetCache().GetInformer(ctx, target) // NB not InformerForKind(...), because that uses the typed value cache specifically (see the method comment).
if err != nil {
return nil, false, err
}

// FIXME I don't get to find out if I've already installed a handler; I'll have to do my own bookkeeping here
targetGVK, err := apiutil.GVKForObject(target, r.targetScheme)
if err != nil {
return nil, false, err
}

if _, handlerFound := clus.handlers[targetGVK]; !handlerFound {
maybeEnqueuePipeline := func(obj interface{}) {
targetObj, ok := obj.(client.Object)
eventObj, ok := obj.(client.Object)
if !ok {
logger.Info("value to look up in index was not a client.Object", "object", obj)
logger.Info("value to look up in index was not a client.Object", "object", eventObj)
return
}
pipelines, err := r.pipelinesForApplication(clusterKey, targetObj)
pipelines, err := r.pipelinesForApplication(clusterKey, eventObj)
if err != nil {
logger.Error(err, "failed to look up pipelines in index of applications")
return
@@ -292,7 +322,7 @@ func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, cluste
}
}

inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
reg, err := inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
maybeEnqueuePipeline(obj)
},
@@ -305,18 +335,27 @@ func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, cluste
maybeEnqueuePipeline(newObj)
},
})

if err != nil {
return nil, false, err
}
clus.handlers[targetGVK] = reg
}

return clus.GetClient(), clusterFound, nil
return clus.cluster.GetClient(), inf.HasSynced(), nil
}

// targetObjectKey returns the object key (namespaced name) for a target. The Pipeline is passed in as well as the Target, since the definition can be spread between these specs.
func targetObjectKey(pipeline *v1alpha1.Pipeline, target *v1alpha1.Target) client.ObjectKey {
key := client.ObjectKey{}
key.Name = pipeline.Spec.AppRef.Name
key.Namespace = target.Namespace
return key
// targetObject returns a target object for a target spec, ready to be queried. The Pipeline is passed in as well as the Target, since the definition can be spread between these specs.
func targetObject(pipeline *v1alpha1.Pipeline, target *v1alpha1.Target) (client.Object, error) {
var obj unstructured.Unstructured
gv, err := schema.ParseGroupVersion(pipeline.Spec.AppRef.APIVersion)
if err != nil {
return nil, err
}
gvk := gv.WithKind(pipeline.Spec.AppRef.Kind)
obj.GetObjectKind().SetGroupVersionKind(gvk)
obj.SetName(pipeline.Spec.AppRef.Name)
obj.SetNamespace(target.Namespace)
return &obj, nil
}

// clusterPrefix returns a string naming the cluster containing an app, to prepend to the usual namespace/name format of the app object itself. So that it can be empty, the separator is included in the return value.
@@ -328,9 +367,37 @@ func clusterPrefix(ref *v1alpha1.CrossNamespaceClusterReference) string {
}

// setTargetStatus gets the relevant status from the app object given, and records it in the TargetStatus.
func setTargetStatus(status *v1alpha1.TargetStatus, target *helmv2.HelmRelease) {
status.Revision = target.Status.LastAppliedRevision
status.Ready = conditions.IsReady(target.Status.Conditions)
// TODO: this is coupled to targetObject(...), in that it handles the types returned there. It should deal with arbitrary types sensibly.
func setTargetStatus(status *v1alpha1.TargetStatus, targetObject client.Object) {
switch obj := targetObject.(type) {
case *unstructured.Unstructured:
// this assumes it's a Flux-like object; specifically with
// - a Ready condition
// - a .status.lastAppliedRevision
conds, ok, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
if !ok || err != nil {
status.Ready = false
return
}
status.Ready = conditions.IsReadyUnstructured(conds)

lastAppliedRev, ok, err := unstructured.NestedString(obj.Object, "status", "lastAppliedRevision")
if !ok || err != nil {
// It's not an error to lack a Ready condition (new objects will lack any conditions), and it's not an error to lack a lastAppliedRevision
// (maybe it hasn't got that far yet); but it is an error to have a ready condition of true and lack a lastAppliedRevision, since that means
// the object is not a usable target.
if status.Ready {
status.Error = "unable to find .status.lastAppliedRevision in ready target object"
status.Ready = false
return
}
} else {
status.Revision = lastAppliedRev
}
default:
status.Error = "unable to determine ready status for object"
status.Ready = false
}
}

func (r *PipelineReconciler) patchStatus(ctx context.Context, n types.NamespacedName, newStatus v1alpha1.PipelineStatus) error {
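
As an aside on the comment in watchTargetAndGetReader above: the cache keeps separate stores for typed, metadata-only, and unstructured objects, and both GetInformer and a later Get pick a store based on the form of the object passed in. The sketch below (watchThenGet is a hypothetical helper, not part of this change) shows the pattern of using the same unstructured value for both:

package leveltriggered

import (
	"context"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/cluster"
)

// watchThenGet installs (or finds) the informer for an arbitrary kind in the
// unstructured store, then reads the object from that same store.
func watchThenGet(ctx context.Context, clus cluster.Cluster, gvk schema.GroupVersionKind, key client.ObjectKey) (*unstructured.Unstructured, error) {
	target := &unstructured.Unstructured{}
	target.SetGroupVersionKind(gvk)

	// Installs the watch in the unstructured store, keyed by the object's form.
	if _, err := clus.GetCache().GetInformer(ctx, target); err != nil {
		return nil, err
	}
	// This Get is served by the same informer's store.
	if err := clus.GetClient().Get(ctx, key, target); err != nil {
		return nil, err
	}
	return target, nil
}
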
17 changes: 17 additions & 0 deletions controllers/leveltriggered/controller_test.go
@@ -7,6 +7,8 @@ import (
"time"

helmv2 "github.com/fluxcd/helm-controller/api/v2beta1"
kustomv1 "github.com/fluxcd/kustomize-controller/api/v1"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
. "github.com/onsi/gomega"
@@ -128,6 +130,17 @@ func TestReconcile(t *testing.T) {
g.Expect(getTargetStatus(g, p, "test", 0).Error).To(ContainSubstring("not found"))

// FIXME create the app
app := kustomv1.Kustomization{
Spec: kustomv1.KustomizationSpec{
SourceRef: kustomv1.CrossNamespaceSourceReference{
Kind: "OCIRepository",
Name: "dummy",
},
},
}
app.Name = name
app.Namespace = ns.Name
g.Expect(k8sClient.Create(ctx, &app)).To(Succeed())

// now it's there, the pipeline can be reconciled fully
checkReadyCondition(ctx, g, client.ObjectKeyFromObject(pipeline), metav1.ConditionTrue, v1alpha1.ReconciliationSucceededReason)
@@ -136,6 +149,10 @@ func TestReconcile(t *testing.T) {
g.Expect(getTargetStatus(g, p, "test", 0).Ready).NotTo(BeTrue())
g.Expect(getTargetStatus(g, p, "test", 0).Error).To(BeEmpty())

app.Status.LastAppliedRevision = appRevision
apimeta.SetStatusCondition(&app.Status.Conditions, metav1.Condition{Type: "Ready", Status: metav1.ConditionTrue, Reason: "test"})
g.Expect(k8sClient.Status().Update(ctx, &app)).To(Succeed())

var targetStatus v1alpha1.TargetStatus
g.Eventually(func() bool {
p := getPipeline(ctx, g, client.ObjectKeyFromObject(pipeline))
18 changes: 12 additions & 6 deletions controllers/leveltriggered/suite_test.go
@@ -9,7 +9,9 @@ import (
"sync"
"testing"

"github.com/fluxcd/helm-controller/api/v2beta1"
helmv2 "github.com/fluxcd/helm-controller/api/v2beta1"
kustomv1 "github.com/fluxcd/kustomize-controller/api/v1"

clusterctrlv1alpha1 "github.com/weaveworks/cluster-controller/api/v1alpha1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
@@ -108,11 +110,6 @@ func TestMain(m *testing.M) {
log.Fatalf("get user kubeconfig failed: %s", err)
}

err = v2beta1.AddToScheme(scheme.Scheme)
if err != nil {
log.Fatalf("add helm to schema failed: %s", err)
}

err = v1alpha1.AddToScheme(scheme.Scheme)
if err != nil {
log.Fatalf("add pipelines to schema failed: %s", err)
@@ -122,6 +119,15 @@
log.Fatalf("add GitopsCluster to schema failed: %s", err)
}

err = helmv2.AddToScheme(scheme.Scheme)
if err != nil {
log.Fatalf("add HelmRelease to schema failed: %s", err)
}
err = kustomv1.AddToScheme(scheme.Scheme)
if err != nil {
log.Fatalf("add Kustomization to schema failed: %s", err)
}

ctrl.SetLogger(zap.New(zap.WriteTo(os.Stderr), zap.UseDevMode(true)))

k8sManager, err = ctrl.NewManager(cfg, ctrl.Options{
16 changes: 9 additions & 7 deletions go.mod
@@ -14,7 +14,8 @@ require (
github.com/fluxcd/go-git-providers v0.14.0
github.com/fluxcd/helm-controller/api v0.25.0
github.com/fluxcd/image-automation-controller v0.26.0
github.com/fluxcd/pkg/apis/meta v0.17.0
github.com/fluxcd/kustomize-controller/api v1.1.0
github.com/fluxcd/pkg/apis/meta v1.1.2
github.com/fluxcd/pkg/git v0.6.1
github.com/fluxcd/pkg/git/gogit v0.1.1-0.20220903061028-ba1e2451e704
github.com/fluxcd/pkg/gittestserver v0.7.0
@@ -36,9 +37,9 @@ require (
github.com/weaveworks/pipeline-controller/api v0.0.0
github.com/xanzy/go-gitlab v0.78.0
golang.org/x/oauth2 v0.10.0
k8s.io/api v0.27.2
k8s.io/apimachinery v0.27.2
k8s.io/client-go v0.27.2
k8s.io/api v0.27.4
k8s.io/apimachinery v0.27.4
k8s.io/client-go v0.27.4
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f
sigs.k8s.io/cluster-api v1.5.2
sigs.k8s.io/controller-runtime v0.15.1
@@ -67,7 +68,7 @@ require (
github.com/fluxcd/gitkit v0.6.0 // indirect
github.com/fluxcd/image-reflector-controller/api v0.22.0 // indirect
github.com/fluxcd/pkg/apis/acl v0.1.0 // indirect
github.com/fluxcd/pkg/apis/kustomize v0.6.0 // indirect
github.com/fluxcd/pkg/apis/kustomize v1.1.1 // indirect
github.com/fluxcd/pkg/gitutil v0.2.0 // indirect
github.com/fluxcd/pkg/ssh v0.6.0 // indirect
github.com/fluxcd/pkg/version v0.2.0 // indirect
@@ -148,8 +149,9 @@ require (
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.27.2 // indirect
k8s.io/component-base v0.27.2 // indirect
k8s.io/apiextensions-apiserver v0.27.4 // indirect
k8s.io/cluster-bootstrap v0.27.2 // indirect
k8s.io/component-base v0.27.4 // indirect
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect