Skip to content

Commit

Permalink
Merge pull request #91 from controlplaneio-fluxcd/crd-migration
Browse files Browse the repository at this point in the history
Implement storage migration for Flux CRDs
  • Loading branch information
stefanprodan authored Sep 10, 2024
2 parents eacbbd2 + 0a7c85f commit f5ea2bf
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 1 deletion.
7 changes: 6 additions & 1 deletion api/v1/fluxinstance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,14 @@ type FluxInstanceSpec struct {
// Wait instructs the controller to check the health of all the reconciled
// resources. Defaults to true.
// +kubebuilder:default:=true
// +required
Wait bool `json:"wait"`

// MigrateResources instructs the controller to migrate the Flux custom resources
// from the previous version to the latest API version specified in the CRD.
// Defaults to true.
// +kubebuilder:default:=true
MigrateResources bool `json:"migrateResources"`

// Sync specifies the source for the cluster sync operation.
// When set, a Flux source (GitRepository, OCIRepository or Bucket)
// and Flux Kustomization are created to sync the cluster state
Expand Down
8 changes: 8 additions & 0 deletions config/crd/bases/fluxcd.controlplane.io_fluxinstances.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,13 @@ spec:
type: object
type: array
type: object
migrateResources:
default: true
description: |-
MigrateResources instructs the controller to migrate the Flux custom resources
from the previous version to the latest API version specified in the CRD.
Defaults to true.
type: boolean
storage:
description: |-
Storage holds the specification of the source-controller
Expand Down Expand Up @@ -273,6 +280,7 @@ spec:
type: boolean
required:
- distribution
- migrateResources
- wait
type: object
status:
Expand Down
10 changes: 10 additions & 0 deletions docs/api/v1/fluxinstance.md
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,16 @@ stringData:
secretkey: "my-secretkey"
```

### Resources migration configuration

The `.spec.migrateResources` field is optional and instructs the operator to migrate
the Flux custom resources stored in Kubernetes etcd to the latest API version as
specified in the Flux CRDs. The migration runs after the Flux distribution is upgraded
from a minor version to another and only when a new API version is introduced.

By default, the field value is set to `true`. Note that disabling the migration may
result in upgrade failures due to deprecated API versions being removed in future Flux releases.

## FluxInstance Status

### Conditions
Expand Down
7 changes: 7 additions & 0 deletions internal/controller/fluxinstance_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,13 @@ func (r *FluxInstanceReconciler) apply(ctx context.Context,
log.Info("Health check completed", "revision", buildResult.Revision)
}

// Migrate all custom resources if the Flux CRDs storage version has changed.
if obj.Spec.MigrateResources {
if err := r.migrateResources(ctx, client.MatchingLabels{"app.kubernetes.io/part-of": obj.Name}); err != nil {
log.Error(err, "failed to migrate resources to the latest storage version")
}
}

return nil
}

Expand Down
121 changes: 121 additions & 0 deletions internal/controller/fluxinstance_migrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2024 Stefan Prodan.
// SPDX-License-Identifier: AGPL-3.0

package controller

import (
"context"
"fmt"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// migrateResources migrates the resources for the CRDs that match the given label selector
// to the latest storage version and updates the CRD status to contain only the latest storage version.
func (r *FluxInstanceReconciler) migrateResources(ctx context.Context, labelSelector client.MatchingLabels) error {
crdList := &apiextensionsv1.CustomResourceDefinitionList{}

if err := r.Client.List(ctx, crdList, labelSelector); err != nil {
return fmt.Errorf("failed to list CRDs: %w", err)
}

for _, crd := range crdList.Items {
if err := r.migrateCRD(ctx, crd.Name); err != nil {
return err
}
}

return nil
}

// migrateCRD migrates the custom resources for the given CRD to the latest storage version
// and updates the CRD status to contain only the latest storage version.
func (r *FluxInstanceReconciler) migrateCRD(ctx context.Context, name string) error {
log := ctrl.LoggerFrom(ctx)
crd := &apiextensionsv1.CustomResourceDefinition{}

if err := r.Client.Get(ctx, client.ObjectKey{Name: name}, crd); err != nil {
return fmt.Errorf("failed to get CRD %s: %w", name, err)
}

// get the latest storage version for the CRD
storageVersion := r.getStorageVersion(crd)
if storageVersion == "" {
return fmt.Errorf("no storage version found for CRD %s", name)
}

// return early if the CRD has a single stored version
if len(crd.Status.StoredVersions) == 1 && crd.Status.StoredVersions[0] == storageVersion {
return nil
}

// migrate the resources for the CRD
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
return r.migrateCR(ctx, crd, storageVersion)
})
if err != nil {
return fmt.Errorf("failed to migrate resources for CRD %s: %w", name, err)
}

// patch the CRD status to update the stored version to the latest
crd.Status.StoredVersions = []string{storageVersion}
if err := r.Client.Status().Update(ctx, crd); err != nil {
return fmt.Errorf("failed to update CRD %s status: %w", crd.Name, err)
}

log.Info("CRD migrated "+crd.Name, "storageVersion", storageVersion)

return nil
}

// migrateCR migrates the resources for the given CRD to the specified version
// by patching them with an empty patch.
func (r *FluxInstanceReconciler) migrateCR(ctx context.Context, crd *apiextensionsv1.CustomResourceDefinition, version string) error {
list := &unstructured.UnstructuredList{}

apiVersion := crd.Spec.Group + "/" + version
listKind := crd.Spec.Names.ListKind

list.SetAPIVersion(apiVersion)
list.SetKind(listKind)

err := r.Client.List(ctx, list, client.InNamespace(""))
if err != nil {
return fmt.Errorf("failed to list resources for CRD %s: %w", crd.Name, err)
}

if len(list.Items) == 0 {
return nil
}

for _, item := range list.Items {
// patch the resource with an empty patch to update the version
if err := r.Client.Patch(
ctx,
&item,
client.RawPatch(client.Merge.Type(), []byte("{}")),
); err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to patch resource %s: %w", item.GetName(), err)
}
}

return nil
}

func (r *FluxInstanceReconciler) getStorageVersion(crd *apiextensionsv1.CustomResourceDefinition) string {
var version string

for _, v := range crd.Spec.Versions {
if v.Storage {
version = v.Name
break
}
}

return version
}

0 comments on commit f5ea2bf

Please sign in to comment.