From 4c5c79567b876eaa2c82fc2f2656f3f06844574f Mon Sep 17 00:00:00 2001 From: Rewant Soni Date: Tue, 12 Nov 2024 12:34:26 +0530 Subject: [PATCH] controllers: add a new controller to setup mirroring Signed-off-by: Rewant Soni --- config/rbac/role.yaml | 6 + controllers/mirroring/mirroring_controller.go | 529 ++++++++++++++++++ controllers/storagecluster/cephblockpools.go | 4 + .../storageclusterpeer_controller.go | 11 +- .../storagerequest_controller.go | 4 +- controllers/util/k8sutil.go | 17 +- deploy/csv-templates/ocs-operator.csv.yaml.in | 6 + .../ocs-operator.clusterserviceversion.yaml | 6 + go.mod | 2 +- main.go | 8 + metrics/go.mod | 1 + .../v4/controllers/util/k8sutil.go | 17 +- metrics/vendor/golang.org/x/exp/LICENSE | 27 + metrics/vendor/golang.org/x/exp/PATENTS | 22 + metrics/vendor/golang.org/x/exp/maps/maps.go | 94 ++++ metrics/vendor/modules.txt | 3 + 16 files changed, 743 insertions(+), 14 deletions(-) create mode 100644 controllers/mirroring/mirroring_controller.go create mode 100644 metrics/vendor/golang.org/x/exp/LICENSE create mode 100644 metrics/vendor/golang.org/x/exp/PATENTS create mode 100644 metrics/vendor/golang.org/x/exp/maps/maps.go diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 08ee59544a..b2f47aecc2 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -122,6 +122,12 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - configmaps/finalizers + verbs: + - update - apiGroups: - "" resources: diff --git a/controllers/mirroring/mirroring_controller.go b/controllers/mirroring/mirroring_controller.go new file mode 100644 index 0000000000..109a63ea9b --- /dev/null +++ b/controllers/mirroring/mirroring_controller.go @@ -0,0 +1,529 @@ +/* +Copyright 2020 Red Hat OpenShift Container Storage. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mirroring + +import ( + "context" + "fmt" + "slices" + "time" + + ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1" + ocsv1alpha1 "github.com/red-hat-storage/ocs-operator/api/v4/v1alpha1" + providerClient "github.com/red-hat-storage/ocs-operator/services/provider/api/v4/client" + controllers "github.com/red-hat-storage/ocs-operator/v4/controllers/storageconsumer" + "github.com/red-hat-storage/ocs-operator/v4/controllers/util" + + "github.com/go-logr/logr" + rookCephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" + "golang.org/x/exp/maps" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +const ( + // internalKey is a special key for storage-client-mapping to establish mirroring between blockPools for internal mode + internalKey = "internal" + mirroringFinalizer = "mirroring.ocs.openshift.io" + clientIDIndexName = "clientID" +) + +// MirroringReconciler reconciles a Mirroring fields for Ceph Object(s) +// nolint:revive +type MirroringReconciler struct { + client.Client + Scheme *runtime.Scheme + + log logr.Logger + ctx context.Context +} + +// SetupWithManager sets up the controller with the Manager. +func (r *MirroringReconciler) SetupWithManager(mgr ctrl.Manager) error { + + ctx := context.Background() + + if err := mgr.GetCache().IndexField( + ctx, + &ocsv1alpha1.StorageConsumer{}, + util.AnnotationIndexName, + util.AnnotationIndexFieldFunc, + ); err != nil { + return fmt.Errorf("failed to set up FieldIndexer on StorageConsumer for annotations: %v", err) + } + + if err := mgr.GetCache().IndexField( + ctx, + &ocsv1alpha1.StorageConsumer{}, + clientIDIndexName, + func(obj client.Object) []string { + if storageConsumer, ok := obj.(*ocsv1alpha1.StorageConsumer); ok { + return []string{storageConsumer.Status.Client.ID} + } + return nil + }, + ); err != nil { + return fmt.Errorf("failed to set up FieldIndexer on StorageConsumer for clientID: %v", err) + } + + // Reconcile the OperatorConfigMap object when the cluster's version object is updated + enqueueConfigMapRequest := handler.EnqueueRequestsFromMapFunc( + func(_ context.Context, obj client.Object) []reconcile.Request { + return []reconcile.Request{{ + NamespacedName: types.NamespacedName{ + Name: util.StorageClientMappingConfigName, + Namespace: obj.GetNamespace(), + }, + }} + }, + ) + + generationChangePredicate := builder.WithPredicates(predicate.GenerationChangedPredicate{}) + + return ctrl.NewControllerManagedBy(mgr). + For( + &corev1.ConfigMap{}, + builder.WithPredicates(util.NamePredicate(util.StorageClientMappingConfigName)), + ). + Owns( + &rookCephv1.CephRBDMirror{}, + generationChangePredicate, + ). + Watches( + &ocsv1.StorageClusterPeer{}, + enqueueConfigMapRequest, + generationChangePredicate, + ). + Watches( + &ocsv1alpha1.StorageConsumer{}, + enqueueConfigMapRequest, + generationChangePredicate, + ). + Watches( + &rookCephv1.CephBlockPool{}, + enqueueConfigMapRequest, + generationChangePredicate, + ). + Watches( + &rookCephv1.CephBlockPoolRadosNamespace{}, + enqueueConfigMapRequest, + generationChangePredicate, + ). + Complete(r) +} + +//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers;storageconsumers,verbs=get;list;watch +//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch +//+kubebuilder:rbac:groups=core,resources=configmaps/finalizers,verbs=update +//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;delete +//+kubebuilder:rbac:groups=ceph.rook.io,resources=cephrbdmirrors,verbs=get;list;watch;create;update;delete +//+kubebuilder:rbac:groups=ceph.rook.io,resources=cephblockpools;cephblockpoolradosnamespaces,verbs=get;list;watch;create;update + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +func (r *MirroringReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { + r.ctx = ctx + r.log = log.FromContext(ctx, "StorageClientMapping", request.NamespacedName) + r.log.Info("Starting reconcile") + + clientMappingConfig := &corev1.ConfigMap{} + clientMappingConfig.Name = request.Name + clientMappingConfig.Namespace = request.Namespace + + if err := r.get(clientMappingConfig); err != nil { + if k8serrors.IsNotFound(err) { + r.log.Info("ConfigMap %s not found. Ignoring since object must be deleted.") + return ctrl.Result{}, nil + } + r.log.Error(err, "Failed to get ConfigMap.") + return ctrl.Result{}, err + } + return r.reconcilePhases(clientMappingConfig) +} + +func (r *MirroringReconciler) reconcilePhases(clientMappingConfig *corev1.ConfigMap) (ctrl.Result, error) { + + shouldMirror := clientMappingConfig.DeletionTimestamp.IsZero() && + clientMappingConfig.Data != nil && + len(clientMappingConfig.Data) > 0 + + if shouldMirror { + if controllerutil.AddFinalizer(clientMappingConfig, mirroringFinalizer) { + r.log.Info("Finalizer not found for ConfigMap. Adding finalizer.") + if err := r.update(clientMappingConfig); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update ConfigMap: %v", err) + } + } + } + + // Fetch the StorageClusterPeer instance + storageClusterPeerList := &ocsv1.StorageClusterPeerList{} + if err := r.list( + storageClusterPeerList, + client.InNamespace(clientMappingConfig.Namespace), + client.Limit(2), + ); err != nil { + r.log.Error(err, "Failed to get StorageClusterPeer.") + return ctrl.Result{}, err + } + if len(storageClusterPeerList.Items) != 1 { + return ctrl.Result{}, fmt.Errorf("expected 1 StorageClusterPeer but got %d", len(storageClusterPeerList.Items)) + } + + storageClusterPeer := &storageClusterPeerList.Items[0] + + if storageClusterPeer.Status.State != ocsv1.StorageClusterPeerStatePeered || + storageClusterPeer.Status.PeerInfo == nil || + storageClusterPeer.Status.PeerInfo.StorageClusterUid == "" { + r.log.Info( + "waiting for StorageClusterPeer to be in Peered state", + "StorageClusterPeer", + storageClusterPeer.Name, + ) + return ctrl.Result{RequeueAfter: 3 * time.Second}, nil + } + + ocsClient, err := providerClient.NewProviderClient(r.ctx, storageClusterPeer.Spec.ApiEndpoint, util.OcsClientTimeout) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to create a new provider client: %v", err) + } + defer ocsClient.Close() + + if err = r.reconcileRbdMirror(clientMappingConfig, shouldMirror); err != nil { + return ctrl.Result{}, err + } + + if res, err := r.reconcileBlockPoolMirroring( + ocsClient, + clientMappingConfig, + storageClusterPeer, + shouldMirror, + ); err != nil || !res.IsZero() { + return res, err + } + + if res, err := r.reconcileRadosNamespaceMirroring( + ocsClient, + clientMappingConfig, + storageClusterPeer, + shouldMirror, + ); err != nil || !res.IsZero() { + return res, err + } + + if !shouldMirror { + if controllerutil.RemoveFinalizer(clientMappingConfig, mirroringFinalizer) { + r.log.Info("removing finalizer from ConfigMap.") + if err := r.update(clientMappingConfig); err != nil { + r.log.Info("Failed to remove finalizer from ConfigMap") + return ctrl.Result{}, fmt.Errorf("failed to remove finalizer from ConfigMap: %v", err) + } + } + } + + return ctrl.Result{}, nil +} + +func (r *MirroringReconciler) reconcileRbdMirror(clientMappingConfig *corev1.ConfigMap, shouldMirror bool) error { + rbdMirror := &rookCephv1.CephRBDMirror{} + rbdMirror.Name = util.CephRBDMirrorName + rbdMirror.Namespace = clientMappingConfig.Namespace + + storageConsumers := &ocsv1alpha1.StorageConsumerList{} + if err := r.list( + storageConsumers, + client.MatchingFields{util.AnnotationIndexName: util.RequestMaintenanceModeAnnotation}, + ); err != nil { + return fmt.Errorf("failed to list StorageConsumer(s): %v", err) + } + + maintenanceModeRequested := len(storageConsumers.Items) >= 1 + + if shouldMirror && !maintenanceModeRequested { + _, err := ctrl.CreateOrUpdate(r.ctx, r.Client, rbdMirror, func() error { + if err := r.own(clientMappingConfig, rbdMirror); err != nil { + return err + } + rbdMirror.Spec.Count = 1 + return nil + }) + if err != nil { + r.log.Error(err, "Failed to create/update the CephRBDMirror", "CephRBDMirror", rbdMirror) + return err + } + } else { + if err := r.delete(rbdMirror); err != nil { + return fmt.Errorf("failed to delete CephRBDMirror: %v", err) + } + } + + return nil +} + +func (r *MirroringReconciler) reconcileBlockPoolMirroring( + ocsClient *providerClient.OCSProviderClient, + clientMappingConfig *corev1.ConfigMap, + storageClusterPeer *ocsv1.StorageClusterPeer, + shouldMirror bool, +) (ctrl.Result, error) { + + cephBlockPoolsList := &rookCephv1.CephBlockPoolList{} + if err := r.list( + cephBlockPoolsList, + client.InNamespace(clientMappingConfig.Namespace), + ); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to list cephBlockPools: %v", err) + } + + blockPoolByName := map[string]*rookCephv1.CephBlockPool{} + for i := range cephBlockPoolsList.Items { + cephBlockPool := &cephBlockPoolsList.Items[i] + if _, ok := cephBlockPool.GetLabels()[util.ForbidMirroringLabel]; ok { + continue + } + blockPoolByName[cephBlockPool.Name] = cephBlockPool + } + + // fetch BlockPoolsInfo + response, err := ocsClient.GetBlockPoolsInfo( + r.ctx, + storageClusterPeer.Status.PeerInfo.StorageClusterUid, + maps.Keys(blockPoolByName), + ) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to get CephBlockPool(s) info from Peer: %w", err) + } + + if response.Errors != nil { + for i := range response.Errors { + resp := response.Errors[i] + r.log.Error( + fmt.Errorf("failed to get CephBlockPool(s) info with code %s", resp.Code), + resp.Message, + "CephBlockPool", + resp.BlockPoolName, + ) + } + return ctrl.Result{Requeue: true}, nil + } + + if shouldMirror { + for i := range response.BlockPoolsInfo { + blockPoolName := response.BlockPoolsInfo[i].BlockPoolName + cephBlockPool := blockPoolByName[blockPoolName] + + mirroringToken := response.BlockPoolsInfo[i].MirroringToken + secretName := fmt.Sprintf("rbd-mirroring-token-%s", util.CalculateMD5Hash(blockPoolName)) + if mirroringToken != "" { + mirroringSecret := &corev1.Secret{} + mirroringSecret.Name = secretName + mirroringSecret.Namespace = clientMappingConfig.Namespace + var err error + + _, err = ctrl.CreateOrUpdate(r.ctx, r.Client, mirroringSecret, func() error { + if err = r.own(clientMappingConfig, mirroringSecret); err != nil { + return err + } + mirroringSecret.Data = map[string][]byte{ + "pool": []byte(blockPoolName), + "token": []byte(mirroringToken), + } + return nil + }) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to create/update bootstrap secret: %w", err) + } + } + + // We need to enable mirroring for the blockPool, else the bootstrap secret will not be generated + _, err = controllerutil.CreateOrUpdate(r.ctx, r.Client, cephBlockPool, func() error { + util.AddAnnotation( + cephBlockPool, + util.BlockPoolMirroringTargetIDAnnotation, + response.BlockPoolsInfo[i].BlockPoolID, + ) + + cephBlockPool.Spec.Mirroring.Enabled = true + cephBlockPool.Spec.Mirroring.Mode = "image" + if mirroringToken != "" { + if cephBlockPool.Spec.Mirroring.Peers == nil { + cephBlockPool.Spec.Mirroring.Peers = &rookCephv1.MirroringPeerSpec{SecretNames: []string{}} + } + if !slices.Contains(cephBlockPool.Spec.Mirroring.Peers.SecretNames, secretName) { + cephBlockPool.Spec.Mirroring.Peers.SecretNames = append( + cephBlockPool.Spec.Mirroring.Peers.SecretNames, secretName) + } + } + return nil + }) + if err != nil { + return ctrl.Result{}, fmt.Errorf( + "failed to update CephBlockPool %s for mirroring: %w", + cephBlockPool.Name, + err, + ) + } + } + } else { + for i := range response.BlockPoolsInfo { + blockPoolName := response.BlockPoolsInfo[i].BlockPoolName + cephBlockPool := blockPoolByName[blockPoolName] + _, err := controllerutil.CreateOrUpdate(r.ctx, r.Client, cephBlockPool, func() error { + cephBlockPool.Spec.Mirroring = rookCephv1.MirroringSpec{} + return nil + }) + if err != nil { + return ctrl.Result{}, fmt.Errorf( + "failed to disable mirroring for CephBlockPool %s: %v", + cephBlockPool.Name, + err, + ) + } + } + } + + return ctrl.Result{}, nil +} + +func (r *MirroringReconciler) reconcileRadosNamespaceMirroring( + ocsClient *providerClient.OCSProviderClient, + clientMappingConfig *corev1.ConfigMap, + storageClusterPeer *ocsv1.StorageClusterPeer, + disableMirroring bool, +) (ctrl.Result, error) { + /* + Algorithm: + make a list of peerClientIDs + send GetStorageClientsInfo with this Info + make a map of remoteNamespaceByClientID from response + list all radosNamespace and for each, + find out which consumer does it belong to + find the localClientID and use map to get radosnamespce + if radosnamespace is empty, disable mirroring + else enable mirroring + */ + + peerClientIDs := []string{} + storageConsumerByName := map[string]*ocsv1alpha1.StorageConsumer{} + for localClientID, peerClientID := range clientMappingConfig.Data { + // for internal mode, we need only blockPool mirroring, hence skipping this for the special key "internal" + if localClientID == internalKey { + continue + } + // Check if the storageConsumer with the ClientID exists, this is a fancy get operation as + // there will be only one consumer with the clientID + storageConsumers := &ocsv1alpha1.StorageConsumerList{} + if err := r.list( + storageConsumers, + client.MatchingFields{clientIDIndexName: localClientID}, + client.Limit(2), + ); err != nil { + return ctrl.Result{}, err + } + if len(storageConsumers.Items) != 1 { + return ctrl.Result{}, fmt.Errorf("expected 1 StorageConsumer but got %v", len(storageConsumers.Items)) + } + storageConsumerByName[storageConsumers.Items[0].Name] = &storageConsumers.Items[0] + peerClientIDs = append(peerClientIDs, peerClientID) + } + + response, err := ocsClient.GetStorageClientsInfo( + r.ctx, + storageClusterPeer.Status.PeerInfo.StorageClusterUid, + peerClientIDs, + ) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to get StorageClient info from Peer: %w", err) + } + + if response.Errors != nil { + for i := range response.Errors { + resp := response.Errors[i] + r.log.Error( + fmt.Errorf("failed to get StorageClient(s) info with code %s", resp.Code), + resp.Message, + "CephBlockPool", + resp.ClientID, + ) + } + return ctrl.Result{Requeue: true}, nil + } + + remoteNamespaceByClientID := map[string]string{} + for i := range response.ClientsInfo { + clientInfo := response.ClientsInfo[i] + remoteNamespaceByClientID[clientInfo.ClientID] = clientInfo.RadosNamespace + } + + radosNamespaceList := &rookCephv1.CephBlockPoolRadosNamespaceList{} + if err = r.list(radosNamespaceList, client.InNamespace(storageClusterPeer.Namespace)); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to list CephBlockPoolRadosNamespace(s): %w", err) + } + + for i := range radosNamespaceList.Items { + rns := &radosNamespaceList.Items[i] + consumer := storageConsumerByName[rns.GetLabels()[controllers.StorageConsumerNameLabel]] + if consumer == nil || consumer.Status.Client.ID == "" { + continue + } + remoteClientID := clientMappingConfig.Data[consumer.Status.Client.ID] + remoteNamespace := remoteNamespaceByClientID[remoteClientID] + _, err := controllerutil.CreateOrUpdate(r.ctx, r.Client, rns, func() error { + if remoteNamespace == "" || disableMirroring { + rns.Spec.Mirroring = nil + } else { + rns.Spec.Mirroring = &rookCephv1.RadosNamespaceMirroring{ + RemoteNamespace: ptr.To(remoteNamespace), + Mode: "image", + } + } + return nil + }) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update radosnamespace %s", rns.Name) + } + } + return ctrl.Result{}, nil +} + +func (r *MirroringReconciler) get(obj client.Object) error { + return r.Client.Get(r.ctx, client.ObjectKeyFromObject(obj), obj) +} + +func (r *MirroringReconciler) list(obj client.ObjectList, listOptions ...client.ListOption) error { + return r.Client.List(r.ctx, obj, listOptions...) +} + +func (r *MirroringReconciler) update(obj client.Object, opts ...client.UpdateOption) error { + return r.Client.Update(r.ctx, obj, opts...) +} + +func (r *MirroringReconciler) delete(obj client.Object, opts ...client.DeleteOption) error { + return r.Client.Delete(r.ctx, obj, opts...) +} + +func (r *MirroringReconciler) own(owner *corev1.ConfigMap, obj client.Object) error { + return controllerutil.SetControllerReference(owner, obj, r.Scheme) +} diff --git a/controllers/storagecluster/cephblockpools.go b/controllers/storagecluster/cephblockpools.go index 361ea33238..6cd0ea92ef 100644 --- a/controllers/storagecluster/cephblockpools.go +++ b/controllers/storagecluster/cephblockpools.go @@ -4,6 +4,7 @@ import ( "fmt" ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1" + "github.com/red-hat-storage/ocs-operator/v4/controllers/util" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -149,6 +150,7 @@ func (o *ocsCephBlockPools) reconcileMgrCephBlockPool(r *StorageClusterReconcile cephBlockPool.Spec.PoolSpec.EnableCrushUpdates = true cephBlockPool.Spec.PoolSpec.FailureDomain = getFailureDomain(storageCluster) cephBlockPool.Spec.PoolSpec.Replicated = generateCephReplicatedSpec(storageCluster, "metadata") + util.AddLabel(cephBlockPool, util.ForbidMirroringLabel, "true") return controllerutil.SetControllerReference(storageCluster, cephBlockPool, r.Scheme) }) @@ -197,6 +199,8 @@ func (o *ocsCephBlockPools) reconcileNFSCephBlockPool(r *StorageClusterReconcile cephBlockPool.Spec.PoolSpec.FailureDomain = getFailureDomain(storageCluster) cephBlockPool.Spec.PoolSpec.Replicated = generateCephReplicatedSpec(storageCluster, "data") cephBlockPool.Spec.PoolSpec.EnableRBDStats = true + util.AddLabel(cephBlockPool, util.ForbidMirroringLabel, "true") + return controllerutil.SetControllerReference(storageCluster, cephBlockPool, r.Scheme) }) if err != nil { diff --git a/controllers/storageclusterpeer/storageclusterpeer_controller.go b/controllers/storageclusterpeer/storageclusterpeer_controller.go index 2da99f7d31..b32c2fcdac 100644 --- a/controllers/storageclusterpeer/storageclusterpeer_controller.go +++ b/controllers/storageclusterpeer/storageclusterpeer_controller.go @@ -21,9 +21,7 @@ import ( "encoding/base64" "encoding/json" "fmt" - "google.golang.org/grpc/codes" "strings" - "time" ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1" providerClient "github.com/red-hat-storage/ocs-operator/services/provider/api/v4/client" @@ -31,6 +29,7 @@ import ( "github.com/red-hat-storage/ocs-operator/v4/services" "github.com/go-logr/logr" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -56,7 +55,11 @@ type StorageClusterPeerReconciler struct { func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&ocsv1.StorageClusterPeer{}). - Watches(&ocsv1.StorageCluster{}, &handler.EnqueueRequestForObject{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + Watches( + &ocsv1.StorageCluster{}, + &handler.EnqueueRequestForObject{}, + builder.WithPredicates(predicate.GenerationChangedPredicate{}), + ). Complete(r) } @@ -160,7 +163,7 @@ func (r *StorageClusterPeerReconciler) reconcileStates(storageClusterPeer *ocsv1 storageClusterPeer.Status.PeerInfo = &ocsv1.PeerInfo{StorageClusterUid: string(ticketData.StorageCluster)} } - ocsClient, err := providerClient.NewProviderClient(r.ctx, storageClusterPeer.Spec.ApiEndpoint, time.Second*10) + ocsClient, err := providerClient.NewProviderClient(r.ctx, storageClusterPeer.Spec.ApiEndpoint, util.OcsClientTimeout) if err != nil { storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed return ctrl.Result{}, fmt.Errorf("failed to create a new provider client: %v", err) diff --git a/controllers/storagerequest/storagerequest_controller.go b/controllers/storagerequest/storagerequest_controller.go index c2c057ff3b..3ae21cd626 100644 --- a/controllers/storagerequest/storagerequest_controller.go +++ b/controllers/storagerequest/storagerequest_controller.go @@ -371,9 +371,7 @@ func (r *StorageRequestReconciler) reconcileRadosNamespace() error { // add a blockpool name in the label so UI can watch for the rados namespace // that belongs to the particular blockpool addLabel(r.cephRadosNamespace, blockPoolNameLabel, blockPoolName) - r.cephRadosNamespace.Spec = rookCephv1.CephBlockPoolRadosNamespaceSpec{ - BlockPoolName: blockPoolName, - } + r.cephRadosNamespace.Spec.BlockPoolName = blockPoolName return nil }) diff --git a/controllers/util/k8sutil.go b/controllers/util/k8sutil.go index 1acf5f649d..cc57894761 100644 --- a/controllers/util/k8sutil.go +++ b/controllers/util/k8sutil.go @@ -3,16 +3,18 @@ package util import ( "context" "fmt" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "os" "strings" + "time" ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1" "github.com/go-logr/logr" configv1 "github.com/openshift/api/config/v1" + "golang.org/x/exp/maps" corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -47,15 +49,20 @@ const ( EnableNFSKey = "ROOK_CSI_ENABLE_NFS" DisableCSIDriverKey = "ROOK_CSI_DISABLE_DRIVER" - // This is the name for the OwnerUID FieldIndex - OwnerUIDIndexName = "ownerUID" + // This is the name for the FieldIndex + OwnerUIDIndexName = "ownerUID" + AnnotationIndexName = "annotation" OdfInfoNamespacedNameClaimName = "odfinfo.odf.openshift.io" ExitCodeThatShouldRestartTheProcess = 42 + //ForbidMirroringLabel is used to forbid mirroring for ceph resources such as CephBlockPool + ForbidMirroringLabel = "ocs.openshift.io/forbid-mirroring" BlockPoolMirroringTargetIDAnnotation = "ocs.openshift.io/mirroring-target-id" RequestMaintenanceModeAnnotation = "ocs.openshift.io/request-maintenance-mode" CephRBDMirrorName = "cephrbdmirror" + OcsClientTimeout = 10 * time.Second + StorageClientMappingConfigName = "storage-client-mapping" ) var podNamespace = os.Getenv(PodNamespaceEnvVar) @@ -167,6 +174,10 @@ func OwnersIndexFieldFunc(obj client.Object) []string { return owners } +func AnnotationIndexFieldFunc(obj client.Object) []string { + return maps.Keys(obj.GetAnnotations()) +} + func GenerateNameForNonResilientCephBlockPoolSC(initData *ocsv1.StorageCluster) string { if initData.Spec.ManagedResources.CephNonResilientPools.StorageClassName != "" { return initData.Spec.ManagedResources.CephNonResilientPools.StorageClassName diff --git a/deploy/csv-templates/ocs-operator.csv.yaml.in b/deploy/csv-templates/ocs-operator.csv.yaml.in index ffa68ff072..71dde54fe0 100644 --- a/deploy/csv-templates/ocs-operator.csv.yaml.in +++ b/deploy/csv-templates/ocs-operator.csv.yaml.in @@ -293,6 +293,12 @@ spec: - patch - update - watch + - apiGroups: + - "" + resources: + - configmaps/finalizers + verbs: + - update - apiGroups: - "" resources: diff --git a/deploy/ocs-operator/manifests/ocs-operator.clusterserviceversion.yaml b/deploy/ocs-operator/manifests/ocs-operator.clusterserviceversion.yaml index 30a1d7cab1..438e2abda6 100644 --- a/deploy/ocs-operator/manifests/ocs-operator.clusterserviceversion.yaml +++ b/deploy/ocs-operator/manifests/ocs-operator.clusterserviceversion.yaml @@ -302,6 +302,12 @@ spec: - patch - update - watch + - apiGroups: + - "" + resources: + - configmaps/finalizers + verbs: + - update - apiGroups: - "" resources: diff --git a/go.mod b/go.mod index ec70120514..de1d1ef4e8 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( github.com/rook/rook/pkg/apis v0.0.0-20241119201302-fc456553b3cc github.com/stretchr/testify v1.9.0 go.uber.org/multierr v1.11.0 + golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c golang.org/x/net v0.31.0 google.golang.org/grpc v1.68.0 gopkg.in/ini.v1 v1.67.0 @@ -122,7 +123,6 @@ require ( go.mongodb.org/mongo-driver v1.16.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.29.0 // indirect - golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect golang.org/x/oauth2 v0.23.0 // indirect golang.org/x/sys v0.27.0 // indirect golang.org/x/term v0.26.0 // indirect diff --git a/main.go b/main.go index 37ea4d8d9e..19ff7324e6 100644 --- a/main.go +++ b/main.go @@ -58,6 +58,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" metrics "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "github.com/red-hat-storage/ocs-operator/v4/controllers/mirroring" "github.com/red-hat-storage/ocs-operator/v4/controllers/ocsinitialization" "github.com/red-hat-storage/ocs-operator/v4/controllers/platform" "github.com/red-hat-storage/ocs-operator/v4/controllers/storagecluster" @@ -234,6 +235,13 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "StorageClusterPeer") os.Exit(1) } + if err = (&mirroring.MirroringReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Mirroring") + os.Exit(1) + } // +kubebuilder:scaffold:builder // Create OCSInitialization CR if it's not present diff --git a/metrics/go.mod b/metrics/go.mod index 3890af5560..be8e160183 100644 --- a/metrics/go.mod +++ b/metrics/go.mod @@ -103,6 +103,7 @@ require ( github.com/x448/float16 v0.8.4 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.29.0 // indirect + golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect golang.org/x/oauth2 v0.23.0 // indirect golang.org/x/sys v0.27.0 // indirect golang.org/x/term v0.26.0 // indirect diff --git a/metrics/vendor/github.com/red-hat-storage/ocs-operator/v4/controllers/util/k8sutil.go b/metrics/vendor/github.com/red-hat-storage/ocs-operator/v4/controllers/util/k8sutil.go index 1acf5f649d..cc57894761 100644 --- a/metrics/vendor/github.com/red-hat-storage/ocs-operator/v4/controllers/util/k8sutil.go +++ b/metrics/vendor/github.com/red-hat-storage/ocs-operator/v4/controllers/util/k8sutil.go @@ -3,16 +3,18 @@ package util import ( "context" "fmt" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "os" "strings" + "time" ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1" "github.com/go-logr/logr" configv1 "github.com/openshift/api/config/v1" + "golang.org/x/exp/maps" corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -47,15 +49,20 @@ const ( EnableNFSKey = "ROOK_CSI_ENABLE_NFS" DisableCSIDriverKey = "ROOK_CSI_DISABLE_DRIVER" - // This is the name for the OwnerUID FieldIndex - OwnerUIDIndexName = "ownerUID" + // This is the name for the FieldIndex + OwnerUIDIndexName = "ownerUID" + AnnotationIndexName = "annotation" OdfInfoNamespacedNameClaimName = "odfinfo.odf.openshift.io" ExitCodeThatShouldRestartTheProcess = 42 + //ForbidMirroringLabel is used to forbid mirroring for ceph resources such as CephBlockPool + ForbidMirroringLabel = "ocs.openshift.io/forbid-mirroring" BlockPoolMirroringTargetIDAnnotation = "ocs.openshift.io/mirroring-target-id" RequestMaintenanceModeAnnotation = "ocs.openshift.io/request-maintenance-mode" CephRBDMirrorName = "cephrbdmirror" + OcsClientTimeout = 10 * time.Second + StorageClientMappingConfigName = "storage-client-mapping" ) var podNamespace = os.Getenv(PodNamespaceEnvVar) @@ -167,6 +174,10 @@ func OwnersIndexFieldFunc(obj client.Object) []string { return owners } +func AnnotationIndexFieldFunc(obj client.Object) []string { + return maps.Keys(obj.GetAnnotations()) +} + func GenerateNameForNonResilientCephBlockPoolSC(initData *ocsv1.StorageCluster) string { if initData.Spec.ManagedResources.CephNonResilientPools.StorageClassName != "" { return initData.Spec.ManagedResources.CephNonResilientPools.StorageClassName diff --git a/metrics/vendor/golang.org/x/exp/LICENSE b/metrics/vendor/golang.org/x/exp/LICENSE new file mode 100644 index 0000000000..2a7cf70da6 --- /dev/null +++ b/metrics/vendor/golang.org/x/exp/LICENSE @@ -0,0 +1,27 @@ +Copyright 2009 The Go Authors. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google LLC nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/metrics/vendor/golang.org/x/exp/PATENTS b/metrics/vendor/golang.org/x/exp/PATENTS new file mode 100644 index 0000000000..733099041f --- /dev/null +++ b/metrics/vendor/golang.org/x/exp/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/metrics/vendor/golang.org/x/exp/maps/maps.go b/metrics/vendor/golang.org/x/exp/maps/maps.go new file mode 100644 index 0000000000..ecc0dabb74 --- /dev/null +++ b/metrics/vendor/golang.org/x/exp/maps/maps.go @@ -0,0 +1,94 @@ +// Copyright 2021 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package maps defines various functions useful with maps of any type. +package maps + +// Keys returns the keys of the map m. +// The keys will be in an indeterminate order. +func Keys[M ~map[K]V, K comparable, V any](m M) []K { + r := make([]K, 0, len(m)) + for k := range m { + r = append(r, k) + } + return r +} + +// Values returns the values of the map m. +// The values will be in an indeterminate order. +func Values[M ~map[K]V, K comparable, V any](m M) []V { + r := make([]V, 0, len(m)) + for _, v := range m { + r = append(r, v) + } + return r +} + +// Equal reports whether two maps contain the same key/value pairs. +// Values are compared using ==. +func Equal[M1, M2 ~map[K]V, K, V comparable](m1 M1, m2 M2) bool { + if len(m1) != len(m2) { + return false + } + for k, v1 := range m1 { + if v2, ok := m2[k]; !ok || v1 != v2 { + return false + } + } + return true +} + +// EqualFunc is like Equal, but compares values using eq. +// Keys are still compared with ==. +func EqualFunc[M1 ~map[K]V1, M2 ~map[K]V2, K comparable, V1, V2 any](m1 M1, m2 M2, eq func(V1, V2) bool) bool { + if len(m1) != len(m2) { + return false + } + for k, v1 := range m1 { + if v2, ok := m2[k]; !ok || !eq(v1, v2) { + return false + } + } + return true +} + +// Clear removes all entries from m, leaving it empty. +func Clear[M ~map[K]V, K comparable, V any](m M) { + for k := range m { + delete(m, k) + } +} + +// Clone returns a copy of m. This is a shallow clone: +// the new keys and values are set using ordinary assignment. +func Clone[M ~map[K]V, K comparable, V any](m M) M { + // Preserve nil in case it matters. + if m == nil { + return nil + } + r := make(M, len(m)) + for k, v := range m { + r[k] = v + } + return r +} + +// Copy copies all key/value pairs in src adding them to dst. +// When a key in src is already present in dst, +// the value in dst will be overwritten by the value associated +// with the key in src. +func Copy[M1 ~map[K]V, M2 ~map[K]V, K comparable, V any](dst M1, src M2) { + for k, v := range src { + dst[k] = v + } +} + +// DeleteFunc deletes any key/value pairs from m for which del returns true. +func DeleteFunc[M ~map[K]V, K comparable, V any](m M, del func(K, V) bool) { + for k, v := range m { + if del(k, v) { + delete(m, k) + } + } +} diff --git a/metrics/vendor/modules.txt b/metrics/vendor/modules.txt index 89061c296c..00bd04484d 100644 --- a/metrics/vendor/modules.txt +++ b/metrics/vendor/modules.txt @@ -353,6 +353,9 @@ go.uber.org/zap/zapcore # golang.org/x/crypto v0.29.0 ## explicit; go 1.20 golang.org/x/crypto/pbkdf2 +# golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c +## explicit; go 1.22.0 +golang.org/x/exp/maps # golang.org/x/net v0.31.0 ## explicit; go 1.18 golang.org/x/net/context