Skip to content

Commit

Permalink
Move CRs with correct owner references
Browse files Browse the repository at this point in the history
  • Loading branch information
SzymonSAP committed Nov 14, 2024
1 parent 140778d commit 7dec144
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 129 deletions.
265 changes: 193 additions & 72 deletions cmd/metalctl/app/move.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -73,141 +74,261 @@ func makeClients() (Clients, error) {

clients.source, err = makeClient(sourceKubeconfig)
if err != nil {
return clients, fmt.Errorf("failed to construct source cluster client: %w", err)
return clients, fmt.Errorf("failed to construct a source cluster client: %w", err)
}
clients.target, err = makeClient(targetKubeconfig)
if err != nil {
return clients, fmt.Errorf("failed to construct target cluster client: %w", err)
return clients, fmt.Errorf("failed to construct a target cluster client: %w", err)
}
return clients, nil
}

// getMetalObjects returns CRDs and CRs from metal group. CRDs in a returned list are before theirs CRs.
func getMetalObjects(ctx context.Context, cl client.Client) ([]client.Object, error) {
func getMetalCrds(ctx context.Context, cl client.Client) ([]*apiextensionsv1.CustomResourceDefinition, error) {
crds := &apiextensionsv1.CustomResourceDefinitionList{}
if err := cl.List(ctx, crds); err != nil {
return nil, fmt.Errorf("couldn't list CRDs: %w", err)
}

metalObjects := make([]client.Object, 0)
metalCrds := make([]*apiextensionsv1.CustomResourceDefinition, 0)
for _, crd := range crds.Items {
if crd.Spec.Group != metalv1alphav1.GroupVersion.Group {
continue
}
if !crsOnly {
metalObjects = append(metalObjects, &crd)
if crd.Spec.Group == metalv1alphav1.GroupVersion.Group {
metalCrds = append(metalCrds, &crd)
}
}

return metalCrds, nil
}

if !crdsOnly {
crs := &unstructured.UnstructuredList{}
crs.SetGroupVersionKind(schema.GroupVersionKind{Group: crd.Spec.Group, Version: crd.Spec.Versions[0].Name, Kind: crd.Spec.Names.Kind})
func getMetalCrs(ctx context.Context, cl client.Client, crds []*apiextensionsv1.CustomResourceDefinition) ([]*unstructured.Unstructured, error) {
crs := make([]*unstructured.Unstructured, 0)
for _, crd := range crds {
CRs := &unstructured.UnstructuredList{}
CRs.SetGroupVersionKind(schema.GroupVersionKind{Group: crd.Spec.Group, Version: crd.Spec.Versions[0].Name, Kind: crd.Spec.Names.Kind})

if err := cl.List(ctx, crs, &client.ListOptions{Namespace: namespace}); err != nil {
return nil, fmt.Errorf("couldn't list CRs: %w", err)
}
for _, cr := range crs.Items { // won't work with go version <1.22
metalObjects = append(metalObjects, &cr)
}
if err := cl.List(ctx, CRs, &client.ListOptions{Namespace: namespace}); err != nil {
return nil, fmt.Errorf("couldn't list CRs: %w", err)
}
for _, cr := range CRs.Items { // won't work with go version <1.22
crs = append(crs, &cr)
}
}

return metalObjects, nil
return crs, nil
}

func clearObjectFields(obj client.Object) map[string]interface{} {
func clearFields(obj client.Object) map[string]interface{} {
so, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
delete(so, "status")

for _, field := range []string{"creationTimestamp", "resourceVersion", "uid", "generation", "managedFields"} {
delete(so["metadata"].(map[string]interface{}), field)
}

if so["status"] != nil && so["status"].(map[string]interface{})["conditions"] != nil {
for _, field := range so["status"].(map[string]interface{})["conditions"].([]interface{}) {
delete(field.(map[string]interface{}), "lastTransitionTime")
}
}

return so
}

func getObjectsToBeMoved(ctx context.Context, sourceObjs []client.Object, targetClient client.Client) ([]client.Object, error) {
objectsToBeMoved := make([]client.Object, 0, len(sourceObjs))
for _, sourceObj := range sourceObjs {
targetObj := sourceObj.DeepCopyObject().(client.Object)
err := targetClient.Get(ctx, client.ObjectKeyFromObject(sourceObj), targetObj)
if apierrors.IsNotFound(err) || meta.IsNoMatchError(err) {
objectsToBeMoved = append(objectsToBeMoved, sourceObj)
func getCrdsToBeMoved(ctx context.Context, targetClient client.Client, sourceCrds []*apiextensionsv1.CustomResourceDefinition) ([]*apiextensionsv1.CustomResourceDefinition, error) {
crdsToMove := make([]*apiextensionsv1.CustomResourceDefinition, 0, len(sourceCrds))
for _, sourceCrd := range sourceCrds {
targetCrd := sourceCrd.DeepCopy()
err := targetClient.Get(ctx, client.ObjectKeyFromObject(sourceCrd), targetCrd)
if apierrors.IsNotFound(err) {
crdsToMove = append(crdsToMove, sourceCrd)
continue
}

if err != nil {
return nil, fmt.Errorf("failed to check CRD and CR existence in target cluster: %w", err)
return nil, fmt.Errorf("failed to check CRD existence in the target cluster: %w", err)
}

if reflect.DeepEqual(clearObjectFields(sourceObj), clearObjectFields(targetObj)) {
slog.Debug("source and target CRD or CR are the same", slog.String("object", metalObjectToString(sourceObj)))
if reflect.DeepEqual(clearFields(sourceCrd), clearFields(targetCrd)) {
slog.Debug("source and target CRDs are the same", slog.String("CRD", crdKind(sourceCrd)))
continue
}
return nil, fmt.Errorf("%s already exists in the target cluster", client.ObjectKeyFromObject(sourceObj))
return nil, fmt.Errorf("a CRD %s exists in the target cluster and is different then in the source cluster", sourceCrd.GetName())
}
return objectsToBeMoved, nil
return crdsToMove, nil
}

func moveMetalObjects(ctx context.Context, sourceObjs []client.Object, cl client.Client) error {
movedObjects := make([]client.Object, 0)

for _, sourceObj := range sourceObjs {
var err error
sourceObj.SetResourceVersion("")
if err = cl.Create(ctx, sourceObj); err == nil {
if crd, isCrd := sourceObj.(*apiextensionsv1.CustomResourceDefinition); isCrd &&
slices.IndexFunc(sourceObjs, func(obj client.Object) bool {
return obj.GetObjectKind().GroupVersionKind().Kind == crd.Spec.Names.Kind
}) != -1 {
err = waitForMetalCRD(ctx, crd, cl)
}
func getCrsToBeMoved(ctx context.Context, targetClient client.Client, sourceCrs []*unstructured.Unstructured) ([]*unstructured.Unstructured, error) {
crsToMove := make([]*unstructured.Unstructured, 0, len(sourceCrs))
for _, sourceCr := range sourceCrs {
targetCr := sourceCr.DeepCopy()
err := targetClient.Get(ctx, client.ObjectKeyFromObject(sourceCr), targetCr)
if apierrors.IsNotFound(err) || meta.IsNoMatchError(err) {
crsToMove = append(crsToMove, sourceCr)
continue
}

if err != nil {
cleanupErrs := make([]error, 0)
for _, obj := range movedObjects {
if err := cl.Delete(ctx, obj); err != nil {
cleanupErrs = append(cleanupErrs, err)
}
}
return nil, fmt.Errorf("failed to check CR existence in the target cluster: %w", err)
}

if reflect.DeepEqual(clearFields(sourceCr), clearFields(targetCr)) {
slog.Debug("source and target CRs are the same", slog.String("CR", crName(sourceCr)))
continue
}
return nil, fmt.Errorf("a CR %s/%s already exists in the target cluster and is different then in the source cluster", sourceCr.GetNamespace(), sourceCr.GetName())
}
return crsToMove, nil
}

type Node struct {
Cr *unstructured.Unstructured
Children []*Node
}

func crsOwnerReferenceTrees(crs []*unstructured.Unstructured) []*Node {
nodeMap := make(map[types.UID]*Node)

for _, cr := range crs {
nodeMap[cr.GetUID()] = &Node{Cr: cr}
}
roots := []*Node{}
for _, cr := range crs {
if len(cr.GetOwnerReferences()) == 0 || nodeMap[cr.GetOwnerReferences()[0].UID] == nil {
roots = append(roots, nodeMap[cr.GetUID()])
} else {
owner := nodeMap[cr.GetOwnerReferences()[0].UID]
owner.Children = append(owner.Children, nodeMap[cr.GetUID()])
}
}
return roots
}

return errors.Join(
fmt.Errorf("%s couldn't be created in the target cluster: %w", metalObjectToString(sourceObj), err),
fmt.Errorf("clean up was performed to restore a target cluster's state with error result: %w", errors.Join(cleanupErrs...)))
func cleanup[T client.Object](ctx context.Context, cl client.Client, objs []T) error {
cleanupErrs := make([]error, 0)
for _, obj := range objs {
if err := cl.Delete(ctx, obj); err != nil {
cleanupErrs = append(cleanupErrs, err)
}
movedObjects = append(movedObjects, sourceObj)
}
return nil
return errors.Join(cleanupErrs...)
}

func waitForMetalCRD(ctx context.Context, crd *apiextensionsv1.CustomResourceDefinition, cl client.Client) error {
return wait.PollUntilContextTimeout(ctx, time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
targetCrd := apiextensionsv1.CustomResourceDefinition{}
if err := cl.Get(ctx, client.ObjectKeyFromObject(crd), &targetCrd); apierrors.IsNotFound(err) {
return false, nil
func moveCrds(ctx context.Context, cl client.Client, sourceCrds []*apiextensionsv1.CustomResourceDefinition) (movedCrds []*apiextensionsv1.CustomResourceDefinition, err error) {
movedCrds = make([]*apiextensionsv1.CustomResourceDefinition, 0)
for _, crd := range sourceCrds {
crd.SetResourceVersion("")
if err = cl.Create(ctx, crd); err != nil {
err = fmt.Errorf("CRD %s couldn't be created in the target cluster: %w", crdKind(crd), err)
return
}
for _, condition := range targetCrd.Status.Conditions {
if condition.Type == apiextensionsv1.Established && condition.Status == apiextensionsv1.ConditionTrue {
return true, nil
movedCrds = append(movedCrds, crd)
}

// wait for CRDs to be present on the target cluster
err = wait.PollUntilContextTimeout(ctx, time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
for _, crd := range movedCrds {
targetObj := apiextensionsv1.CustomResourceDefinition{}
err := cl.Get(ctx, client.ObjectKeyFromObject(crd), &targetObj)
if err != nil {
return false, client.IgnoreNotFound(err)
}
}
return false, nil
return true, nil
})

return
}

func moveCrs(ctx context.Context, cl client.Client, crsTrees []*Node, uid ...types.UID) (movedCrs []*unstructured.Unstructured, err error) {
movedCrs = make([]*unstructured.Unstructured, 0)

for _, crsTree := range crsTrees {
ownerReferences := crsTree.Cr.GetOwnerReferences()
if len(ownerReferences) == 1 && len(uid) == 1 {
ownerReferences[0].UID = uid[0]
crsTree.Cr.SetOwnerReferences(ownerReferences)
}

crsTree.Cr.SetResourceVersion("")
if err = cl.Create(ctx, crsTree.Cr); err != nil {
err = fmt.Errorf("CR %s couldn't be created in the target cluster: %w", crName(crsTree.Cr), err)
return
}
movedCrs = append(movedCrs, crsTree.Cr)
}

for _, crsTree := range crsTrees {
err = wait.PollUntilContextTimeout(ctx, time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
// retrive uid of an owner
ownerCr := crsTree.Cr.DeepCopy()
err := cl.Get(ctx, client.ObjectKeyFromObject(crsTree.Cr), ownerCr)
if err != nil {
return false, client.IgnoreNotFound(err)
}

// create children CRs
var movedChildrenCrs []*unstructured.Unstructured
movedChildrenCrs, err = moveCrs(ctx, cl, crsTree.Children, ownerCr.GetUID())
movedCrs = slices.Concat(movedCrs, movedChildrenCrs)
return true, err
})
if err != nil {
return
}
}

return
}

func moveMetalObjects(ctx context.Context, cl client.Client, sourceCrds []*apiextensionsv1.CustomResourceDefinition, crsTrees []*Node) (err error) {
var movedCrds []*apiextensionsv1.CustomResourceDefinition
if movedCrds, err = moveCrds(ctx, cl, sourceCrds); err != nil {
err = errors.Join(err,
fmt.Errorf("clean up was performed to restore a target cluster's state with error result: %w", cleanup(ctx, cl, movedCrds)))
return
}

var movedCrs []*unstructured.Unstructured
if movedCrs, err = moveCrs(ctx, cl, crsTrees); err != nil {
err = errors.Join(err,
fmt.Errorf("clean up of CRs was performed to restore a target cluster's state with error result: %w", cleanup(ctx, cl, movedCrs)),
fmt.Errorf("clean up of CRDs was performed to restore a target cluster's state with error result: %w", cleanup(ctx, cl, movedCrds)))
}
return
}

func move(ctx context.Context, clients Clients) error {
sourceObjs, err := getMetalObjects(ctx, clients.source)
sourceCrds, err := getMetalCrds(ctx, clients.source)
if err != nil {
return err
}
slog.Debug(fmt.Sprintf("found %s CRDs and CRs in the source cluster", metalv1alphav1.GroupVersion.Group), slog.Any("Objects", transform(sourceObjs, metalObjectToString)))
sourceCrs := make([]*unstructured.Unstructured, 0)
if !crdsOnly {
sourceCrs, err = getMetalCrs(ctx, clients.source, sourceCrds)
if err != nil {
return err
}
}
if crsOnly {
sourceCrds = make([]*apiextensionsv1.CustomResourceDefinition, 0)
}
slog.Debug(fmt.Sprintf("found %s CRDs and CRs in the source cluster", metalv1alphav1.GroupVersion.Group),
slog.Any("CRDs", transform(sourceCrds, crdKind)),
slog.Any("CRs", transform(sourceCrs, crName)))

objectsToBeMoved, err := getObjectsToBeMoved(ctx, sourceObjs, clients.target)
crdsToMove, err := getCrdsToBeMoved(ctx, clients.target, sourceCrds)
if err != nil {
return err
}
crsToMove, err := getCrsToBeMoved(ctx, clients.target, sourceCrs)
if err != nil {
return err
}
slog.Debug("moving", slog.Any("Objects", transform(objectsToBeMoved, metalObjectToString)))
slog.Debug("moving",
slog.Any("CRDs", transform(crdsToMove, crdKind)),
slog.Any("CRs", transform(crsToMove, crName)))

if !dryRun {
err = moveMetalObjects(ctx, objectsToBeMoved, clients.target)
crsTrees := crsOwnerReferenceTrees(crsToMove)
err = moveMetalObjects(ctx, clients.target, crdsToMove, crsTrees)
if err == nil {
slog.Debug(fmt.Sprintf("all %s CRDs and CRs from the source cluster were moved to the target cluster", metalv1alphav1.GroupVersion.Group))
}
Expand Down
Loading

0 comments on commit 7dec144

Please sign in to comment.