Skip to content

Commit

Permalink
Add flags to metalctl move
Browse files Browse the repository at this point in the history
  • Loading branch information
SzymonSAP committed Nov 6, 2024
1 parent 0fa95f5 commit 5655832
Show file tree
Hide file tree
Showing 5 changed files with 349 additions and 61 deletions.
9 changes: 0 additions & 9 deletions cmd/metalctl/app/app.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
package app

import (
"path/filepath"

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"

metalv1alphav1 "github.com/ironcore-dev/metal-operator/api/v1alpha1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
Expand All @@ -34,7 +29,3 @@ func NewCommand() *cobra.Command {
root.AddCommand(NewMoveCommand())
return root
}

func GetKubeconfig() (*rest.Config, error) {
return clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config"))
}
189 changes: 137 additions & 52 deletions cmd/metalctl/app/move.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,30 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"slices"
"time"

metalv1alphav1 "github.com/ironcore-dev/metal-operator/api/v1alpha1"
"github.com/spf13/cobra"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var (
sourceKubeconfig string
targetKubeconfig string
errCrdCreate error = errors.New("failed to create metal CRDs")
crdsOnly bool
crsOnly bool
namespace string
dryRun bool
verbose bool
)

func NewMoveCommand() *cobra.Command {
Expand All @@ -24,88 +36,168 @@ func NewMoveCommand() *cobra.Command {
Short: "Move metal-operator CRDs and CRs from one cluster to another",
RunE: runMove,
}
move.Flags().StringVar(&sourceKubeconfig, "source-kubeconfig", "", "Kubeconfig pointing to the source cluster")
move.Flags().StringVar(&targetKubeconfig, "target-kubeconfig", "", "Kubeconfig pointing to the target cluster")
move.Flags().BoolVar(&crdsOnly, "crds-only", false, "migrate only the CRDs without CRs")
move.Flags().BoolVar(&crsOnly, "crs-only", false, "migrate only the CRs without CRDs")
move.Flags().StringVar(&namespace, "namespace", "", "namespace to filter CRDs and CRs to migrate. Defaults to all namespaces if not specified")
move.Flags().BoolVar(&dryRun, "dry-run", false, "show what would be moved without executing the migration")
move.Flags().BoolVar(&verbose, "verbose", false, "enable verbose logging for detailed output during migration")
move.MarkFlagRequired("source-kubeconfig")
move.MarkFlagRequired("target-kubeconfig")

if verbose {
slog.SetLogLoggerLevel(slog.LevelDebug)
}
return move
}

type clients struct {
type Clients struct {
source client.Client
target client.Client
}

func makeClients() (clients, error) {
var clients clients
sourceCfg, err := GetKubeconfig()
func makeClient(kubeconfig string) (client.Client, error) {
cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return clients, fmt.Errorf("failed to load source cluster kubeconfig: %w", err)
return nil, fmt.Errorf("failed to load cluster kubeconfig: %w", err)
}
clients.source, err = client.New(sourceCfg, client.Options{Scheme: scheme})
return client.New(cfg, client.Options{Scheme: scheme})
}

func makeClients() (Clients, error) {
var clients Clients
var err error

clients.source, err = makeClient(sourceKubeconfig)
if err != nil {
return clients, fmt.Errorf("failed to construct source cluster client: %w", err)
}
targetCfg, err := clientcmd.BuildConfigFromFlags("", targetKubeconfig)
if err != nil {
return clients, fmt.Errorf("failed to load target cluster kubeconfig: %w", err)
}
clients.target, err = client.New(targetCfg, client.Options{Scheme: scheme})
clients.target, err = makeClient(targetKubeconfig)
if err != nil {
return clients, fmt.Errorf("failed to construct target cluster client: %w", err)
}
return clients, nil
}

func moveCRDs(ctx context.Context, clients clients) error {
var crds apiextensionsv1.CustomResourceDefinitionList
if err := clients.source.List(ctx, &crds); err != nil {
return err
func getMetalObjects(ctx context.Context, cl client.Client) ([]client.Object, error) {
crds := &apiextensionsv1.CustomResourceDefinitionList{}
if err := cl.List(ctx, crds); err != nil {
return nil, fmt.Errorf("couldn't list CRDs: %w", err)
}
metalCrds := make([]apiextensionsv1.CustomResourceDefinition, 0)

metalObjects := make([]client.Object, 0)
for _, crd := range crds.Items {
if crd.Spec.Group == metalv1alphav1.GroupVersion.Group {
metalCrds = append(metalCrds, crd)
if crd.Spec.Group != metalv1alphav1.GroupVersion.Group {
continue
}
if !crsOnly {
metalObjects = append(metalObjects, &crd)
}

if !crdsOnly {
crs := &unstructured.UnstructuredList{}
crs.SetGroupVersionKind(schema.GroupVersionKind{Group: crd.Spec.Group, Version: crd.Spec.Versions[0].Name, Kind: crd.Spec.Names.Kind})

if err := cl.List(ctx, crs, &client.ListOptions{Namespace: namespace}); err != nil {
return nil, fmt.Errorf("couldn't list CRs: %w", err)
}
for _, cr := range crs.Items { // won't work with go version <1.22
metalObjects = append(metalObjects, &cr)
}
}
}
// it may be better to compare on semantics instead of CRD name
for _, sourceCrd := range metalCrds {
var targetCrd apiextensionsv1.CustomResourceDefinition
err := clients.target.Get(ctx, client.ObjectKeyFromObject(&sourceCrd), &targetCrd)
if apierrors.IsNotFound(err) {

return metalObjects, nil
}

func validateTargetCluster(ctx context.Context, sourceObjs []client.Object, targetClient client.Client) error {
for _, sourceObj := range sourceObjs {
targetObj := sourceObj.DeepCopyObject().(client.Object)
err := targetClient.Get(ctx, client.ObjectKeyFromObject(sourceObj), targetObj)
if apierrors.IsNotFound(err) || meta.IsNoMatchError(err) {
continue
}
// if CRD or CR already exists in target cluster we could still process with move, however pointer fields in Spec makes DeepEqual returns false.
// sourceSpec := reflect.ValueOf(sourceObj).Elem().FieldByName("Spec")
// targetSpec := reflect.ValueOf(targetObj).Elem().FieldByName("Spec")
// if reflect.DeepEqual(sourceSpec, targetSpec) {
// slog.Debug("source and target CRD or CR have the same specification", slog.String("object", metalObjectToString(sourceObj)))
// }
if err != nil {
return fmt.Errorf("failed to check CRD existence in target cluster: %w", err)
return fmt.Errorf("failed to check CRD and CR existence in target cluster: %w", err)
}
return fmt.Errorf("CRD for %s/%s already exists in the target cluster", sourceCrd.Spec.Group, sourceCrd.Spec.Names.Plural)
return fmt.Errorf("%s already exists in the target cluster", client.ObjectKeyFromObject(sourceObj))
}
for _, crd := range metalCrds {
crd.ResourceVersion = ""
if err := clients.target.Create(ctx, &crd); err != nil {
return errCrdCreate
return nil
}

func moveMetalObjects(ctx context.Context, sourceObjs []client.Object, cl client.Client) error {
movedObjects := make([]client.Object, 0)

for _, sourceObj := range sourceObjs {
var err error
sourceObj.SetResourceVersion("")
if err = cl.Create(ctx, sourceObj); err == nil {
if crd, isCrd := sourceObj.(*apiextensionsv1.CustomResourceDefinition); isCrd &&
slices.IndexFunc(sourceObjs, func(obj client.Object) bool {
return obj.GetObjectKind().GroupVersionKind().Kind == crd.Spec.Names.Kind
}) != -1 {
err = waitForMetalCRD(ctx, crd, cl)
}
}
if err != nil {
cleanupErrs := make([]error, 0)
for _, obj := range movedObjects {
if err := cl.Delete(ctx, obj); err != nil {
cleanupErrs = append(cleanupErrs, err)
}
}

return errors.Join(
fmt.Errorf("%s couldn't be created in the target cluster: %w", metalObjectToString(sourceObj), err),
fmt.Errorf("clean up was performed to restore a target cluster's state with error result: %w", errors.Join(cleanupErrs...)))
}
movedObjects = append(movedObjects, sourceObj)
}
return nil
}

func cleanupCRDs(ctx context.Context, clients clients) error {
var crds apiextensionsv1.CustomResourceDefinitionList
if err := clients.target.List(ctx, &crds); err != nil {
func waitForMetalCRD(ctx context.Context, crd *apiextensionsv1.CustomResourceDefinition, cl client.Client) error {
return wait.PollUntilContextTimeout(ctx, time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
targetCrd := apiextensionsv1.CustomResourceDefinition{}
if err := cl.Get(ctx, client.ObjectKeyFromObject(crd), &targetCrd); apierrors.IsNotFound(err) {
return false, nil
}
for _, condition := range targetCrd.Status.Conditions {
if condition.Type == apiextensionsv1.Established && condition.Status == apiextensionsv1.ConditionTrue {
return true, nil
}
}
return false, nil
})
}

func move(ctx context.Context, clients Clients) error {
sourceObjs, err := getMetalObjects(ctx, clients.source)
if err != nil {
return err
}
metalCrds := make([]apiextensionsv1.CustomResourceDefinition, 0)
for _, crd := range crds.Items {
if crd.Spec.Group == metalv1alphav1.GroupVersion.Group {
metalCrds = append(metalCrds, crd)
}
slog.Debug(fmt.Sprintf("found %s CRDs and CRs in the source cluster", metalv1alphav1.GroupVersion.Group), slog.Any("Objects", transform(sourceObjs, metalObjectToString)))

if err = validateTargetCluster(ctx, sourceObjs, clients.target); err != nil {
return err
}
errs := make([]error, 0)
for _, crd := range metalCrds {
crd.ResourceVersion = ""
if err := clients.target.Delete(ctx, &crd); err != nil {
errs = append(errs, err)
slog.Debug(fmt.Sprintf("all %s CRDs and CRs from the source cluster are absent in the target cluster", metalv1alphav1.GroupVersion.Group))

if !dryRun {
err = moveMetalObjects(ctx, sourceObjs, clients.target)
if err == nil {
slog.Debug(fmt.Sprintf("all %s CRDs and CRs from the source cluster were moved to the target cluster", metalv1alphav1.GroupVersion.Group))
}
}
return errors.Join(errs...)

return err
}

func runMove(cmd *cobra.Command, args []string) error {
Expand All @@ -114,12 +206,5 @@ func runMove(cmd *cobra.Command, args []string) error {
return err
}
ctx := cmd.Context()
err = moveCRDs(ctx, clients)
switch {
case errors.Is(err, errCrdCreate):
return cleanupCRDs(ctx, clients)
case err != nil:
return err
}
return nil
return move(ctx, clients)
}
30 changes: 30 additions & 0 deletions cmd/metalctl/app/move_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package app

import (
"context"
"log/slog"

metalv1alpha1 "github.com/ironcore-dev/metal-operator/api/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/client"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ = Describe("metalctl move", func() {
_ = SetupTest()

It("Should successfully create metal CRDs and CRs from a source cluster on a target cluster", func(ctx SpecContext) {
cr := &metalv1alpha1.BMC{ObjectMeta: metav1.ObjectMeta{GenerateName: "test-"}}
Expect(clients.source.Create(ctx, cr)).To(Succeed())
slog.SetLogLoggerLevel(slog.LevelDebug)
err := move(context.TODO(), clients)
Expect(err).To(BeNil())

Eventually(func(g Gomega) error {
targetCr := metalv1alpha1.BMC{}
return clients.target.Get(context.Background(), client.ObjectKeyFromObject(cr), &targetCr)
}).Should(Succeed())
})
})
Loading

0 comments on commit 5655832

Please sign in to comment.