From a3f1f26b40910451f5c7434847caa642a36a6833 Mon Sep 17 00:00:00 2001
From: vbadrina
Date: Mon, 18 Nov 2024 13:39:19 +0530
Subject: [PATCH] Enhance MirrorPeer and S3 secret handling; fix assorted bugs

- Updated MirrorPeerReconciler to accurately handle storage client references and added a deleteMirrorPeer conditional to account for provider mode.
- Introduced annotation support in ObjectBucketClaim, set dynamically based on the MirrorPeer type.
- Added new constants and enums for OBC types and annotations to enhance readability and standardize key usage.
- Enhanced S3 secret synchronization logic to differentiate between client and cluster references, fetching the relevant storage cluster info per context.
- Refined ConfigMap client information updates with a new GetKey helper and corrected test cases for compatibility.
- Implemented FetchMirrorPeerByName for efficient MirrorPeer retrieval by name.
- Modified utility methods for secret and config map creation to handle namespace variations, ensuring correct retrieval in provider-client contexts.
- Refactored validation and utility methods for enhanced error handling and consistent namespace management.

Signed-off-by: vbadrina
---
 addons/agent_mirrorpeer_controller.go         |  90 +++---
 addons/agent_mirrorpeer_controller_test.go    |   6 +-
 addons/constants.go                           |   9 +
 addons/manager.go                             | 126 +++++---
 addons/onboarding_token.go                    |  48 ++-
 addons/rook_secret_handler.go                 |   2 +-
 addons/s3_controller.go                       |  17 +-
 addons/s3_controller_test.go                  |   4 +
 addons/s3_secret_handler.go                   |  68 ++--
 addons/secret_exchange_handler_utils.go       |  22 +-
 .../spoke_clusterrole.yaml                    |   4 +-
 controllers/common_controller_utils.go        |  42 +--
 controllers/common_controller_utils_test.go   |   3 +
 controllers/managedclusterview_controller.go  |   7 +-
 .../managedclusterview_controller_test.go     |   6 +-
 controllers/mirrorpeer_controller.go          | 295 ++++++++++++++----
 controllers/utils/cluster.go                  |  24 ++
 controllers/utils/hash.go                     |  21 +-
 controllers/utils/peer_ref.go                 |  24 +-
 controllers/utils/s3.go                       |  30 +-
 controllers/utils/secret.go                   |  12 +
 controllers/validations.go                    | 143 ++++++++-
 22 files changed, 772 insertions(+), 231 deletions(-)

diff --git a/addons/agent_mirrorpeer_controller.go b/addons/agent_mirrorpeer_controller.go
index 1bf0760d..b24048f3 100644
--- a/addons/agent_mirrorpeer_controller.go
+++ b/addons/agent_mirrorpeer_controller.go
@@ -18,9 +18,9 @@ package addons

 import (
 	"context"
-	"encoding/json"
 	"fmt"
 	"log/slog"
+	"os"
 	"strconv"
 	"time"

@@ -71,12 +71,34 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		return ctrl.Result{}, err
 	}

-	scr, err := utils.GetCurrentStorageClusterRef(&mirrorPeer, r.SpokeClusterName)
+	hasStorageClientRef, err := utils.IsStorageClientType(ctx, r.SpokeClient, mirrorPeer, true)
 	if err != nil {
-		logger.Error("Failed to get current storage cluster ref", "error", err)
+		logger.Error("Failed to check if storage client ref exists", "error", err)
 		return ctrl.Result{}, err
 	}
+	logger.Info("Checked whether MirrorPeer references a storage client", "hasStorageClientRef", hasStorageClientRef)

+	var scr *multiclusterv1alpha1.StorageClusterRef
+	if hasStorageClientRef {
+		currentNamespace := os.Getenv("POD_NAMESPACE")
+		sc, err := utils.GetStorageClusterFromCurrentNamespace(ctx, r.SpokeClient, currentNamespace)
+		if err != nil {
+			logger.Error("Failed to fetch StorageCluster for given namespace", "Namespace", currentNamespace, "error", err)
+			return ctrl.Result{}, err
+		}
+		scr = &multiclusterv1alpha1.StorageClusterRef{
+			Name:      sc.Name,
+			Namespace: sc.Namespace,
+		}
+	} else {
+		scr, err = 
utils.GetCurrentStorageClusterRef(&mirrorPeer, r.SpokeClusterName) + if err != nil { + logger.Error("Failed to get current storage cluster ref", "error", err) + return ctrl.Result{}, err + } + } + agentFinalizer := r.SpokeClusterName + "." + SpokeMirrorPeerFinalizer if len(agentFinalizer) > 63 { agentFinalizer = fmt.Sprintf("%s.%s", r.SpokeClusterName[0:10], SpokeMirrorPeerFinalizer) @@ -92,10 +114,13 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) } } } else { - result, err := r.deleteMirrorPeer(ctx, mirrorPeer, scr) - if err != nil { - return result, err - } + if !hasStorageClientRef { + result, err := r.deleteMirrorPeer(ctx, mirrorPeer, scr) + if err != nil { + return result, err + } + } // TODO Write complete deletion for Provider mode mirrorpeer + err = r.HubClient.Get(ctx, req.NamespacedName, &mirrorPeer) if err != nil { if errors.IsNotFound(err) { @@ -115,13 +140,6 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } - hasStorageClientRef, err := utils.IsStorageClientType(ctx, r.SpokeClient, mirrorPeer, true) - - if err != nil { - logger.Error("Failed to check if storage client ref exists", "error", err) - return ctrl.Result{}, err - } - logger.Info("Creating S3 buckets") err = r.createS3(ctx, mirrorPeer, scr.Namespace, hasStorageClientRef) if err != nil { @@ -179,16 +197,13 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } if err == nil { - type OnboardingTicket struct { - ID string `json:"id"` - ExpirationDate int64 `json:"expirationDate,string"` - StorageQuotaInGiB uint `json:"storageQuotaInGiB,omitempty"` - } - var ticketData OnboardingTicket - err = json.Unmarshal(token.Data["storagecluster-peer-token"], &ticketData) + logger.Info("Trying to unmarshal onboarding token.") + ticketData, err := UnmarshalOnboardingToken(&token) if err != nil { - return ctrl.Result{}, fmt.Errorf("failed to unmarshal onboarding ticket message. %w", err) + logger.Error("Failed to unmarshal the onboarding ticket data") + return ctrl.Result{}, err } + logger.Info("Successfully unmarshalled onboarding ticket", "ticketData", ticketData) if ticketData.ExpirationDate > time.Now().Unix() { logger.Info("Onboarding token has not expired yet. 
Not renewing it.", "Token", token.Name, "ExpirationDate", ticketData.ExpirationDate) return ctrl.Result{}, nil @@ -314,24 +329,23 @@ func (r *MirrorPeerReconciler) labelRBDStorageClasses(ctx context.Context, stora } func (r *MirrorPeerReconciler) createS3(ctx context.Context, mirrorPeer multiclusterv1alpha1.MirrorPeer, scNamespace string, hasStorageClientRef bool) error { - bucketCount := 1 + bucketNamespace := utils.GetEnv("ODR_NAMESPACE", scNamespace) + bucketName := utils.GenerateBucketName(mirrorPeer, hasStorageClientRef) + annotations := map[string]string{ + utils.MirrorPeerNameAnnotationKey: mirrorPeer.Name, + } if hasStorageClientRef { - bucketCount = 2 + annotations[OBCTypeAnnotationKey] = string(CLIENT) + + } else { + annotations[OBCTypeAnnotationKey] = string(CLUSTER) } - for index := 0; index < bucketCount; index++ { - bucketNamespace := utils.GetEnv("ODR_NAMESPACE", scNamespace) - var bucketName string - if hasStorageClientRef { - bucketName = utils.GenerateBucketName(mirrorPeer, mirrorPeer.Spec.Items[index].StorageClusterRef.Name) - } else { - bucketName = utils.GenerateBucketName(mirrorPeer) - } - operationResult, err := utils.CreateOrUpdateObjectBucketClaim(ctx, r.SpokeClient, bucketName, bucketNamespace) - if err != nil { - return err - } - r.Logger.Info(fmt.Sprintf("ObjectBucketClaim %s was %s in namespace %s", bucketName, operationResult, bucketNamespace)) + + operationResult, err := utils.CreateOrUpdateObjectBucketClaim(ctx, r.SpokeClient, bucketName, bucketNamespace, annotations) + if err != nil { + return err } + r.Logger.Info(fmt.Sprintf("ObjectBucketClaim %s was %s in namespace %s", bucketName, operationResult, bucketNamespace)) return nil } @@ -519,7 +533,7 @@ func (r *MirrorPeerReconciler) deleteGreenSecret(ctx context.Context, spokeClust // deleteS3 deletes the S3 bucket in the storage cluster namespace, each new mirrorpeer generates // a new bucket, so we do not need to check if the bucket is being used by another mirrorpeer func (r *MirrorPeerReconciler) deleteS3(ctx context.Context, mirrorPeer multiclusterv1alpha1.MirrorPeer, scNamespace string) error { - bucketName := utils.GenerateBucketName(mirrorPeer) + bucketName := utils.GenerateBucketName(mirrorPeer, false) bucketNamespace := utils.GetEnv("ODR_NAMESPACE", scNamespace) noobaaOBC, err := utils.GetObjectBucketClaim(ctx, r.SpokeClient, bucketName, bucketNamespace) if err != nil { diff --git a/addons/agent_mirrorpeer_controller_test.go b/addons/agent_mirrorpeer_controller_test.go index e011234a..38052a97 100644 --- a/addons/agent_mirrorpeer_controller_test.go +++ b/addons/agent_mirrorpeer_controller_test.go @@ -3,6 +3,7 @@ package addons import ( "context" "fmt" + "os" "testing" "github.com/kube-object-storage/lib-bucket-provisioner/pkg/apis/objectbucket.io/v1alpha1" @@ -126,6 +127,7 @@ func TestMirrorPeerReconcile(t *testing.T) { ctx := context.TODO() scheme := mgrScheme fakeHubClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&mirrorpeer1, &mirrorpeer2).Build() + os.Setenv("POD_NAMESPACE", "test-namespace") oppositePeerRefsArray := make([][]multiclusterv1alpha1.PeerRef, 0) // Quick iteration to get peer refs for _, pr := range mirrorpeer1.Spec.Items { @@ -225,7 +227,7 @@ func TestDisableMirroring(t *testing.T) { }, } - fakeSpokeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&storageCluster).Build() + fakeSpokeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&storageCluster, &odfInfoConfigMap).Build() r := MirrorPeerReconciler{ HubClient: fakeHubClient, 
SpokeClient:      fakeSpokeClient,
@@ -304,7 +306,7 @@ func TestDeleteGreenSecret(t *testing.T) {
 }

 func TestDeleteS3(t *testing.T) {
-	bucketName := utils.GenerateBucketName(mirrorPeer)
+	bucketName := utils.GenerateBucketName(mirrorPeer, false)
 	ctx := context.TODO()
 	scheme := mgrScheme
 	fakeHubClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&mirrorpeer1).Build()
diff --git a/addons/constants.go b/addons/constants.go
index d584fd7e..58099a52 100644
--- a/addons/constants.go
+++ b/addons/constants.go
@@ -1,5 +1,7 @@
 package addons

+type OBCTypeValue string
+
 const (
 	RBDProvisionerTemplate    = "%s.rbd.csi.ceph.com"
 	MaintenanceModeFinalizer  = "maintenance.multicluster.odf.openshift.io"
@@ -10,4 +12,11 @@ const (
 	StorageIDKey              = "storageid"
 	CephFSProvisionerTemplate = "%s.cephfs.csi.ceph.com"
 	SpokeMirrorPeerFinalizer  = "spoke.multicluster.odf.openshift.io"
+	OBCTypeAnnotationKey      = "multicluster.odf.openshift.io/obc-type"
+	OBCNameAnnotationKey      = "multicluster.odf.openshift.io/obc-name"
+)
+
+var (
+	CLIENT  OBCTypeValue = "client"
+	CLUSTER OBCTypeValue = "cluster"
 )
diff --git a/addons/manager.go b/addons/manager.go
index 9188f449..d17c8390 100644
--- a/addons/manager.go
+++ b/addons/manager.go
@@ -21,7 +21,9 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	runtimewait "k8s.io/apimachinery/pkg/util/wait"
@@ -214,6 +216,43 @@ func runHubManager(ctx context.Context, options AddonAgentOptions, logger *slog.
 		os.Exit(1)
 	}
 }
+func isProviderModeEnabled(ctx context.Context, cl client.Reader, namespace string, logger *slog.Logger) bool {
+	storageClusterCRD := &metav1.PartialObjectMetadata{}
+	storageClusterCRD.SetGroupVersionKind(
+		extv1.SchemeGroupVersion.WithKind("CustomResourceDefinition"),
+	)
+	storageClusterCRD.Name = "storageclusters.ocs.openshift.io"
+	if err := cl.Get(ctx, client.ObjectKeyFromObject(storageClusterCRD), storageClusterCRD); client.IgnoreNotFound(err) != nil {
+		logger.Error("Failed to find presence of StorageCluster CRD", "Error", err)
+		return false
+	}
+
+	if storageClusterCRD.UID != "" {
+		storageClusters := &metav1.PartialObjectMetadataList{}
+		storageClusters.SetGroupVersionKind(
+			schema.GroupVersionKind{
+				Group:   "ocs.openshift.io",
+				Version: "v1",
+				Kind:    "StorageCluster",
+			},
+		)
+		if err := cl.List(ctx, storageClusters, client.InNamespace(namespace), client.Limit(1)); err != nil {
+			logger.Error("Failed to list StorageCluster CRs", "Error", err)
+			return false
+		}
+		if len(storageClusters.Items) < 1 {
+			logger.Error("StorageCluster CR does not exist")
+			return false
+		}
+		logger.Info("Checking if StorageCluster indicates ODF is deployed in provider mode")
+		if storageClusters.Items[0].GetAnnotations()["ocs.openshift.io/deployment-mode"] != "provider" {
+			return false
+		}
+	}
+
+	logger.Info("Cluster is in provider mode. 
Controllers will be skipped.") + return true +} func runSpokeManager(ctx context.Context, options AddonAgentOptions, logger *slog.Logger) { hubConfig, err := GetClientConfig(options.HubKubeconfigFile) @@ -251,40 +290,54 @@ func runSpokeManager(ctx context.Context, options AddonAgentOptions, logger *slo currentNamespace := os.Getenv("POD_NAMESPACE") - if err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { - logger.Info("Waiting for MaintenanceMode CRD to be established. MaintenanceMode controller is not running yet.") - // Wait for 45s as it takes time for MaintenanceMode CRD to be created. - return runtimewait.PollUntilContextCancel(ctx, 15*time.Second, true, - func(ctx context.Context) (done bool, err error) { - var crd extv1.CustomResourceDefinition - readErr := mgr.GetAPIReader().Get(ctx, types.NamespacedName{Name: "maintenancemodes.ramendr.openshift.io"}, &crd) - if readErr != nil { - logger.Error("Unable to get MaintenanceMode CRD", readErr) - // Do not initialize err as we want to retry. - // err!=nil or done==true will end polling. - done = false - return - } - if crd.Spec.Group == "ramendr.openshift.io" && crd.Spec.Names.Kind == "MaintenanceMode" { - if err = (&MaintenanceModeReconciler{ - Scheme: mgr.GetScheme(), - SpokeClient: mgr.GetClient(), - SpokeClusterName: options.SpokeClusterName, - Logger: logger.With("controller", "MaintenanceModeReconciler"), - }).SetupWithManager(mgr); err != nil { - klog.Error("Unable to create MaintenanceMode controller.", err) + if !isProviderModeEnabled(ctx, mgr.GetAPIReader(), currentNamespace, logger) { + logger.Info("Cluster is not running in provider mode. Setting up blue and maintenance mode controllers") + if err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { + logger.Info("Waiting for MaintenanceMode CRD to be established. MaintenanceMode controller is not running yet.") + // Wait for 45s as it takes time for MaintenanceMode CRD to be created. + return runtimewait.PollUntilContextCancel(ctx, 15*time.Second, true, + func(ctx context.Context) (done bool, err error) { + var crd extv1.CustomResourceDefinition + readErr := mgr.GetAPIReader().Get(ctx, types.NamespacedName{Name: "maintenancemodes.ramendr.openshift.io"}, &crd) + if readErr != nil { + logger.Error("Unable to get MaintenanceMode CRD", "Error", readErr) + // Do not initialize err as we want to retry. + // err!=nil or done==true will end polling. 
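+							// PollUntilContextCancel re-runs this check every 15 seconds (the `true` argument makes the first attempt run immediately) until done is true, err is non-nil, or the context is cancelled.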
+ done = false + return + } + if crd.Spec.Group == "ramendr.openshift.io" && crd.Spec.Names.Kind == "MaintenanceMode" { + if err = (&MaintenanceModeReconciler{ + Scheme: mgr.GetScheme(), + SpokeClient: mgr.GetClient(), + SpokeClusterName: options.SpokeClusterName, + Logger: logger.With("controller", "MaintenanceModeReconciler"), + }).SetupWithManager(mgr); err != nil { + klog.Error("Unable to create MaintenanceMode controller.", err) + return + } + logger.Info("MaintenanceMode CRD exists and controller is now running") + done = true return } - logger.Info("MaintenanceMode CRD exists and controller is now running") - done = true + done = false return - } - done = false - return - }) - })); err != nil { - logger.Error("Failed to poll MaintenanceMode", "error", err) - os.Exit(1) + }) + })); err != nil { + logger.Error("Failed to poll MaintenanceMode", "error", err) + os.Exit(1) + } + + if err = (&BlueSecretReconciler{ + Scheme: mgr.GetScheme(), + HubClient: hubClient, + SpokeClient: mgr.GetClient(), + SpokeClusterName: options.SpokeClusterName, + Logger: logger.With("controller", "BlueSecretReconciler"), + }).SetupWithManager(mgr); err != nil { + logger.Error("Failed to create BlueSecret controller", "controller", "BlueSecret", "error", err) + os.Exit(1) + } } if err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { @@ -302,17 +355,6 @@ func runSpokeManager(ctx context.Context, options AddonAgentOptions, logger *slo os.Exit(1) } - if err = (&BlueSecretReconciler{ - Scheme: mgr.GetScheme(), - HubClient: hubClient, - SpokeClient: mgr.GetClient(), - SpokeClusterName: options.SpokeClusterName, - Logger: logger.With("controller", "BlueSecretReconciler"), - }).SetupWithManager(mgr); err != nil { - logger.Error("Failed to create BlueSecret controller", "controller", "BlueSecret", "error", err) - os.Exit(1) - } - if err = (&S3SecretReconciler{ Scheme: mgr.GetScheme(), HubClient: hubClient, diff --git a/addons/onboarding_token.go b/addons/onboarding_token.go index 6cbde938..42c51cb0 100644 --- a/addons/onboarding_token.go +++ b/addons/onboarding_token.go @@ -3,10 +3,13 @@ package addons import ( "context" "crypto/tls" + "encoding/base64" + "encoding/json" "fmt" "io" "net/http" "os" + "strings" "time" "github.com/red-hat-storage/odf-multicluster-orchestrator/addons/setup" @@ -22,10 +25,19 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) -func requestStorageClusterPeerToken(ctx context.Context, proxyServiceNamespace string) (string, error) { +type OnboardingSubjectRole string +type OnboardingTicket struct { + ID string `json:"id"` + ExpirationDate int64 `json:"expirationDate,string"` + SubjectRole OnboardingSubjectRole `json:"subjectRole"` + StorageQuotaInGiB *uint `json:"storageQuotaInGiB,omitempty"` + StorageCluster types.UID `json:"storageCluster"` +} + +func requestStorageClusterPeerToken(ctx context.Context, proxyServiceNamespace string) ([]byte, error) { token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token") if err != nil { - return "", fmt.Errorf("failed to read token: %w", err) + return nil, fmt.Errorf("failed to read token: %w", err) } url := fmt.Sprintf("https://ux-backend-proxy.%s.svc.cluster.local:8888/onboarding/peer-tokens", proxyServiceNamespace) client := &http.Client{ @@ -37,27 +49,27 @@ func requestStorageClusterPeerToken(ctx context.Context, proxyServiceNamespace s req, err := http.NewRequestWithContext(ctx, "POST", url, nil) if err != nil { - return "", fmt.Errorf("failed to create http request: %w", err) + return 
nil, fmt.Errorf("failed to create http request: %w", err) } req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", string(token))) resp, err := client.Do(req) if err != nil { - return "", fmt.Errorf("http request failed: %w", err) + return nil, fmt.Errorf("http request failed: %w", err) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - return "", fmt.Errorf("failed to read http response body: %w", err) + return nil, fmt.Errorf("failed to read http response body: %w", err) } if resp.StatusCode != http.StatusOK { - return "", fmt.Errorf("unexpected status code: %s", http.StatusText(resp.StatusCode)) + return nil, fmt.Errorf("unexpected status code: %s", http.StatusText(resp.StatusCode)) } - return string(body), nil + return body, nil } func createStorageClusterPeerTokenSecret(ctx context.Context, client client.Client, scheme *runtime.Scheme, spokeClusterName string, odfOperatorNamespace string, mirrorPeer multiclusterv1alpha1.MirrorPeer, storageClusterRef *v1alpha1.StorageClusterRef) error { @@ -81,14 +93,14 @@ func createStorageClusterPeerTokenSecret(ctx context.Context, client client.Clie Namespace: spokeClusterName, Labels: map[string]string{ utils.CreatedByLabelKey: setup.TokenExchangeName, - utils.SecretLabelTypeKey: string(utils.InternalLabel), + utils.SecretLabelTypeKey: string(utils.ProviderLabel), utils.HubRecoveryLabel: "", }, }, Data: map[string][]byte{ utils.NamespaceKey: []byte(storageClusterRef.Namespace), utils.StorageClusterNameKey: []byte(storageClusterRef.Name), - utils.SecretDataKey: []byte(token), + utils.SecretDataKey: token, }, } @@ -114,3 +126,21 @@ func deleteStorageClusterPeerTokenSecret(ctx context.Context, client client.Clie } return nil } + +func UnmarshalOnboardingToken(token *corev1.Secret) (*OnboardingTicket, error) { + + ticketArr := strings.Split(string(token.Data[utils.SecretDataKey]), ".") + + message, err := base64.StdEncoding.DecodeString(ticketArr[0]) + if err != nil { + return nil, err + } + + var ticketData OnboardingTicket + err = json.Unmarshal(message, &ticketData) + if err != nil { + return nil, err + } + + return &ticketData, nil +} diff --git a/addons/rook_secret_handler.go b/addons/rook_secret_handler.go index b30d672a..b50ef29a 100644 --- a/addons/rook_secret_handler.go +++ b/addons/rook_secret_handler.go @@ -77,7 +77,7 @@ func (r *BlueSecretReconciler) syncBlueSecretForRook(ctx context.Context, secret } labelType = utils.SourceLabel - blueSecret, err = generateBlueSecret(secret, labelType, utils.CreateUniqueSecretName(r.SpokeClusterName, secret.Namespace, storageClusterName), storageClusterName, r.SpokeClusterName, customData) + blueSecret, err = generateBlueSecret(secret, labelType, utils.CreateUniqueSecretName(r.SpokeClusterName, secret.Namespace, storageClusterName), storageClusterName, r.SpokeClusterName, customData, nil) if err != nil { return fmt.Errorf("failed to create secret from the managed cluster secret %q from namespace %v for the hub cluster in namespace %q err: %v", secret.Name, secret.Namespace, r.SpokeClusterName, err) } diff --git a/addons/s3_controller.go b/addons/s3_controller.go index e833ced8..e6a4bf4d 100644 --- a/addons/s3_controller.go +++ b/addons/s3_controller.go @@ -86,9 +86,22 @@ func (r *S3SecretReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{RequeueAfter: 15 * time.Second}, nil } - err = r.syncBlueSecretForS3(ctx, obc.Name, obc.Namespace) + if _, ok := obc.Annotations[utils.MirrorPeerNameAnnotationKey]; !ok { + logger.Error("Failed to find 
MirrorPeer name annotation on OBC; skipping sync for this OBC", "OBC", obc.Name)
+		return ctrl.Result{}, nil
+	}
+
+	if _, ok := obc.Annotations[OBCTypeAnnotationKey]; !ok {
+		logger.Error("Failed to find OBC type annotation on OBC; skipping sync for this OBC", "OBC", obc.Name)
+		return ctrl.Result{}, nil
+	}
+
+	mirrorPeerName := obc.Annotations[utils.MirrorPeerNameAnnotationKey]
+	obcType := obc.Annotations[OBCTypeAnnotationKey]
+
+	err = r.syncBlueSecretForS3(ctx, obc.Name, obc.Namespace, mirrorPeerName, obcType)
 	if err != nil {
-		logger.Error("Failed to sync Blue Secret for S3", "OBC", "error", err)
+		logger.Error("Failed to sync Blue Secret for S3", "error", err)
 		return ctrl.Result{}, err
 	}
diff --git a/addons/s3_controller_test.go b/addons/s3_controller_test.go
index 55d11434..57ae8495 100644
--- a/addons/s3_controller_test.go
+++ b/addons/s3_controller_test.go
@@ -67,6 +67,10 @@ var (
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      s3SecretName,
 			Namespace: s3SecretNamespace,
+			Annotations: map[string]string{
+				OBCTypeAnnotationKey:              "cluster",
+				utils.MirrorPeerNameAnnotationKey: "test-mirrorpeer",
+			},
 		},
 		Status: obv1alpha1.ObjectBucketClaimStatus{
 			Phase: obv1alpha1.ObjectBucketClaimStatusPhaseBound,
diff --git a/addons/s3_secret_handler.go b/addons/s3_secret_handler.go
index f72f469c..f2e8cd96 100644
--- a/addons/s3_secret_handler.go
+++ b/addons/s3_secret_handler.go
@@ -7,12 +7,12 @@ import (
 	routev1 "github.com/openshift/api/route/v1"
 	"github.com/red-hat-storage/odf-multicluster-orchestrator/api/v1alpha1"
 	"github.com/red-hat-storage/odf-multicluster-orchestrator/controllers/utils"
+	"sigs.k8s.io/controller-runtime/pkg/client"

 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 )

 const (
@@ -25,7 +25,7 @@ const (
 	DefaultS3Region = "noobaa"
 )

-func (r *S3SecretReconciler) syncBlueSecretForS3(ctx context.Context, name string, namespace string) error {
+func (r *S3SecretReconciler) syncBlueSecretForS3(ctx context.Context, name string, namespace string, mirrorPeerName string, obcType string) error {
 	// fetch obc secret
 	var secret corev1.Secret
 	err := r.SpokeClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &secret)
@@ -40,29 +40,41 @@ func (r *S3SecretReconciler) syncBlueSecretForS3(ctx context.Context, name strin
 		return fmt.Errorf("failed to retrieve the config map %q in namespace %q in managed cluster: %v", name, namespace, err)
 	}

-	mirrorPeers, err := utils.FetchAllMirrorPeers(context.TODO(), r.HubClient)
+	mirrorPeer, err := utils.FetchMirrorPeerByName(ctx, r.HubClient, mirrorPeerName)
 	if err != nil {
-		r.Logger.Error("Failed to fetch all mirror peers")
+		r.Logger.Error("Failed to fetch mirrorpeer", "MirrorPeer", mirrorPeerName)
 		return err
 	}
 	var storageClusterRef *v1alpha1.StorageClusterRef
-	for _, mirrorPeer := range mirrorPeers {
-		storageClusterRef, err = utils.GetCurrentStorageClusterRef(&mirrorPeer, r.SpokeClusterName)
-		if err == nil {
-			break
+	var s3ProfileName string
+
+	if obcType == string(CLUSTER) {
+		storageClusterRef, err = utils.GetCurrentStorageClusterRef(mirrorPeer, r.SpokeClusterName)
+		if err != nil {
+			return fmt.Errorf("failed to get current storage cluster ref for cluster %s: %w", r.SpokeClusterName, err)
+		}
+		s3ProfileName = fmt.Sprintf("%s-%s-%s", utils.S3ProfilePrefix, r.SpokeClusterName, storageClusterRef.Name)
+	} else {
+		sc, err := utils.GetStorageClusterFromCurrentNamespace(ctx, r.SpokeClient, namespace)
+		if err != nil {
+			return fmt.Errorf("failed to find StorageCluster for provider cluster %s: %w", r.SpokeClusterName, err)
 		}
+
+		r.Logger.Info("Found StorageCluster for provider", "Provider", r.SpokeClusterName, "StorageClusterName", sc.Name, 
"StorageCluster Namespace", sc.Namespace) + storageClusterRef = &v1alpha1.StorageClusterRef{ + Name: sc.Name, + Namespace: sc.Namespace, + } + s3ProfileName = fmt.Sprintf("%s-%s", utils.S3ProfilePrefix, utils.CreateUniqueName(mirrorPeerName, r.SpokeClusterName, storageClusterRef.Name)[0:39]) } - if storageClusterRef == nil { + if storageClusterRef == nil || err != nil { return fmt.Errorf("failed to find storage cluster ref using spoke cluster name %s from mirrorpeers: %v", r.SpokeClusterName, err) } // fetch s3 endpoint route := &routev1.Route{} - err = r.SpokeClient.Get(ctx, types.NamespacedName{Name: S3RouteName, Namespace: storageClusterRef.Namespace}, route) + err = r.SpokeClient.Get(ctx, types.NamespacedName{Name: S3RouteName, Namespace: namespace}, route) if err != nil { - return fmt.Errorf("failed to retrieve the S3 endpoint in namespace %q in managed cluster: %v", storageClusterRef.Namespace, err) + return fmt.Errorf("failed to retrieve the S3 endpoint in namespace %q in managed cluster: %v", namespace, err) } s3Region := configMap.Data[S3BucketRegion] @@ -74,11 +86,11 @@ func (r *S3SecretReconciler) syncBlueSecretForS3(ctx context.Context, name strin s3Secret := corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: name, - Namespace: storageClusterRef.Namespace, + Namespace: namespace, }, Type: utils.SecretLabelTypeKey, Data: map[string][]byte{ - utils.S3ProfileName: []byte(fmt.Sprintf("%s-%s-%s", utils.S3ProfilePrefix, r.SpokeClusterName, storageClusterRef.Name)), + utils.S3ProfileName: []byte(s3ProfileName), utils.S3BucketName: []byte(configMap.Data[S3BucketName]), utils.S3Region: []byte(s3Region), utils.S3Endpoint: []byte(fmt.Sprintf("%s://%s", DefaultS3EndpointProtocol, route.Spec.Host)), @@ -91,19 +103,39 @@ func (r *S3SecretReconciler) syncBlueSecretForS3(ctx context.Context, name strin utils.SecretOriginKey: []byte(utils.OriginMap["S3Origin"]), } - newSecret, err := generateBlueSecret(s3Secret, utils.InternalLabel, utils.CreateUniqueSecretName(r.SpokeClusterName, storageClusterRef.Namespace, storageClusterRef.Name, utils.S3ProfilePrefix), storageClusterRef.Name, r.SpokeClusterName, customData) + var secretName string + if obcType == string(CLUSTER) { + secretName = utils.CreateUniqueSecretName(r.SpokeClusterName, storageClusterRef.Namespace, storageClusterRef.Name, utils.S3ProfilePrefix) + } else { + pr1 := mirrorPeer.Spec.Items[0] + pr2 := mirrorPeer.Spec.Items[1] + secretName = utils.CreateUniqueSecretNameForClient(r.SpokeClusterName, utils.GetKey(pr1.ClusterName, pr1.StorageClusterRef.Name), utils.GetKey(pr2.ClusterName, pr2.StorageClusterRef.Name)) + } + + annotations := map[string]string{ + OBCNameAnnotationKey: name, + utils.MirrorPeerNameAnnotationKey: mirrorPeerName, + OBCTypeAnnotationKey: obcType, + } + + newSecret, err := generateBlueSecret(s3Secret, utils.InternalLabel, secretName, storageClusterRef.Name, r.SpokeClusterName, customData, annotations) if err != nil { return fmt.Errorf("failed to create secret from the managed cluster secret %q in namespace %q for the hub cluster in namespace %q: %v", secret.Name, secret.Namespace, r.SpokeClusterName, err) } - err = r.HubClient.Create(ctx, newSecret, &client.CreateOptions{}) if err != nil { if errors.IsAlreadyExists(err) { - // Log that the secret already exists and is not created again - r.Logger.Info("Secret already exists on hub cluster, not creating again", "secret", newSecret.Name, "namespace", newSecret.Namespace) + // Log that the secret already exists and attempt to update it + r.Logger.Info("Secret already 
exists on hub cluster, attempting to update", "secret", newSecret.Name, "namespace", newSecret.Namespace) + err = r.HubClient.Update(ctx, newSecret, &client.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update existing secret %q in namespace %q on hub cluster: %w", newSecret.Name, newSecret.Namespace, err) + } + r.Logger.Info("Successfully updated existing secret on hub cluster", "secret", newSecret.Name, "namespace", newSecret.Namespace) return nil } - return fmt.Errorf("failed to sync managed cluster secret %q from namespace %v to the hub cluster in namespace %q: %v", name, namespace, r.SpokeClusterName, err) + // If it's an error other than "already exists", log and return + return fmt.Errorf("failed to create secret %q in namespace %q on hub cluster: %w", newSecret.Name, newSecret.Namespace, err) } r.Logger.Info("Successfully synced managed cluster s3 bucket secret to the hub cluster", "secret", name, "namespace", namespace, "hubNamespace", r.SpokeClusterName) diff --git a/addons/secret_exchange_handler_utils.go b/addons/secret_exchange_handler_utils.go index c6bf1832..f2c1edc2 100644 --- a/addons/secret_exchange_handler_utils.go +++ b/addons/secret_exchange_handler_utils.go @@ -10,14 +10,24 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func generateBlueSecret(secret corev1.Secret, secretType utils.SecretLabelType, uniqueName string, sc string, managedCluster string, customData map[string][]byte) (nsecret *corev1.Secret, err error) { +func generateBlueSecret( + secret corev1.Secret, + secretType utils.SecretLabelType, + uniqueName string, + sc string, + managedCluster string, + customData map[string][]byte, + annotations map[string]string, +) (*corev1.Secret, error) { secretData, err := json.Marshal(secret.Data) if err != nil { - return nsecret, fmt.Errorf("failed to marshal secret data for secret '%s' in namespace '%s': %v", secret.Name, secret.Namespace, err) + return nil, fmt.Errorf( + "failed to marshal secret data for secret '%s' in namespace '%s': %v", + secret.Name, secret.Namespace, err, + ) } data := make(map[string][]byte) - data[utils.NamespaceKey] = []byte(secret.Namespace) data[utils.StorageClusterNameKey] = []byte(sc) data[utils.SecretDataKey] = secretData @@ -26,7 +36,7 @@ func generateBlueSecret(secret corev1.Secret, secretType utils.SecretLabelType, data[key] = value } - nSecret := corev1.Secret{ + nSecret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: uniqueName, Namespace: managedCluster, @@ -34,11 +44,13 @@ func generateBlueSecret(secret corev1.Secret, secretType utils.SecretLabelType, utils.SecretLabelTypeKey: string(secretType), utils.HubRecoveryLabel: "", }, + Annotations: annotations, }, Type: utils.SecretLabelTypeKey, Data: data, } - return &nSecret, nil + + return nSecret, nil } func generateBlueSecretForExternal(rookCephMon corev1.Secret, labelType utils.SecretLabelType, name string, sc string, managedClusterName string, customData map[string][]byte) (*corev1.Secret, error) { diff --git a/addons/setup/tokenexchange-manifests/spoke_clusterrole.yaml b/addons/setup/tokenexchange-manifests/spoke_clusterrole.yaml index ef37395f..f33e6aff 100644 --- a/addons/setup/tokenexchange-manifests/spoke_clusterrole.yaml +++ b/addons/setup/tokenexchange-manifests/spoke_clusterrole.yaml @@ -17,7 +17,7 @@ rules: verbs: ["get", "list", "watch", "create", "update"] - apiGroups: ["objectbucket.io"] resources: ["objectbucketclaims"] - verbs: ["get", "create", "list", "watch", "delete"] + verbs: ["get", "create", "list", "watch", 
"delete","update"] - apiGroups: ["multicluster.odf.openshift.io"] resources: ["mirrorpeers"] verbs: ["get", "list", "watch", "update"] @@ -38,4 +38,4 @@ rules: verbs: ["get", "list","watch","update", "patch"] - apiGroups: ["apiextensions.k8s.io"] resources: ["customresourcedefinitions"] - verbs: ["get"] + verbs: ["get","list","watch"] diff --git a/controllers/common_controller_utils.go b/controllers/common_controller_utils.go index 8dacad9e..cccc9013 100644 --- a/controllers/common_controller_utils.go +++ b/controllers/common_controller_utils.go @@ -267,18 +267,6 @@ func createOrUpdateExternalSecret(ctx context.Context, rc client.Client, secret return nil } -func isS3ProfileManagedPeerRef(clusterPeerRef multiclusterv1alpha1.PeerRef, mirrorPeers []multiclusterv1alpha1.MirrorPeer) bool { - for _, mirrorpeer := range mirrorPeers { - for _, peerRef := range mirrorpeer.Spec.Items { - if reflect.DeepEqual(clusterPeerRef, peerRef) && (mirrorpeer.Spec.ManageS3) { - // found mirror peer with ManageS3 spec enabled - return true - } - } - } - return false -} - func updateS3ProfileFields(expected *rmn.S3StoreProfile, found *rmn.S3StoreProfile) { found.S3ProfileName = expected.S3ProfileName found.S3Bucket = expected.S3Bucket @@ -314,13 +302,8 @@ func areS3ProfileFieldsEqual(expected rmn.S3StoreProfile, found rmn.S3StoreProfi func updateRamenHubOperatorConfig(ctx context.Context, rc client.Client, secret *corev1.Secret, data map[string][]byte, mirrorPeers []multiclusterv1alpha1.MirrorPeer, ramenHubNamespace string, logger *slog.Logger) error { logger.Info("Starting to update Ramen Hub Operator config", "SecretName", secret.Name, "Namespace", secret.Namespace) - clusterPeerRef, err := utils.CreatePeerRefFromSecret(secret) - if err != nil { - logger.Error("Failed to create peer reference from secret", "error", err, "SecretName", secret.Name, "Namespace", secret.Namespace) - return err - } - if mirrorPeers == nil { + var err error mirrorPeers, err = utils.FetchAllMirrorPeers(ctx, rc) if err != nil { logger.Error("Failed to fetch all MirrorPeers", "error", err) @@ -328,8 +311,25 @@ func updateRamenHubOperatorConfig(ctx context.Context, rc client.Client, secret } } - if !isS3ProfileManagedPeerRef(clusterPeerRef, mirrorPeers) { - logger.Info("Manage S3 is disabled on MirrorPeer spec, skipping update", "PeerRef", clusterPeerRef) + if _, ok := secret.Annotations[utils.MirrorPeerNameAnnotationKey]; !ok { + return fmt.Errorf("failed to find MirrorPeerName on secret") + } + + mirrorPeerName := secret.Annotations[utils.MirrorPeerNameAnnotationKey] + var foundMirrorPeer *multiclusterv1alpha1.MirrorPeer + for _, mp := range mirrorPeers { + if mp.Name == mirrorPeerName { + foundMirrorPeer = &mp + break + } + } + + if foundMirrorPeer == nil { + return fmt.Errorf("MirrorPeer %q not found", mirrorPeerName) + } + + if !foundMirrorPeer.Spec.ManageS3 { + logger.Info("Manage S3 is disabled on MirrorPeer spec, skipping update", "MirrorPeer", mirrorPeerName) return nil } @@ -348,7 +348,7 @@ func updateRamenHubOperatorConfig(ctx context.Context, rc client.Client, secret Name: utils.RamenHubOperatorConfigName, Namespace: ramenHubNamespace, } - err = rc.Get(ctx, namespacedName, ¤tRamenConfigMap) + err := rc.Get(ctx, namespacedName, ¤tRamenConfigMap) if err != nil { logger.Error("Failed to fetch Ramen Hub Operator config map", "error", err, "ConfigMapName", namespacedName) return err diff --git a/controllers/common_controller_utils_test.go b/controllers/common_controller_utils_test.go index 32baf944..e83025f1 100644 --- 
a/controllers/common_controller_utils_test.go +++ b/controllers/common_controller_utils_test.go @@ -112,6 +112,9 @@ func fakeS3InternalSecret(t *testing.T, clusterName string) *corev1.Secret { Labels: map[string]string{ utils.SecretLabelTypeKey: string(utils.InternalLabel), }, + Annotations: map[string]string{ + utils.MirrorPeerNameAnnotationKey: "mirrorpeer", + }, }, Type: utils.SecretLabelTypeKey, Data: data, diff --git a/controllers/managedclusterview_controller.go b/controllers/managedclusterview_controller.go index 95961c67..0621bda5 100644 --- a/controllers/managedclusterview_controller.go +++ b/controllers/managedclusterview_controller.go @@ -166,7 +166,7 @@ func createOrUpdateConfigMap(ctx context.Context, c client.Client, managedCluste return fmt.Errorf("failed to marshal client info for key %s: %w", key, err) } - clientInfoMap[fmt.Sprintf("%s/%s", managedCluster.Name, client.Name)] = string(clientInfoJSON) + clientInfoMap[utils.GetKey(managedCluster.Name, client.Name)] = string(clientInfoJSON) } } @@ -187,6 +187,11 @@ func createOrUpdateConfigMap(ctx context.Context, c client.Client, managedCluste } op, err := controllerutil.CreateOrUpdate(ctx, c, configMap, func() error { + + if configMap.Data == nil { + configMap.Data = make(map[string]string) + } + for clientKey, clientInfo := range clientInfoMap { configMap.Data[clientKey] = clientInfo } diff --git a/controllers/managedclusterview_controller_test.go b/controllers/managedclusterview_controller_test.go index 271b64cb..d0b5aadd 100644 --- a/controllers/managedclusterview_controller_test.go +++ b/controllers/managedclusterview_controller_test.go @@ -105,7 +105,7 @@ storageSystemName: "ocs-storagecluster-storagesystem" assert.NotNil(t, cm) expectedData := map[string]string{ - "cluster1-name/client1": `{"clusterId":"cluster1","name":"client1","providerInfo":{"version":"4.Y.Z","deploymentType":"internal","storageSystemName":"ocs-storagecluster-storagesystem","providerManagedClusterName":"cluster1","namespacedName":{"Namespace":"openshift-storage","Name":"ocs-storagecluster"},"storageProviderEndpoint":"","cephClusterFSID":"7a3d6b81-a55d-44fe-84d0-46c67cd395ca"},"clientManagedClusterName":"cluster1-name","clientId":"client1"}`, + "cluster1-name_client1": `{"clusterId":"cluster1","name":"client1","providerInfo":{"version":"4.Y.Z","deploymentType":"internal","storageSystemName":"ocs-storagecluster-storagesystem","providerManagedClusterName":"cluster1","namespacedName":{"Namespace":"openshift-storage","Name":"ocs-storagecluster"},"storageProviderEndpoint":"","cephClusterFSID":"7a3d6b81-a55d-44fe-84d0-46c67cd395ca"},"clientManagedClusterName":"cluster1-name","clientId":"client1"}`, } assert.Equal(t, expectedData, cm.Data) @@ -152,8 +152,8 @@ storageSystemName: "ocs-storagecluster-storagesystem" assert.NotNil(t, cm) expectedData := map[string]string{ - "cluster1-name/client1": `{"clusterId":"cluster1","name":"client1","providerInfo":{"version":"4.Y.Z","deploymentType":"internal","storageSystemName":"ocs-storagecluster-storagesystem","providerManagedClusterName":"cluster1","namespacedName":{"Namespace":"openshift-storage","Name":"ocs-storagecluster"},"storageProviderEndpoint":"","cephClusterFSID":"7a3d6b81-a55d-44fe-84d0-46c67cd395ca"},"clientManagedClusterName":"cluster1-name","clientId":"client1"}`, - "cluster2-name/client2": 
`{"clusterId":"cluster2","name":"client2","providerInfo":{"version":"4.Y.Z","deploymentType":"internal","storageSystemName":"ocs-storagecluster-storagesystem","providerManagedClusterName":"cluster2","namespacedName":{"Namespace":"openshift-storage","Name":"ocs-storagecluster"},"storageProviderEndpoint":"","cephClusterFSID":"8b3d6b81-b55d-55fe-94d0-56c67cd495ca"},"clientManagedClusterName":"cluster2-name","clientId":"client2"}`, + "cluster1-name_client1": `{"clusterId":"cluster1","name":"client1","providerInfo":{"version":"4.Y.Z","deploymentType":"internal","storageSystemName":"ocs-storagecluster-storagesystem","providerManagedClusterName":"cluster1","namespacedName":{"Namespace":"openshift-storage","Name":"ocs-storagecluster"},"storageProviderEndpoint":"","cephClusterFSID":"7a3d6b81-a55d-44fe-84d0-46c67cd395ca"},"clientManagedClusterName":"cluster1-name","clientId":"client1"}`, + "cluster2-name_client2": `{"clusterId":"cluster2","name":"client2","providerInfo":{"version":"4.Y.Z","deploymentType":"internal","storageSystemName":"ocs-storagecluster-storagesystem","providerManagedClusterName":"cluster2","namespacedName":{"Namespace":"openshift-storage","Name":"ocs-storagecluster"},"storageProviderEndpoint":"","cephClusterFSID":"8b3d6b81-b55d-55fe-94d0-56c67cd495ca"},"clientManagedClusterName":"cluster2-name","clientId":"client2"}`, } assert.Equal(t, expectedData, cm.Data) diff --git a/controllers/mirrorpeer_controller.go b/controllers/mirrorpeer_controller.go index 5fc0eb8e..9981ef1c 100644 --- a/controllers/mirrorpeer_controller.go +++ b/controllers/mirrorpeer_controller.go @@ -195,6 +195,13 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) } } + // Check if the MirrorPeer contains StorageClient reference + hasStorageClientRef, err := utils.IsStorageClientType(ctx, r.Client, mirrorPeer, false) + if err != nil { + logger.Error("Failed to determine if MirrorPeer contains StorageClient reference", "error", err) + return ctrl.Result{}, err + } + if err := r.processManagedClusterAddon(ctx, mirrorPeer); err != nil { logger.Error("Failed to process managedclusteraddon", "error", err) return ctrl.Result{}, err @@ -210,14 +217,28 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) if mirrorPeer.Spec.ManageS3 { for _, peerRef := range mirrorPeer.Spec.Items { var s3Secret corev1.Secret + var secretName string + + var namespace string + if hasStorageClientRef { + s3SecretName, s3SecretNamespace, err := GetNamespacedNameForClientS3Secret(ctx, r.Client, peerRef, &mirrorPeer) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to get namespace for s3 secret %w", err) + } + secretName = s3SecretName + namespace = s3SecretNamespace + } else { + secretName = utils.GetSecretNameByPeerRef(peerRef, utils.S3ProfilePrefix) + namespace = peerRef.ClusterName + } namespacedName := types.NamespacedName{ - Name: utils.GetSecretNameByPeerRef(peerRef, utils.S3ProfilePrefix), - Namespace: peerRef.ClusterName, + Name: secretName, + Namespace: namespace, } err = r.Client.Get(ctx, namespacedName, &s3Secret) if err != nil { if k8serrors.IsNotFound(err) { - logger.Info("S3 secret is not yet synchronised. retrying till it is available. Requeing request...", "Cluster", peerRef.ClusterName) + logger.Info("S3 secret is not yet synchronised. retrying till it is available. 
Requeing request...", "Secret Name", secretName, "Namespace/Cluster", namespace) return ctrl.Result{Requeue: true}, nil } logger.Error("Error in fetching s3 internal secret", "Cluster", peerRef.ClusterName, "error", err) @@ -236,7 +257,7 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } - err = r.createDRClusters(ctx, &mirrorPeer) + err = r.createDRClusters(ctx, &mirrorPeer, hasStorageClientRef) if err != nil { if k8serrors.IsNotFound(err) { logger.Info("Secret not synchronised yet, retrying to create DRCluster", "MirrorPeer", mirrorPeer.Name) @@ -246,13 +267,6 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } - // Check if the MirrorPeer contains StorageClient reference - hasStorageClientRef, err := utils.IsStorageClientType(ctx, r.Client, mirrorPeer, false) - if err != nil { - logger.Error("Failed to determine if MirrorPeer contains StorageClient reference", "error", err) - return ctrl.Result{}, err - } - if hasStorageClientRef { result, err := createStorageClusterPeer(ctx, r.Client, logger, mirrorPeer) if err != nil { @@ -266,57 +280,74 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) return result, err } } - return r.updateMirrorPeerStatus(ctx, mirrorPeer) + return r.updateMirrorPeerStatus(ctx, mirrorPeer, hasStorageClientRef) } func createManifestWorkForClusterPairingConfigMap(ctx context.Context, client client.Client, logger *slog.Logger, mirrorPeer multiclusterv1alpha1.MirrorPeer) (ctrl.Result, error) { + logger.Info("Starting to create ManifestWork for cluster pairing ConfigMap") + clientInfoMap, err := fetchClientInfoConfigMap(ctx, client) if err != nil { if k8serrors.IsNotFound(err) { - logger.Info("Client info config map not found. 
Retrying request another time...") + logger.Info("Client info ConfigMap not found; requeuing for later retry") return ctrl.Result{Requeue: true}, nil } return ctrl.Result{}, err } + logger.Info("Fetched client info ConfigMap successfully") items := mirrorPeer.Spec.Items - ci1, err := getClientInfoFromConfigMap(clientInfoMap.Data, getKey(items[0].ClusterName, items[0].StorageClusterRef.Name)) + ci1, err := getClientInfoFromConfigMap(clientInfoMap.Data, utils.GetKey(items[0].ClusterName, items[0].StorageClusterRef.Name)) if err != nil { + logger.Error("Failed to get client info from ConfigMap for the first cluster") return ctrl.Result{}, err } - ci2, err := getClientInfoFromConfigMap(clientInfoMap.Data, getKey(items[1].ClusterName, items[1].StorageClusterRef.Name)) + + logger.Info("Fetched client info for the first cluster", "ClientInfo", ci1) + + ci2, err := getClientInfoFromConfigMap(clientInfoMap.Data, utils.GetKey(items[1].ClusterName, items[1].StorageClusterRef.Name)) if err != nil { + logger.Error("Failed to get client info from ConfigMap for the second cluster") return ctrl.Result{}, err } - if err := updateProviderConfigMap(ctx, client, mirrorPeer, ci1, ci2); err != nil { + logger.Info("Fetched client info for the second cluster", "ClientInfo", ci2) + logger.Info("Updating provider ConfigMap with client pairing", "ProviderClient1", ci1.ClientID, "PairedClient1", ci2.ClientID) + if err := updateProviderConfigMap(logger, ctx, client, mirrorPeer, ci1, ci2); err != nil { return ctrl.Result{}, err } - if err := updateProviderConfigMap(ctx, client, mirrorPeer, ci2, ci1); err != nil { + logger.Info("Updating provider ConfigMap with client pairing", "ProviderClient2", ci2.ClientID, "PairedClient2", ci1.ClientID) + if err := updateProviderConfigMap(logger, ctx, client, mirrorPeer, ci2, ci1); err != nil { return ctrl.Result{}, err } + logger.Info("Successfully created ManifestWork for cluster pairing ConfigMap") return ctrl.Result{}, nil } // updateProviderConfigMap updates the ConfigMap on the provider with the new client pairing -func updateProviderConfigMap(ctx context.Context, client client.Client, mirrorPeer multiclusterv1alpha1.MirrorPeer, providerClientInfo ClientInfo, pairedClientInfo ClientInfo) error { +func updateProviderConfigMap(logger *slog.Logger, ctx context.Context, client client.Client, mirrorPeer multiclusterv1alpha1.MirrorPeer, providerClientInfo ClientInfo, pairedClientInfo ClientInfo) error { providerName := providerClientInfo.ProviderInfo.ProviderManagedClusterName - manifestWorkName := "dr-client-pair" + manifestWorkName := "storage-client-mapping" manifestWorkNamespace := providerName - // Attempt to get the existing ManifestWork + logger.Info("Fetching existing ManifestWork for provider", "Namespace", manifestWorkNamespace) manifestWork, err := utils.GetManifestWork(ctx, client, manifestWorkName, manifestWorkNamespace) var configMap *corev1.ConfigMap if err != nil { if k8serrors.IsNotFound(err) { - // ManifestWork does not exist; create a new ConfigMap + logger.Info("ManifestWork not found; creating a new ConfigMap") configMap = &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: corev1.SchemeGroupVersion.String(), + }, ObjectMeta: metav1.ObjectMeta{ - Name: "dr-client-pair-config", + Name: "storage-client-mapping", + Namespace: providerClientInfo.ProviderInfo.NamespacedName.Namespace, }, Data: make(map[string]string), } @@ -324,7 +355,7 @@ func updateProviderConfigMap(ctx context.Context, client client.Client, mirrorPe return 
fmt.Errorf("failed to get ManifestWork: %w", err) } } else { - // Decode existing ConfigMap + logger.Info("Found existing ManifestWork, decoding ConfigMap") if len(manifestWork.Spec.Workload.Manifests) == 0 { return fmt.Errorf("ManifestWork %s has no manifests", manifestWorkName) } @@ -335,6 +366,7 @@ func updateProviderConfigMap(ctx context.Context, client client.Client, mirrorPe } } + logger.Info("Updating ConfigMap with paired client info", "ProviderClientID", providerClientInfo.ClientID, "PairedClientID", pairedClientInfo.ClientID) configMap.Data[providerClientInfo.ClientID] = pairedClientInfo.ClientID updatedObjJson, err := json.Marshal(configMap) @@ -349,18 +381,16 @@ func updateProviderConfigMap(ctx context.Context, client client.Client, mirrorPe UID: mirrorPeer.UID, } + logger.Info("Creating or updating ManifestWork with updated ConfigMap") _, err = utils.CreateOrUpdateManifestWork(ctx, client, manifestWorkName, manifestWorkNamespace, updatedObjJson, ownerRef) if err != nil { return fmt.Errorf("failed to update ManifestWork for provider %s: %w", providerName, err) } + logger.Info("Successfully updated ManifestWork for provider", "ProviderName", providerName) return nil } -func getKey(clusterName, clientName string) string { - return fmt.Sprintf("%s/%s", clusterName, clientName) -} - func createStorageClusterPeer(ctx context.Context, client client.Client, logger *slog.Logger, mirrorPeer multiclusterv1alpha1.MirrorPeer) (ctrl.Result, error) { logger = logger.With("MirrorPeer", mirrorPeer.Name) clientInfoMap, err := fetchClientInfoConfigMap(ctx, client) @@ -375,10 +405,12 @@ func createStorageClusterPeer(ctx context.Context, client client.Client, logger clientInfo := make([]ClientInfo, 0) for _, item := range items { - ci, err := getClientInfoFromConfigMap(clientInfoMap.Data, getKey(item.ClusterName, item.StorageClusterRef.Name)) + logger.Info("Fetching info for client", "ClientKey", utils.GetKey(item.ClusterName, item.StorageClusterRef.Name)) + ci, err := getClientInfoFromConfigMap(clientInfoMap.Data, utils.GetKey(item.ClusterName, item.StorageClusterRef.Name)) if err != nil { return ctrl.Result{}, err } + logger.Info("Client Info found", "ClientInfo", ci) clientInfo = append(clientInfo, ci) } @@ -388,7 +420,7 @@ func createStorageClusterPeer(ctx context.Context, client client.Client, logger currentClient := clientInfo[i] // Provider A StorageClusterPeer contains info of Provider B endpoint and ticket, hence this if i == 0 { - oppositeClient := clientInfo[1] + oppositeClient = clientInfo[1] storageClusterPeerName = getStorageClusterPeerName(oppositeClient.ProviderInfo.ProviderManagedClusterName) } else { oppositeClient = clientInfo[0] @@ -396,11 +428,16 @@ func createStorageClusterPeer(ctx context.Context, client client.Client, logger } // Provider B's onboarding token will be used for Provider A's StorageClusterPeer + logger.Info("Fetching onboarding ticket in with name and namespace", "Name", mirrorPeer.GetUID(), "Namespace", oppositeClient.ProviderInfo.ProviderManagedClusterName) onboardingToken, err := fetchOnboardingTicket(ctx, client, oppositeClient, mirrorPeer) if err != nil { return ctrl.Result{}, fmt.Errorf("failed to fetch onboarding token for provider %s. 
%w", oppositeClient.ProviderInfo.ProviderManagedClusterName, err) } storageClusterPeer := ocsv1.StorageClusterPeer{ + TypeMeta: metav1.TypeMeta{ + Kind: "StorageClusterPeer", + APIVersion: ocsv1.GroupVersion.String(), + }, ObjectMeta: metav1.ObjectMeta{ Name: storageClusterPeerName, // This provider A namespace on which the Storage object exists @@ -453,7 +490,7 @@ func fetchOnboardingTicket(ctx context.Context, client client.Client, clientInfo return "", fmt.Errorf("failed to fetch secret %s in namespace %s", secretName, secretNamespace) } - tokenData, exists := tokenSecret.Data["storagecluster-peer-token"] + tokenData, exists := tokenSecret.Data[utils.SecretDataKey] if !exists { return "", fmt.Errorf("token data not found in secret %s", secretName) } @@ -519,7 +556,7 @@ func getConfig(ctx context.Context, c client.Client, mp multiclusterv1alpha1.Mir } for _, item := range mp.Spec.Items { clientName := item.StorageClusterRef.Name - clientInfo, err := getClientInfoFromConfigMap(clientInfoMap.Data, getKey(item.ClusterName, clientName)) + clientInfo, err := getClientInfoFromConfigMap(clientInfoMap.Data, utils.GetKey(item.ClusterName, clientName)) if err != nil { return []ManagedClusterAddonConfig{}, err } @@ -578,6 +615,7 @@ func (r *MirrorPeerReconciler) processManagedClusterAddon(ctx context.Context, m } } + logger.Info("Installing agents on the namespace", "InstallNamespace", config.InstallNamespace) _, err = controllerutil.CreateOrUpdate(ctx, r.Client, &managedClusterAddOn, func() error { managedClusterAddOn.Spec.InstallNamespace = config.InstallNamespace if err := controllerutil.SetOwnerReference(&mirrorPeer, &managedClusterAddOn, r.Scheme); err != nil { @@ -820,14 +858,45 @@ func checkK8sUpdateErrors(err error, obj client.Object, logger *slog.Logger) (ct return ctrl.Result{}, nil } -func (r *MirrorPeerReconciler) createDRClusters(ctx context.Context, mp *multiclusterv1alpha1.MirrorPeer) error { +func GetNamespacedNameForClientS3Secret(ctx context.Context, client client.Client, pr multiclusterv1alpha1.PeerRef, mp *multiclusterv1alpha1.MirrorPeer) (string, string, error) { + clientInfoMap, err := fetchClientInfoConfigMap(ctx, client) + if err != nil { + if k8serrors.IsNotFound(err) { + return "", "", fmt.Errorf("client info ConfigMap not found; requeuing for later retry %w", err) + } + return "", "", err + } + ci, err := getClientInfoFromConfigMap(clientInfoMap.Data, utils.GetKey(pr.ClusterName, pr.StorageClusterRef.Name)) + if err != nil { + return "", "", err + } + providerManagedClusterName := ci.ProviderInfo.ProviderManagedClusterName + pr1 := mp.Spec.Items[0] + pr2 := mp.Spec.Items[1] + s3SecretName := utils.CreateUniqueSecretNameForClient(providerManagedClusterName, utils.GetKey(pr1.ClusterName, pr1.StorageClusterRef.Name), utils.GetKey(pr2.ClusterName, pr2.StorageClusterRef.Name)) + s3SecretNamespace := providerManagedClusterName + + return s3SecretName, s3SecretNamespace, nil +} +func (r *MirrorPeerReconciler) createDRClusters(ctx context.Context, mp *multiclusterv1alpha1.MirrorPeer, hasStorageClientRef bool) error { logger := r.Logger currentNamespace := os.Getenv("POD_NAMESPACE") for _, pr := range mp.Spec.Items { clusterName := pr.ClusterName - s3SecretName := utils.GetSecretNameByPeerRef(pr, utils.S3ProfilePrefix) - + var s3SecretName string + var s3SecretNamespace string + if hasStorageClientRef { + name, namespace, err := GetNamespacedNameForClientS3Secret(ctx, r.Client, pr, mp) + if err != nil { + return fmt.Errorf("failed to get namespace for s3 secret %w", err) + } + 
s3SecretName = name + s3SecretNamespace = namespace + } else { + s3SecretName = utils.GetSecretNameByPeerRef(pr, utils.S3ProfilePrefix) + s3SecretNamespace = pr.ClusterName + } dc := ramenv1alpha1.DRCluster{ ObjectMeta: metav1.ObjectMeta{Name: clusterName}, } @@ -849,6 +918,21 @@ func (r *MirrorPeerReconciler) createDRClusters(ctx context.Context, mp *multicl return err } fsid = rt.FSID + } else if mp.Spec.Type == multiclusterv1alpha1.Async && hasStorageClientRef { + logger.Info("Fetching client info and then FSID for creating DRClusters", "Client", utils.GetKey(pr.ClusterName, pr.StorageClusterRef.Name)) + clientInfoMap, err := fetchClientInfoConfigMap(ctx, r.Client) + if err != nil { + if k8serrors.IsNotFound(err) { + return fmt.Errorf("client info config map not found. Retrying request another time") + } + return err + } + ci, err := getClientInfoFromConfigMap(clientInfoMap.Data, utils.GetKey(pr.ClusterName, pr.StorageClusterRef.Name)) + if err != nil { + return err + } + logger.Info("Found FSID for client", "Client", utils.GetKey(pr.ClusterName, pr.StorageClusterRef.Name), "FSID", ci.ProviderInfo.CephClusterFSID) + fsid = ci.ProviderInfo.CephClusterFSID } else { logger.Info("Fetching rook secret ", "Secret Name:", rookSecretName) hs, err := utils.FetchSecretWithName(ctx, r.Client, types.NamespacedName{Name: rookSecretName, Namespace: clusterName}) @@ -866,9 +950,9 @@ func (r *MirrorPeerReconciler) createDRClusters(ctx context.Context, mp *multicl dc.Spec.Region = ramenv1alpha1.Region(fsid) logger.Info("Fetching s3 secret ", "Secret Name:", s3SecretName) - ss, err := utils.FetchSecretWithName(ctx, r.Client, types.NamespacedName{Name: s3SecretName, Namespace: clusterName}) + ss, err := utils.FetchSecretWithName(ctx, r.Client, types.NamespacedName{Name: s3SecretName, Namespace: s3SecretNamespace}) if err != nil { - logger.Error("Failed to fetch S3 secret", "error", err, "SecretName", s3SecretName, "Namespace", clusterName) + logger.Error("Failed to fetch S3 secret", "error", err, "SecretName", s3SecretName, "Namespace", s3SecretNamespace) return err } @@ -956,27 +1040,37 @@ func (r *MirrorPeerReconciler) createClusterRoleBindingsForSpoke(ctx context.Con return nil } -func (r *MirrorPeerReconciler) updateMirrorPeerStatus(ctx context.Context, mirrorPeer multiclusterv1alpha1.MirrorPeer) (ctrl.Result, error) { +func (r *MirrorPeerReconciler) updateMirrorPeerStatus(ctx context.Context, mirrorPeer multiclusterv1alpha1.MirrorPeer, hasStorageClientRef bool) (ctrl.Result, error) { logger := r.Logger if mirrorPeer.Spec.Type == multiclusterv1alpha1.Async { - tokensExchanged, err := r.checkTokenExchangeStatus(ctx, mirrorPeer) - if err != nil { - if k8serrors.IsNotFound(err) { - logger.Info("Secrets not found; Attempting to reconcile again", "MirrorPeer", mirrorPeer.Name) - return ctrl.Result{Requeue: true}, nil + isPeeringDone := false + if hasStorageClientRef { + providerModePeeringDone, err := isProviderModePeeringDone(ctx, r.Client, r.Logger, &mirrorPeer) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to check if provider mode peering is correctly done %w", err) } - logger.Error("Error while exchanging tokens", "error", err, "MirrorPeer", mirrorPeer.Name) - mirrorPeer.Status.Message = err.Error() - statusErr := r.Client.Status().Update(ctx, &mirrorPeer) - if statusErr != nil { - logger.Error("Error occurred while updating the status of mirrorpeer", "error", statusErr, "MirrorPeer", mirrorPeer.Name) + isPeeringDone = providerModePeeringDone + } else { + tokensExchanged, err := 
@@ -956,27 +1040,37 @@ func (r *MirrorPeerReconciler) createClusterRoleBindingsForSpoke(ctx context.Con
 	return nil
 }
 
-func (r *MirrorPeerReconciler) updateMirrorPeerStatus(ctx context.Context, mirrorPeer multiclusterv1alpha1.MirrorPeer) (ctrl.Result, error) {
+func (r *MirrorPeerReconciler) updateMirrorPeerStatus(ctx context.Context, mirrorPeer multiclusterv1alpha1.MirrorPeer, hasStorageClientRef bool) (ctrl.Result, error) {
 	logger := r.Logger
 	if mirrorPeer.Spec.Type == multiclusterv1alpha1.Async {
-		tokensExchanged, err := r.checkTokenExchangeStatus(ctx, mirrorPeer)
-		if err != nil {
-			if k8serrors.IsNotFound(err) {
-				logger.Info("Secrets not found; Attempting to reconcile again", "MirrorPeer", mirrorPeer.Name)
-				return ctrl.Result{Requeue: true}, nil
+		isPeeringDone := false
+		if hasStorageClientRef {
+			providerModePeeringDone, err := isProviderModePeeringDone(ctx, r.Client, r.Logger, &mirrorPeer)
+			if err != nil {
+				return ctrl.Result{}, fmt.Errorf("failed to check if provider mode peering is complete: %w", err)
 			}
-			logger.Error("Error while exchanging tokens", "error", err, "MirrorPeer", mirrorPeer.Name)
-			mirrorPeer.Status.Message = err.Error()
-			statusErr := r.Client.Status().Update(ctx, &mirrorPeer)
-			if statusErr != nil {
-				logger.Error("Error occurred while updating the status of mirrorpeer", "error", statusErr, "MirrorPeer", mirrorPeer.Name)
+			isPeeringDone = providerModePeeringDone
+		} else {
+			tokensExchanged, err := r.checkTokenExchangeStatus(ctx, mirrorPeer)
+			if err != nil {
+				if k8serrors.IsNotFound(err) {
+					logger.Info("Secrets not found; Attempting to reconcile again", "MirrorPeer", mirrorPeer.Name)
+					return ctrl.Result{Requeue: true}, nil
+				}
+				logger.Error("Error while exchanging tokens", "error", err, "MirrorPeer", mirrorPeer.Name)
+				mirrorPeer.Status.Message = err.Error()
+				statusErr := r.Client.Status().Update(ctx, &mirrorPeer)
+				if statusErr != nil {
+					logger.Error("Error occurred while updating the status of mirrorpeer", "error", statusErr, "MirrorPeer", mirrorPeer.Name)
+				}
+				return ctrl.Result{}, err
 			}
-			return ctrl.Result{}, err
+			isPeeringDone = tokensExchanged
 		}
-		if tokensExchanged {
-			logger.Info("Tokens exchanged", "MirrorPeer", mirrorPeer.Name)
+		if isPeeringDone {
+			logger.Info("Peering of clusters is completed", "MirrorPeer", mirrorPeer.Name)
 			mirrorPeer.Status.Phase = multiclusterv1alpha1.ExchangedSecret
 			mirrorPeer.Status.Message = ""
 			statusErr := r.Client.Status().Update(ctx, &mirrorPeer)
@@ -985,10 +1079,18 @@ func (r *MirrorPeerReconciler) updateMirrorPeerStatus(ctx context.Context, mirro
 			return ctrl.Result{Requeue: true}, nil
 		}
 		return ctrl.Result{}, nil
+	} else {
+		mirrorPeer.Status.Phase = multiclusterv1alpha1.ExchangingSecret
+		statusErr := r.Client.Status().Update(ctx, &mirrorPeer)
+		if statusErr != nil {
+			logger.Error("Error occurred while updating the status of mirrorpeer", "error", statusErr, "MirrorPeer", mirrorPeer.Name)
+			return ctrl.Result{Requeue: true}, nil
+		}
+		return ctrl.Result{}, nil
+	}
 	} else {
 		// Sync mode status update, same flow as async but for s3 profile
-		s3ProfileSynced, err := r.checkS3ProfileStatus(ctx, mirrorPeer)
+		s3ProfileSynced, err := checkS3ProfileStatus(ctx, r.Client, logger, mirrorPeer, hasStorageClientRef)
 		if err != nil {
 			if k8serrors.IsNotFound(err) {
 				logger.Info("S3 secrets not found; Attempting to reconcile again", "MirrorPeer", mirrorPeer.Name)
@@ -1018,22 +1120,97 @@ func (r *MirrorPeerReconciler) updateMirrorPeerStatus(ctx context.Context, mirro
 	return ctrl.Result{Requeue: true}, nil
 }
 
-func (r *MirrorPeerReconciler) checkS3ProfileStatus(ctx context.Context, mp multiclusterv1alpha1.MirrorPeer) (bool, error) {
-	logger := r.Logger
+func isProviderModePeeringDone(ctx context.Context, client client.Client, logger *slog.Logger, mirrorPeer *multiclusterv1alpha1.MirrorPeer) (bool, error) {
+	isS3SecretSynced, err := checkS3ProfileStatus(ctx, client, logger, *mirrorPeer, true)
+	if err != nil {
+		logger.Error("failed to check if s3 secrets have been synced")
+		return false, err
+	}
+	logger.Info("S3 secrets sync status", "isS3SecretSynced", isS3SecretSynced)
+
+	isStorageClusterPeerManifestWorkCreated, err := checkStorageClusterPeerStatus(ctx, client, logger, mirrorPeer)
+	if err != nil {
+		logger.Error("failed to check if StorageClusterPeer ManifestWorks have been created")
+		return false, err
+	}
+
+	logger.Info("StorageClusterPeer manifest work creation status", "isStorageClusterPeerManifestWorkCreated", isStorageClusterPeerManifestWorkCreated)
+
+	isClientPairingConfigMapCreated, err := checkClientPairingConfigMapStatus(ctx, client, logger, mirrorPeer)
+	if err != nil {
+		logger.Error("failed to check if client pairing ConfigMap has been created")
+		return false, err
+	}
+
+	logger.Info("Client pairing ConfigMap creation status", "isClientPairingConfigMapCreated", isClientPairingConfigMapCreated)
+
+	isOnboardingTicketCreated, err := checkOnboardingTicketStatus(ctx, client, logger, mirrorPeer)
+	if err != nil {
+		logger.Error("failed to check if onboarding tickets have been created")
+		return false, err
+	}
+
+	logger.Info("Onboarding ticket creation status", "isOnboardingTicketCreated", isOnboardingTicketCreated)
+
+	allChecksPassed := isS3SecretSynced &&
+		isStorageClusterPeerManifestWorkCreated &&
+		isClientPairingConfigMapCreated &&
+		isOnboardingTicketCreated
+
+	logger.Info("Provider mode peering status", "AllChecksPassed", allChecksPassed)
+	return allChecksPassed, nil
+}
+
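Reviewer note: the two ManifestWork checks below map NotFound to (false, nil) so the caller requeues, while checkS3ProfileStatus propagates NotFound as an error that isProviderModePeeringDone returns directly. If a follow-up wants one convention for all four checks, a helper along these lines would express it; this is a sketch, not part of this patch:

package controllers

import (
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
)

// notReadyOnNotFound expresses the convention the ManifestWork checks use:
// treat NotFound as "not ready yet" rather than as a reconcile error.
// Adopting it for the S3 check would change updateMirrorPeerStatus's
// requeue behaviour, so it is left as a suggestion only.
func notReadyOnNotFound(err error) (bool, error) {
	if err == nil {
		return true, nil
	}
	if k8serrors.IsNotFound(err) {
		return false, nil // not ready yet; let the caller requeue
	}
	return false, err
}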
+func checkOnboardingTicketStatus(ctx context.Context, client client.Client, logger *slog.Logger, mirrorPeer *multiclusterv1alpha1.MirrorPeer) (bool, error) {
+	logger = logger.With("MirrorPeer", mirrorPeer.Name)
+	clientInfoMap, err := fetchClientInfoConfigMap(ctx, client)
+	if err != nil {
+		if k8serrors.IsNotFound(err) {
+			return false, fmt.Errorf("client info ConfigMap not found")
+		}
+		return false, fmt.Errorf("failed to fetch client info ConfigMap: %w", err)
+	}
+	for _, item := range mirrorPeer.Spec.Items {
+		logger.Info("Fetching info for client", "ClientKey", utils.GetKey(item.ClusterName, item.StorageClusterRef.Name))
+		ci, err := getClientInfoFromConfigMap(clientInfoMap.Data, utils.GetKey(item.ClusterName, item.StorageClusterRef.Name))
+		if err != nil {
+			return false, fmt.Errorf("failed to fetch client info from the ConfigMap: %w", err)
+		}
+		logger.Info("Client Info found for checking onboarding ticket status", "ClientInfo", ci)
+		_, err = fetchOnboardingTicket(ctx, client, ci, *mirrorPeer)
+		if err != nil {
+			return false, fmt.Errorf("failed to fetch onboarding token for provider %s: %w", ci.ProviderInfo.ProviderManagedClusterName, err)
+		}
+	}
+
+	return true, nil
+}
+
+func checkS3ProfileStatus(ctx context.Context, client client.Client, logger *slog.Logger, mp multiclusterv1alpha1.MirrorPeer, hasStorageClientRef bool) (bool, error) {
 	logger.Info("Checking S3 profile status for each peer reference in the MirrorPeer", "MirrorPeerName", mp.Name)
 
 	for _, pr := range mp.Spec.Items {
-		clusterName := pr.ClusterName
-		s3SecretName := utils.GetSecretNameByPeerRef(pr, utils.S3ProfilePrefix)
-		logger.Info("Attempting to fetch S3 secret", "SecretName", s3SecretName, "ClusterName", clusterName)
+		var s3SecretName string
+		var s3SecretNamespace string
+		if hasStorageClientRef {
+			name, namespace, err := GetNamespacedNameForClientS3Secret(ctx, client, pr, &mp)
+			if err != nil {
+				return false, err
+			}
+			s3SecretName = name
+			s3SecretNamespace = namespace
+		} else {
+			s3SecretNamespace = pr.ClusterName
+			s3SecretName = utils.GetSecretNameByPeerRef(pr, utils.S3ProfilePrefix)
+		}
+		logger.Info("Attempting to fetch S3 secret", "SecretName", s3SecretName, "Namespace", s3SecretNamespace)
 
-		_, err := utils.FetchSecretWithName(ctx, r.Client, types.NamespacedName{Name: s3SecretName, Namespace: clusterName})
+		_, err := utils.FetchSecretWithName(ctx, client, types.NamespacedName{Name: s3SecretName, Namespace: s3SecretNamespace})
 		if err != nil {
-			logger.Error("Failed to fetch S3 secret", "error", err, "SecretName", s3SecretName, "ClusterName", clusterName)
+			logger.Error("Failed to fetch S3 secret", "error", err, "SecretName", s3SecretName, "Namespace", s3SecretNamespace)
 			return false, err
 		}
-		logger.Info("Successfully fetched S3 secret", "SecretName", s3SecretName, "ClusterName", clusterName)
+		logger.Info("Successfully fetched S3 secret", "SecretName", s3SecretName, "Namespace", s3SecretNamespace)
 	}
 
 	logger.Info("Successfully verified S3 profile status for all peer references", "MirrorPeerName", mp.Name)
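Reviewer note: checkS3ProfileStatus is now a free function taking an explicit client, which makes it easy to exercise with a fake client. A test sketch under stated assumptions (the api package exposes AddToScheme; util signatures as used in this patch); all names are made up:

package controllers

import (
	"context"
	"log/slog"
	"os"
	"testing"

	multiclusterv1alpha1 "github.com/red-hat-storage/odf-multicluster-orchestrator/api/v1alpha1"
	"github.com/red-hat-storage/odf-multicluster-orchestrator/controllers/utils"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func TestCheckS3ProfileStatusClusterScoped(t *testing.T) {
	mp := multiclusterv1alpha1.MirrorPeer{
		ObjectMeta: metav1.ObjectMeta{Name: "test-mirrorpeer"},
		Spec: multiclusterv1alpha1.MirrorPeerSpec{
			Type: multiclusterv1alpha1.Async,
			Items: []multiclusterv1alpha1.PeerRef{
				{ClusterName: "cluster-a", StorageClusterRef: multiclusterv1alpha1.StorageClusterRef{Name: "ocs", Namespace: "openshift-storage"}},
				{ClusterName: "cluster-b", StorageClusterRef: multiclusterv1alpha1.StorageClusterRef{Name: "ocs", Namespace: "openshift-storage"}},
			},
		},
	}
	scheme := runtime.NewScheme()
	_ = corev1.AddToScheme(scheme)
	// Seed one synced S3 secret per peer ref, named exactly as the
	// cluster-scoped branch expects.
	var objs []client.Object
	for _, pr := range mp.Spec.Items {
		objs = append(objs, &corev1.Secret{ObjectMeta: metav1.ObjectMeta{
			Name:      utils.GetSecretNameByPeerRef(pr, utils.S3ProfilePrefix),
			Namespace: pr.ClusterName,
		}})
	}
	c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build()
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	synced, err := checkS3ProfileStatus(context.TODO(), c, logger, mp, false)
	if err != nil || !synced {
		t.Fatalf("expected all S3 profiles synced, got synced=%v err=%v", synced, err)
	}
}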
diff --git a/controllers/utils/cluster.go b/controllers/utils/cluster.go
index 14aaad4d..43cac74e 100644
--- a/controllers/utils/cluster.go
+++ b/controllers/utils/cluster.go
@@ -61,3 +61,27 @@ func FetchAllCephClusters(ctx context.Context, client client.Client) (*rookv1.Ce
 	}
 	return &cephClusters, nil
 }
+
+func GetStorageClusterFromCurrentNamespace(ctx context.Context, c client.Client, namespace string) (*ocsv1.StorageCluster, error) {
+	storageClusterList := &ocsv1.StorageClusterList{}
+	listOptions := []client.ListOption{
+		client.InNamespace(namespace),
+	}
+
+	// List all StorageClusters in the specified namespace
+	if err := c.List(ctx, storageClusterList, listOptions...); err != nil {
+		return nil, fmt.Errorf("failed to list StorageClusters in namespace %s: %w", namespace, err)
+	}
+
+	// Ensure only one StorageCluster exists
+	if len(storageClusterList.Items) == 0 {
+		return nil, fmt.Errorf("no StorageCluster found in namespace %s", namespace)
+	}
+
+	if len(storageClusterList.Items) > 1 {
+		return nil, fmt.Errorf("multiple StorageClusters found in namespace %s", namespace)
+	}
+
+	// Return the single StorageCluster
+	return &storageClusterList.Items[0], nil
+}
diff --git a/controllers/utils/hash.go b/controllers/utils/hash.go
index a4075502..50a880cc 100644
--- a/controllers/utils/hash.go
+++ b/controllers/utils/hash.go
@@ -39,6 +39,10 @@ func CreateUniqueSecretName(managedCluster, storageClusterNamespace, storageClus
 	return CreateUniqueName(managedCluster, storageClusterNamespace, storageClusterName)[0:39]
 }
 
+func CreateUniqueSecretNameForClient(providerKey, clientKey1, clientKey2 string) string {
+	return CreateUniqueName(providerKey, clientKey1, clientKey2)[0:39]
+}
+
 func CreateUniqueReplicationId(clusterFSIDs map[string]string) (string, error) {
 	var fsids []string
 	for _, v := range clusterFSIDs {
@@ -56,11 +60,18 @@ func CreateUniqueReplicationId(clusterFSIDs map[string]string) (string, error) {
 	return CreateUniqueName(fsids...)[0:39], nil
 }
 
-func GenerateUniqueIdForMirrorPeer(mirrorPeer multiclusterv1alpha1.MirrorPeer) string {
+func GenerateUniqueIdForMirrorPeer(mirrorPeer multiclusterv1alpha1.MirrorPeer, hasStorageClientRef bool) string {
 	var peerAccumulator []string
-	for _, peer := range mirrorPeer.Spec.Items {
-		peerAccumulator = append(peerAccumulator, peer.ClusterName)
+	if hasStorageClientRef {
+		for _, peer := range mirrorPeer.Spec.Items {
+			peerAccumulator = append(peerAccumulator, GetKey(peer.ClusterName, peer.StorageClusterRef.Name))
+		}
+	} else {
+		for _, peer := range mirrorPeer.Spec.Items {
+			peerAccumulator = append(peerAccumulator, peer.ClusterName)
+		}
 	}
 
 	sort.Strings(peerAccumulator)
 
@@ -68,3 +79,7 @@ func GenerateUniqueIdForMirrorPeer(mirrorPeer multiclusterv1alpha1.MirrorPeer) s
 	checksum := sha1.Sum([]byte(strings.Join(peerAccumulator, "-")))
 	return hex.EncodeToString(checksum[:])
 }
+
+func GetKey(clusterName, clientName string) string {
+	return fmt.Sprintf("%s_%s", clusterName, clientName)
+}
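Reviewer note: flipping hasStorageClientRef changes the accumulated peer keys and therefore the MirrorPeer ID, so provider/client peers get their own bucket names. A standalone sketch that mirrors the accumulation above, with hypothetical cluster and client names:

package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// uniqueID reproduces GenerateUniqueIdForMirrorPeer's accumulation:
// sort the keys, join with "-", SHA1, hex-encode.
func uniqueID(parts []string) string {
	sort.Strings(parts)
	sum := sha1.Sum([]byte(strings.Join(parts, "-")))
	return hex.EncodeToString(sum[:])
}

func main() {
	clusterScoped := []string{"cluster-a", "cluster-b"}                   // legacy accumulation
	clientScoped := []string{"cluster-a_client-a", "cluster-b_client-b"} // GetKey form
	fmt.Println("cluster-scoped ID:", uniqueID(clusterScoped))
	fmt.Println("client-scoped ID: ", uniqueID(clientScoped)) // differs, so bucket names differ
}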
diff --git a/controllers/utils/peer_ref.go b/controllers/utils/peer_ref.go
index 4b9230f7..df46ed06 100644
--- a/controllers/utils/peer_ref.go
+++ b/controllers/utils/peer_ref.go
@@ -53,25 +53,25 @@ func GetPeerRefForSpokeCluster(mp *multiclusterv1alpha1.MirrorPeer, spokeCluster
 
 func getPeerRefType(ctx context.Context, c client.Client, peerRef multiclusterv1alpha1.PeerRef, isManagedCluster bool) (PeerRefType, error) {
 	if isManagedCluster {
-		cm, err := GetODFInfoConfigMap(ctx, c, peerRef.StorageClusterRef.Namespace)
+		operatorNamespace := os.Getenv("POD_NAMESPACE")
+		cm, err := GetODFInfoConfigMap(ctx, c, operatorNamespace)
 		if err != nil {
 			return PeerRefTypeUnknown, fmt.Errorf("failed to get ODF Info ConfigMap for namespace %s: %w", peerRef.StorageClusterRef.Namespace, err)
 		}
 
 		var odfInfo ocsv1alpha1.OdfInfoData
 		for key, value := range cm.Data {
-			namespacedName := SplitKeyForNamespacedName(key)
-			if namespacedName.Name == peerRef.StorageClusterRef.Name {
-				err := yaml.Unmarshal([]byte(value), &odfInfo)
-				if err != nil {
-					return PeerRefTypeUnknown, fmt.Errorf("failed to unmarshal ODF info data for key %s: %w", key, err)
-				}
-				for _, client := range odfInfo.Clients {
-					if client.Name == peerRef.ClusterName {
-						return PeerRefTypeStorageClient, nil
-					}
+			err := yaml.Unmarshal([]byte(value), &odfInfo)
+			if err != nil {
+				return PeerRefTypeUnknown, fmt.Errorf("failed to unmarshal ODF info data for key %s: %w", key, err)
+			}
+
+			for _, client := range odfInfo.Clients {
+				if client.Name == peerRef.StorageClusterRef.Name {
+					return PeerRefTypeStorageClient, nil
 				}
 			}
+
 		}
 		return PeerRefTypeStorageCluster, nil
 	} else {
@@ -84,7 +84,7 @@ func getPeerRefType(ctx context.Context, c client.Client, peerRef multiclusterv1
 			return PeerRefTypeUnknown, err
 		}
 
-		if _, ok := cm.Data[peerRef.ClusterName]; ok {
+		if _, ok := cm.Data[fmt.Sprintf("%s_%s", peerRef.ClusterName, peerRef.StorageClusterRef.Name)]; ok {
 			return PeerRefTypeStorageClient, nil
 		}
 		return PeerRefTypeStorageCluster, nil
diff --git a/controllers/utils/s3.go b/controllers/utils/s3.go
index bb9a3391..b1e1e062 100644
--- a/controllers/utils/s3.go
+++ b/controllers/utils/s3.go
@@ -28,9 +28,10 @@ const (
 	RamenHubOperatorConfigName = "ramen-hub-operator-config"
 
 	//handlers
-	RookSecretHandlerName = "rook"
-	S3SecretHandlerName   = "s3"
-	DRModeAnnotationKey   = "multicluster.openshift.io/mode"
+	RookSecretHandlerName       = "rook"
+	S3SecretHandlerName         = "s3"
+	DRModeAnnotationKey         = "multicluster.openshift.io/mode"
+	MirrorPeerNameAnnotationKey = "multicluster.odf.openshift.io/mirrorpeer"
 )
 
 func GetCurrentStorageClusterRef(mp *multiclusterv1alpha1.MirrorPeer, spokeClusterName string) (*multiclusterv1alpha1.StorageClusterRef, error) {
@@ -49,21 +50,18 @@ func GetEnv(key, defaultValue string) string {
 	return defaultValue
 }
 
-func GenerateBucketName(mirrorPeer multiclusterv1alpha1.MirrorPeer, clientName ...string) string {
-	mirrorPeerId := GenerateUniqueIdForMirrorPeer(mirrorPeer)
+func GenerateBucketName(mirrorPeer multiclusterv1alpha1.MirrorPeer, hasStorageClientRef bool) string {
+	mirrorPeerId := GenerateUniqueIdForMirrorPeer(mirrorPeer, hasStorageClientRef)
 	bucketGenerateName := BucketGenerateName
-	if len(clientName) > 0 && clientName[0] != "" {
-		bucketGenerateName = fmt.Sprintf("%s-%s", BucketGenerateName, clientName[0])
-	}
-
 	return fmt.Sprintf("%s-%s", bucketGenerateName, mirrorPeerId)
 }
 
-func CreateOrUpdateObjectBucketClaim(ctx context.Context, c client.Client, bucketName, bucketNamespace string) (controllerutil.OperationResult, error) {
+func CreateOrUpdateObjectBucketClaim(ctx context.Context, c client.Client, bucketName, bucketNamespace string, annotations map[string]string) (controllerutil.OperationResult, error) {
 	noobaaOBC := &obv1alpha1.ObjectBucketClaim{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      bucketName,
-			Namespace: bucketNamespace,
+			Name:        bucketName,
+			Namespace:   bucketNamespace,
+			Annotations: annotations, // set caller-supplied annotations on create
 		},
 	}
 
@@ -73,6 +71,14 @@ func CreateOrUpdateObjectBucketClaim(ctx context.Context, c client.Client, bucke
 			StorageClassName: fmt.Sprintf("%s.noobaa.io", bucketNamespace),
 		}
 
+		if noobaaOBC.Annotations == nil {
+			noobaaOBC.Annotations = annotations
+		} else {
+			for key, value := range annotations {
+				noobaaOBC.Annotations[key] = value
+			}
+		}
+
 		return nil
 	})
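Reviewer note: a hypothetical call site for the new CreateOrUpdateObjectBucketClaim signature, tagging the OBC with the owning MirrorPeer via the new annotation key. The helper name and wiring are illustrative, not part of this patch; the util functions and signatures are the ones introduced above:

package addons

import (
	"context"
	"log/slog"

	multiclusterv1alpha1 "github.com/red-hat-storage/odf-multicluster-orchestrator/api/v1alpha1"
	"github.com/red-hat-storage/odf-multicluster-orchestrator/controllers/utils"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// ensureMirrorPeerOBC annotates the OBC with the owning MirrorPeer so the
// S3 handler can find its way back from bucket to peer.
func ensureMirrorPeerOBC(ctx context.Context, c client.Client, logger *slog.Logger, mirrorPeer multiclusterv1alpha1.MirrorPeer, namespace string, hasStorageClientRef bool) error {
	bucketName := utils.GenerateBucketName(mirrorPeer, hasStorageClientRef)
	annotations := map[string]string{
		utils.MirrorPeerNameAnnotationKey: mirrorPeer.Name,
	}
	op, err := utils.CreateOrUpdateObjectBucketClaim(ctx, c, bucketName, namespace, annotations)
	if err != nil {
		return err
	}
	logger.Info("ObjectBucketClaim reconciled", "operation", op, "bucket", bucketName)
	return nil
}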
diff --git a/controllers/utils/secret.go b/controllers/utils/secret.go
index 0ad502bb..6ac7e19b 100644
--- a/controllers/utils/secret.go
+++ b/controllers/utils/secret.go
@@ -5,9 +5,11 @@ import (
 	"encoding/base64"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"reflect"
 
 	multiclusterv1alpha1 "github.com/red-hat-storage/odf-multicluster-orchestrator/api/v1alpha1"
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 
@@ -21,6 +23,7 @@ const (
 	DestinationLabel SecretLabelType = "GREEN"
 	InternalLabel    SecretLabelType = "INTERNAL"
 	IgnoreLabel      SecretLabelType = "IGNORE"
+	ProviderLabel    SecretLabelType = "PROVIDER"
 	SecretLabelTypeKey               = "multicluster.odf.openshift.io/secret-type"
 	CreatedByLabelKey                = "multicluster.odf.openshift.io/created-by"
 	NamespaceKey                     = "namespace"
@@ -261,6 +264,15 @@ func FetchAllMirrorPeers(ctx context.Context, rc client.Client) ([]multiclusterv
 	return mirrorPeerListObj.Items, nil
 }
 
+func FetchMirrorPeerByName(ctx context.Context, rc client.Client, name string) (*multiclusterv1alpha1.MirrorPeer, error) {
+	var mirrorPeer multiclusterv1alpha1.MirrorPeer
+	err := rc.Get(ctx, types.NamespacedName{Name: name}, &mirrorPeer)
+	if err != nil {
+		return nil, fmt.Errorf("failed to fetch MirrorPeer %s: %w", name, err)
+	}
+	return &mirrorPeer, nil
+}
+
 func FetchSecretWithName(ctx context.Context, rc client.Client, secretName types.NamespacedName) (*corev1.Secret, error) {
 	var secret corev1.Secret
 	err := rc.Get(ctx, secretName, &secret)
diff --git a/controllers/validations.go b/controllers/validations.go
index 6f045e89..97cd2ed0 100644
--- a/controllers/validations.go
+++ b/controllers/validations.go
@@ -19,12 +19,17 @@ package controllers
 import (
 	"context"
 	"fmt"
+	"log/slog"
 	"reflect"
 
 	multiclusterv1alpha1 "github.com/red-hat-storage/odf-multicluster-orchestrator/api/v1alpha1"
+	"github.com/red-hat-storage/odf-multicluster-orchestrator/controllers/utils"
 	"k8s.io/apimachinery/pkg/api/errors"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	clusterv1 "open-cluster-management.io/api/cluster/v1"
+	workv1 "open-cluster-management.io/api/work/v1"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
@@ -43,7 +48,7 @@ func uniqueSpecItems(spec multiclusterv1alpha1.MirrorPeerSpec) error {
 	}
 }
 
 func emptySpecItems(peerRef multiclusterv1alpha1.PeerRef) error {
-	if peerRef.ClusterName == "" || peerRef.StorageClusterRef.Name == "" || peerRef.StorageClusterRef.Namespace == "" {
+	if peerRef.ClusterName == "" || peerRef.StorageClusterRef.Name == "" {
 		return fmt.Errorf("validation: MirrorPeer.Spec.Items fields must not be empty or undefined")
 	}
 	return nil
@@ -60,3 +65,139 @@ func isManagedCluster(ctx context.Context, client client.Client, clusterName str
 	}
 	return nil
 }
+
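Reviewer note: emptySpecItems no longer requires StorageClusterRef.Namespace, since client-type peer refs carry no namespace. A small test sketch of the new contract (names hypothetical):

package controllers

import (
	"testing"

	multiclusterv1alpha1 "github.com/red-hat-storage/odf-multicluster-orchestrator/api/v1alpha1"
)

// A PeerRef without a StorageCluster namespace (the provider/client case)
// must now pass validation; an empty name must still fail.
func TestEmptySpecItemsAllowsMissingNamespace(t *testing.T) {
	pr := multiclusterv1alpha1.PeerRef{
		ClusterName: "cluster-a",
		StorageClusterRef: multiclusterv1alpha1.StorageClusterRef{
			Name: "storageclient-a", // Namespace intentionally left empty
		},
	}
	if err := emptySpecItems(pr); err != nil {
		t.Fatalf("expected namespace-less PeerRef to validate, got %v", err)
	}
	pr.StorageClusterRef.Name = ""
	if err := emptySpecItems(pr); err == nil {
		t.Fatal("expected error when StorageClusterRef.Name is empty")
	}
}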
+// checkStorageClusterPeerStatus checks if the ManifestWorks for StorageClusterPeer resources
+// have been created and reached the Applied status.
+func checkStorageClusterPeerStatus(ctx context.Context, client client.Client, logger *slog.Logger, mirrorPeer *multiclusterv1alpha1.MirrorPeer) (bool, error) {
+	logger.Info("Checking if StorageClusterPeer ManifestWorks have been created and reached Applied status")
+
+	// Fetch the client info ConfigMap
+	clientInfoMap, err := fetchClientInfoConfigMap(ctx, client)
+	if err != nil {
+		if k8serrors.IsNotFound(err) {
+			logger.Info("Client info ConfigMap not found; requeuing for later retry")
+			return false, nil
+		}
+		return false, fmt.Errorf("failed to fetch client info ConfigMap: %w", err)
+	}
+
+	// Collect client information for each cluster in the MirrorPeer
+	items := mirrorPeer.Spec.Items
+	clientInfos := make([]ClientInfo, 0, len(items))
+	for _, item := range items {
+		clientKey := utils.GetKey(item.ClusterName, item.StorageClusterRef.Name)
+		ci, err := getClientInfoFromConfigMap(clientInfoMap.Data, clientKey)
+		if err != nil {
+			logger.Error("Failed to get client info from ConfigMap", "ClientKey", clientKey)
+			return false, err
+		}
+		clientInfos = append(clientInfos, ci)
+	}
+
+	// Check the status of the ManifestWork for each StorageClusterPeer
+	for _, currentClient := range clientInfos {
+		// Determine the name and namespace for the ManifestWork
+		manifestWorkName := fmt.Sprintf("storageclusterpeer-%s", currentClient.ProviderInfo.ProviderManagedClusterName)
+		manifestWorkNamespace := currentClient.ProviderInfo.ProviderManagedClusterName
+
+		// Fetch the ManifestWork
+		manifestWork := &workv1.ManifestWork{}
+		err := client.Get(ctx, types.NamespacedName{Name: manifestWorkName, Namespace: manifestWorkNamespace}, manifestWork)
+		if err != nil {
+			if k8serrors.IsNotFound(err) {
+				logger.Info("ManifestWork for StorageClusterPeer not found; it may not be created yet", "ManifestWorkName", manifestWorkName)
+				return false, nil
+			}
+			return false, fmt.Errorf("failed to get ManifestWork for StorageClusterPeer: %w", err)
+		}
+
+		// Check if the ManifestWork has been successfully applied
+		applied := false
+		for _, condition := range manifestWork.Status.Conditions {
+			if condition.Type == workv1.WorkApplied && condition.Status == metav1.ConditionTrue {
+				applied = true
+				break
+			}
+		}
+
+		if !applied {
+			logger.Info("StorageClusterPeer ManifestWork has not reached Applied status", "ManifestWorkName", manifestWorkName)
+			return false, nil
+		}
+
+		logger.Info("StorageClusterPeer ManifestWork has reached Applied status", "ManifestWorkName", manifestWorkName)
+	}
+
+	// All ManifestWorks have been created and have Applied status
+	logger.Info("All StorageClusterPeer ManifestWorks have been created and reached Applied status")
+	return true, nil
+}
+
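Reviewer note: this function and checkClientPairingConfigMapStatus below share the Applied-condition scan verbatim. If deduplication is wanted later, apimachinery's condition helpers cover it, since ManifestWork conditions are []metav1.Condition in open-cluster-management.io/api/work/v1. A sketch, not part of this patch:

package controllers

import (
	"k8s.io/apimachinery/pkg/api/meta"
	workv1 "open-cluster-management.io/api/work/v1"
)

// isManifestWorkApplied replaces the hand-rolled loop over
// Status.Conditions with the standard helper.
func isManifestWorkApplied(mw *workv1.ManifestWork) bool {
	return meta.IsStatusConditionTrue(mw.Status.Conditions, workv1.WorkApplied)
}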
+// checkClientPairingConfigMapStatus checks if the ManifestWorks for client pairing ConfigMaps
+// have been created and reached the Applied status.
+func checkClientPairingConfigMapStatus(ctx context.Context, client client.Client, logger *slog.Logger, mirrorPeer *multiclusterv1alpha1.MirrorPeer) (bool, error) {
+	logger.Info("Checking if client pairing ConfigMap ManifestWorks have been created and reached Applied status")
+
+	// Fetch the client info ConfigMap
+	clientInfoMap, err := fetchClientInfoConfigMap(ctx, client)
+	if err != nil {
+		if k8serrors.IsNotFound(err) {
+			logger.Info("Client info ConfigMap not found; requeuing for later retry")
+			return false, nil
+		}
+		return false, fmt.Errorf("failed to fetch client info ConfigMap: %w", err)
+	}
+
+	// Collect client information for each cluster in the MirrorPeer
+	items := mirrorPeer.Spec.Items
+	clientInfos := make([]ClientInfo, 0, len(items))
+	for _, item := range items {
+		clientKey := utils.GetKey(item.ClusterName, item.StorageClusterRef.Name)
+		ci, err := getClientInfoFromConfigMap(clientInfoMap.Data, clientKey)
+		if err != nil {
+			logger.Error("Failed to get client info from ConfigMap", "ClientKey", clientKey)
+			return false, err
+		}
+		clientInfos = append(clientInfos, ci)
+	}
+
+	// Check the status of the ManifestWork for each provider's client pairing ConfigMap
+	for _, providerClient := range clientInfos {
+		manifestWorkName := "storage-client-mapping"
+		manifestWorkNamespace := providerClient.ProviderInfo.ProviderManagedClusterName
+
+		// Fetch the ManifestWork
+		manifestWork := &workv1.ManifestWork{}
+		err := client.Get(ctx, types.NamespacedName{Name: manifestWorkName, Namespace: manifestWorkNamespace}, manifestWork)
+		if err != nil {
+			if k8serrors.IsNotFound(err) {
+				logger.Info("ManifestWork for client pairing ConfigMap not found; it may not be created yet",
+					"ManifestWorkName", manifestWorkName, "Namespace", manifestWorkNamespace)
+				return false, nil
+			}
+			return false, fmt.Errorf("failed to get ManifestWork for client pairing ConfigMap: %w", err)
+		}
+
+		// Check if the ManifestWork has been successfully applied
+		applied := false
+		for _, condition := range manifestWork.Status.Conditions {
+			if condition.Type == workv1.WorkApplied && condition.Status == metav1.ConditionTrue {
+				applied = true
+				break
+			}
+		}
+
+		if !applied {
+			logger.Info("Client pairing ConfigMap ManifestWork has not reached Applied status",
+				"ManifestWorkName", manifestWorkName, "Namespace", manifestWorkNamespace)
+			return false, nil
+		}
+
+		logger.Info("Client pairing ConfigMap ManifestWork has reached Applied status",
+			"ManifestWorkName", manifestWorkName, "Namespace", manifestWorkNamespace)
+	}
+
+	// All ConfigMap ManifestWorks have been created and have Applied status
+	logger.Info("All client pairing ConfigMap ManifestWorks have been created and reached Applied status")
+	return true, nil
+}
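Reviewer note: for unit-testing the happy path of these checks, an Applied ManifestWork is the main fixture. A sketch of a fixture builder only; the client-info ConfigMap seeding that fetchClientInfoConfigMap expects is elided, and the names match the ones this patch looks up:

package controllers

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	workv1 "open-cluster-management.io/api/work/v1"
)

// appliedClientMappingWork builds the "storage-client-mapping" ManifestWork
// in the provider's cluster namespace with an Applied=True condition, which
// is what checkClientPairingConfigMapStatus requires to report success.
func appliedClientMappingWork(providerCluster string) *workv1.ManifestWork {
	return &workv1.ManifestWork{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "storage-client-mapping",
			Namespace: providerCluster,
		},
		Status: workv1.ManifestWorkStatus{
			Conditions: []metav1.Condition{{
				Type:   workv1.WorkApplied,
				Status: metav1.ConditionTrue,
				Reason: "AppliedManifestComplete",
			}},
		},
	}
}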