diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 08ee59544a..b2f47aecc2 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -122,6 +122,12 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - configmaps/finalizers + verbs: + - update - apiGroups: - "" resources: diff --git a/controllers/mirroring/mirroring_controller.go b/controllers/mirroring/mirroring_controller.go new file mode 100644 index 0000000000..0da8829b76 --- /dev/null +++ b/controllers/mirroring/mirroring_controller.go @@ -0,0 +1,477 @@ +/* +Copyright 2020 Red Hat OpenShift Container Storage. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mirroring + +import ( + "context" + "fmt" + "slices" + + ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1" + ocsv1alpha1 "github.com/red-hat-storage/ocs-operator/api/v4/v1alpha1" + providerClient "github.com/red-hat-storage/ocs-operator/services/provider/api/v4/client" + "github.com/red-hat-storage/ocs-operator/v4/controllers/storageclusterpeer" + controllers "github.com/red-hat-storage/ocs-operator/v4/controllers/storageconsumer" + "github.com/red-hat-storage/ocs-operator/v4/controllers/util" + + "github.com/go-logr/logr" + rookCephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +const ( + // internalKey is a special key for client-mapping-config to establish mirroring between blockPools for internal mode + internalKey = "internal" + mirroringFinalizer = "mirroring.ocs.openshift.io" +) + +// MirroringReconciler reconciles a Mirroring fields for Ceph Object(s) +// nolint:revive +type MirroringReconciler struct { + client.Client + Scheme *runtime.Scheme + + log logr.Logger + ctx context.Context +} + +// SetupWithManager sets up the controller with the Manager. +func (r *MirroringReconciler) SetupWithManager(mgr ctrl.Manager) error { + + ctx := context.Background() + + if err := mgr.GetCache().IndexField( + ctx, + &ocsv1alpha1.StorageConsumer{}, + util.AnnotationIndexName, + util.AnnotationIndexFieldFunc, + ); err != nil { + return fmt.Errorf("unable to set up FieldIndexer on StorageConsumer for annotations: %v", err) + } + + if err := mgr.GetCache().IndexField( + ctx, + &ocsv1alpha1.StorageConsumer{}, + "clientID", + func(obj client.Object) []string { + if storageConsumer, ok := obj.(*ocsv1alpha1.StorageConsumer); ok { + return []string{storageConsumer.Status.Client.ID} + } + return nil + }, + ); err != nil { + return fmt.Errorf("unable to set up FieldIndexer on StorageConsumer for clientID: %v", err) + } + + // Reconcile the OperatorConfigMap object when the cluster's version object is updated + enqueueConfigMapRequest := handler.EnqueueRequestsFromMapFunc( + func(_ context.Context, obj client.Object) []reconcile.Request { + return []reconcile.Request{{ + NamespacedName: types.NamespacedName{ + Name: storageclusterpeer.StorageClientMappingConfigName, + Namespace: obj.GetNamespace(), + }, + }} + }, + ) + + return ctrl.NewControllerManagedBy(mgr). + For( + &corev1.ConfigMap{}, + builder.WithPredicates(util.NamePredicate(storageclusterpeer.StorageClientMappingConfigName)), + ). + Watches(&ocsv1.StorageClusterPeer{}, enqueueConfigMapRequest). + Watches(&ocsv1alpha1.StorageConsumer{}, enqueueConfigMapRequest). + Complete(r) +} + +//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers;storageconsumers,verbs=get;list;watch +//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch +//+kubebuilder:rbac:groups=core,resources=configmaps/finalizers,verbs=update +//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;delete +//+kubebuilder:rbac:groups=ceph.rook.io,resources=cephrbdmirrors,verbs=get;list;watch;create;update;delete + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +func (r *MirroringReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { + var err error + r.ctx = ctx + r.log = log.FromContext(ctx, "ConfigMap", request.NamespacedName) + r.log.Info("Starting reconcile") + + clientMappingConfig := &corev1.ConfigMap{} + clientMappingConfig.Name = request.Name + clientMappingConfig.Namespace = request.Namespace + + if err = r.get(clientMappingConfig); err != nil { + if k8serrors.IsNotFound(err) { + r.log.Info("ConfigMap %s not found. Ignoring since object must be deleted.") + return ctrl.Result{}, nil + } + r.log.Error(err, "Failed to get ConfigMap.") + return ctrl.Result{}, err + } + + if clientMappingConfig.GetDeletionTimestamp().IsZero() { + if len(clientMappingConfig.Data) < 1 { + return r.reconcileBlockPoolMirroring( + nil, + clientMappingConfig, + nil, + false, + ) + } + + if controllerutil.AddFinalizer(clientMappingConfig, mirroringFinalizer) { + r.log.Info("Finalizer not found for ConfigMap. Adding finalizer.") + if err := r.update(clientMappingConfig); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update ConfigMap: %v", err) + } + } + + return r.reconcilePhases(clientMappingConfig) + } + + //deletion phase + if res, err := r.reconcileBlockPoolMirroring( + nil, + clientMappingConfig, + nil, + false, + ); err != nil || !res.IsZero() { + return res, err + } + + if controllerutil.RemoveFinalizer(clientMappingConfig, mirroringFinalizer) { + r.log.Info("removing finalizer from ConfigMap.") + if err := r.update(clientMappingConfig); err != nil { + r.log.Info("Failed to remove finalizer from ConfigMap") + return ctrl.Result{}, fmt.Errorf("failed to remove finalizer from ConfigMap: %v", err) + } + } + return ctrl.Result{}, nil +} + +func (r *MirroringReconciler) reconcilePhases(clientMappingConfig *corev1.ConfigMap) (ctrl.Result, error) { + // Find the StorageClusterPeer from OwnerRef + owner := util.FindOwnerRefByKind(clientMappingConfig, "StorageClusterPeer") + if owner == nil { + return ctrl.Result{}, fmt.Errorf("failed to find StorgeClusterPeer owning the ClientMappingConfig") + } + + // Fetch the StorageClusterPeer instance + storageClusterPeer := &ocsv1.StorageClusterPeer{} + storageClusterPeer.Name = owner.Name + storageClusterPeer.Namespace = clientMappingConfig.Namespace + + if err := r.get(storageClusterPeer); err != nil { + if k8serrors.IsNotFound(err) { + r.log.Info("StorageClusterPeer resource not found. Ignoring since object must be deleted.") + return ctrl.Result{}, nil + } + r.log.Error(err, "Failed to get StorageClusterPeer.") + return ctrl.Result{}, err + } + + if storageClusterPeer.Status.State != ocsv1.StorageClusterPeerStatePeered { + return ctrl.Result{}, fmt.Errorf( + "waiting for StorageClusterPeer %s to be in Peered state", + storageClusterPeer.Name, + ) + } + + ocsClient, err := providerClient.NewProviderClient(r.ctx, storageClusterPeer.Spec.ApiEndpoint, util.OcsClientTimeout) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to create a new provider client: %v", err) + } + defer ocsClient.Close() + + if err = r.reconcileRbdMirror(clientMappingConfig); err != nil { + return ctrl.Result{}, err + } + + if res, err := r.reconcileBlockPoolMirroring( + ocsClient, + clientMappingConfig, + storageClusterPeer, + true, + ); err != nil || !res.IsZero() { + return res, err + } + + if res, err := r.reconcileRadosNamespaceMirroring( + ocsClient, + clientMappingConfig, + storageClusterPeer); err != nil || !res.IsZero() { + return res, err + } + + return ctrl.Result{}, nil +} + +func (r *MirroringReconciler) reconcileRbdMirror(clientMappingConfig *corev1.ConfigMap) error { + rbdMirror := &rookCephv1.CephRBDMirror{} + rbdMirror.Name = util.CephRBDMirrorName + rbdMirror.Namespace = clientMappingConfig.Namespace + + storageConsumers := &ocsv1alpha1.StorageConsumerList{} + if err := r.list( + storageConsumers, + client.MatchingFields{util.AnnotationIndexName: util.RequestMaintenanceModeAnnotation}, + ); err != nil { + return fmt.Errorf("failed to list StorageConsumer(s): %v", err) + } + + if len(storageConsumers.Items) > 1 { + if err := r.delete(rbdMirror); err != nil { + return fmt.Errorf("failed to delete CephRBDMirror: %v", err) + } + } + + _, err := ctrl.CreateOrUpdate(r.ctx, r.Client, rbdMirror, func() error { + if err := r.own(clientMappingConfig, rbdMirror); err != nil { + return err + } + rbdMirror.Spec.Count = 1 + return nil + }) + if err != nil { + r.log.Error(err, "Failed to create/update the CephRBDMirror", "CephRBDMirror", rbdMirror) + return err + } + + return nil +} + +func (r *MirroringReconciler) reconcileBlockPoolMirroring( + ocsClient *providerClient.OCSProviderClient, + clientMappingConfig *corev1.ConfigMap, + storageClusterPeer *ocsv1.StorageClusterPeer, + enableMirroring bool, +) (ctrl.Result, error) { + + selector := labels.NewSelector() + blockPoolLabelSelectorRequirement, err := labels.NewRequirement( + util.ForbidMirroringLabel, + selection.NotEquals, + []string{"true"}, + ) + if err != nil { + return ctrl.Result{}, err + } + selector = selector.Add(*blockPoolLabelSelectorRequirement) + + cephBlockPoolsList := &rookCephv1.CephBlockPoolList{} + if err = r.list( + cephBlockPoolsList, + client.InNamespace(clientMappingConfig.Namespace), + client.MatchingLabelsSelector{Selector: selector}, + ); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to list cephBlockPools: %v", err) + } + + var blockPoolsList []string + nameToCephBlockPool := map[string]*rookCephv1.CephBlockPool{} + + //enable mirroring for blockpools + for i := range cephBlockPoolsList.Items { + blockPoolsList = append(blockPoolsList, cephBlockPoolsList.Items[i].Name) + nameToCephBlockPool[cephBlockPoolsList.Items[i].Name] = &cephBlockPoolsList.Items[i] + + cephBlockPool := cephBlockPoolsList.Items[i] + if enableMirroring { + cephBlockPool.Spec.Mirroring.Enabled = true + cephBlockPool.Spec.Mirroring.Mode = "image" + } else { + cephBlockPool.Spec.Mirroring = rookCephv1.MirroringSpec{} + } + + if err = r.update(&cephBlockPool); err != nil { + return ctrl.Result{}, fmt.Errorf( + "failed to update mirroring for CephBlockPool %v: %w", + cephBlockPool.Name, + err, + ) + } + } + + if !enableMirroring { + return ctrl.Result{}, nil + } + + // fetch BlockPoolsInfo + response, err := ocsClient.GetBlockPoolsInfo(r.ctx, storageClusterPeer.Status.PeerInfo.StorageClusterUid, blockPoolsList) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to get CephBlockPool(s) info from Peer: %w", err) + } + + for i := range response.BlockPoolsInfo { + blockPoolName := response.BlockPoolsInfo[i].BlockPoolName + + mirroringToken := response.BlockPoolsInfo[i].MirroringToken + if mirroringToken == "" { + return ctrl.Result{}, fmt.Errorf("failed to fetch mirroring token for the blockPool") + } + + mirroringSecret := &corev1.Secret{} + mirroringSecret.Name = fmt.Sprintf("%s-%s", "peer", blockPoolName) + mirroringSecret.Namespace = clientMappingConfig.Namespace + var err error + + _, err = ctrl.CreateOrUpdate(r.ctx, r.Client, mirroringSecret, func() error { + if err = r.own(clientMappingConfig, mirroringSecret); err != nil { + return err + } + mirroringSecret.Data = map[string][]byte{ + "pool": []byte(blockPoolName), + "token": []byte(mirroringToken), + } + return nil + }) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to create/update bootstrap secret: %w", err) + } + + cephBlockPool := nameToCephBlockPool[blockPoolName] + + util.AddAnnotation( + cephBlockPool, + util.BlockPoolMirroringTargetIDAnnotation, + response.BlockPoolsInfo[i].BlockPoolID, + ) + + if cephBlockPool.Spec.Mirroring.Peers == nil { + cephBlockPool.Spec.Mirroring.Peers = &rookCephv1.MirroringPeerSpec{SecretNames: []string{}} + } + + if !slices.Contains(cephBlockPool.Spec.Mirroring.Peers.SecretNames, mirroringSecret.Name) { + cephBlockPool.Spec.Mirroring.Peers.SecretNames = append( + cephBlockPool.Spec.Mirroring.Peers.SecretNames, mirroringSecret.Name) + } + + err = r.update(cephBlockPool) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update bootstrap secret ref on CephBlockPool %q: %v", cephBlockPool.Name, err) + } + + } + + return ctrl.Result{}, nil +} + +func (r *MirroringReconciler) reconcileRadosNamespaceMirroring( + ocsClient *providerClient.OCSProviderClient, + clientMappingConfig *corev1.ConfigMap, + storageClusterPeer *ocsv1.StorageClusterPeer, +) (ctrl.Result, error) { + /* + Algorithm: + make a list of peerClientIDs + send GetStorageClientsInfo with this Info + make a map of peerClientIDtoRadosNamespace + list all radosNamespace and for each, + find out which consumer does it belong to + find the localClientID and use map to get radosnamespce + if radosnamespace is empty, continue + else enable mirroring + */ + + peerClientIDs := []string{} + nameToStorageConsumer := map[string]*ocsv1alpha1.StorageConsumer{} + for localClientID, peerClientID := range clientMappingConfig.Data { + // for internal mode, we need only blockPool mirroring, hence skipping this for the special key "internal" + if localClientID == internalKey { + continue + } + // Check if the storageConsumer with the ClientID exists + storageConsumers := &ocsv1alpha1.StorageConsumerList{} + if err := r.list(storageConsumers, client.MatchingFields{"clientID": localClientID}); err != nil { + return ctrl.Result{}, err + } + if len(storageConsumers.Items) != 1 { + return ctrl.Result{}, fmt.Errorf("expected 1 StorageConsumer but got %v", len(storageConsumers.Items)) + } + nameToStorageConsumer[storageConsumers.Items[0].Name] = &storageConsumers.Items[0] + peerClientIDs = append(peerClientIDs, peerClientID) + } + + response, err := ocsClient.GetStorageClientsInfo(r.ctx, storageClusterPeer.Status.PeerInfo.StorageClusterUid, peerClientIDs) + if err != nil { + return ctrl.Result{}, err + } + peerClientIDToRadosNamespace := map[string]string{} + for i := range response.ClientsInfo { + clientInfo := response.ClientsInfo[i] + peerClientIDToRadosNamespace[clientInfo.ClientID] = clientInfo.RadosNamespace + } + + radosNamespaceList := &rookCephv1.CephBlockPoolRadosNamespaceList{} + if err = r.list(radosNamespaceList, client.InNamespace(storageClusterPeer.Namespace)); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to list CephBlockPoolRadosNamespace: %w", err) + } + + for i := range radosNamespaceList.Items { + rns := &radosNamespaceList.Items[i] + consumer := nameToStorageConsumer[rns.GetLabels()[controllers.StorageConsumerNameLabel]] + if consumer == nil { + continue + } + clientID := consumer.Status.Client.ID + radosNamespace := peerClientIDToRadosNamespace[clientMappingConfig.Data[clientID]] + if radosNamespace == "" { + continue + } + rns.Spec.Mirroring = &rookCephv1.RadosNamespaceMirroring{ + RemoteNamespace: ptr.To(radosNamespace), + Mode: "image", + } + if err := r.update(rns); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update radosnamespace %s", rns.Name) + } + } + return ctrl.Result{}, nil +} + +func (r *MirroringReconciler) get(obj client.Object) error { + return r.Client.Get(r.ctx, client.ObjectKeyFromObject(obj), obj) +} + +func (r *MirroringReconciler) list(obj client.ObjectList, listOptions ...client.ListOption) error { + return r.Client.List(r.ctx, obj, listOptions...) +} + +func (r *MirroringReconciler) update(obj client.Object, opts ...client.UpdateOption) error { + return r.Client.Update(r.ctx, obj, opts...) +} + +func (r *MirroringReconciler) delete(obj client.Object, opts ...client.DeleteOption) error { + return r.Client.Delete(r.ctx, obj, opts...) +} + +func (r *MirroringReconciler) own(owner *corev1.ConfigMap, obj client.Object) error { + return controllerutil.SetControllerReference(owner, obj, r.Scheme) +} diff --git a/controllers/storagecluster/cephblockpools.go b/controllers/storagecluster/cephblockpools.go index 361ea33238..6cd0ea92ef 100644 --- a/controllers/storagecluster/cephblockpools.go +++ b/controllers/storagecluster/cephblockpools.go @@ -4,6 +4,7 @@ import ( "fmt" ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1" + "github.com/red-hat-storage/ocs-operator/v4/controllers/util" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -149,6 +150,7 @@ func (o *ocsCephBlockPools) reconcileMgrCephBlockPool(r *StorageClusterReconcile cephBlockPool.Spec.PoolSpec.EnableCrushUpdates = true cephBlockPool.Spec.PoolSpec.FailureDomain = getFailureDomain(storageCluster) cephBlockPool.Spec.PoolSpec.Replicated = generateCephReplicatedSpec(storageCluster, "metadata") + util.AddLabel(cephBlockPool, util.ForbidMirroringLabel, "true") return controllerutil.SetControllerReference(storageCluster, cephBlockPool, r.Scheme) }) @@ -197,6 +199,8 @@ func (o *ocsCephBlockPools) reconcileNFSCephBlockPool(r *StorageClusterReconcile cephBlockPool.Spec.PoolSpec.FailureDomain = getFailureDomain(storageCluster) cephBlockPool.Spec.PoolSpec.Replicated = generateCephReplicatedSpec(storageCluster, "data") cephBlockPool.Spec.PoolSpec.EnableRBDStats = true + util.AddLabel(cephBlockPool, util.ForbidMirroringLabel, "true") + return controllerutil.SetControllerReference(storageCluster, cephBlockPool, r.Scheme) }) if err != nil { diff --git a/controllers/storageclusterpeer/storageclusterpeer_controller.go b/controllers/storageclusterpeer/storageclusterpeer_controller.go index 2da99f7d31..32bcb7a040 100644 --- a/controllers/storageclusterpeer/storageclusterpeer_controller.go +++ b/controllers/storageclusterpeer/storageclusterpeer_controller.go @@ -21,9 +21,7 @@ import ( "encoding/base64" "encoding/json" "fmt" - "google.golang.org/grpc/codes" "strings" - "time" ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1" providerClient "github.com/red-hat-storage/ocs-operator/services/provider/api/v4/client" @@ -31,17 +29,24 @@ import ( "github.com/red-hat-storage/ocs-operator/v4/services" "github.com/go-logr/logr" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" ) +const ( + StorageClientMappingConfigName = "storage-client-mapping" +) + // StorageClusterPeerReconciler reconciles a StorageClusterPeer object // nolint:revive type StorageClusterPeerReconciler struct { @@ -56,7 +61,16 @@ type StorageClusterPeerReconciler struct { func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&ocsv1.StorageClusterPeer{}). - Watches(&ocsv1.StorageCluster{}, &handler.EnqueueRequestForObject{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + Watches( + &ocsv1.StorageCluster{}, + &handler.EnqueueRequestForObject{}, + builder.WithPredicates(predicate.GenerationChangedPredicate{}), + ). + Watches( + &corev1.ConfigMap{}, + &handler.EnqueueRequestForObject{}, + builder.WithPredicates(util.NamePredicate(StorageClientMappingConfigName)), + ). Complete(r) } @@ -64,6 +78,7 @@ func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error //+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/status,verbs=get;update;patch //+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/finalizers,verbs=update //+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusters,verbs=get;list;watch +// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;delete;patch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -88,6 +103,15 @@ func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, request ct } if storageClusterPeer.Status.State == ocsv1.StorageClusterPeerStatePeered { + clientConfigMap := &corev1.ConfigMap{} + clientConfigMap.Name = StorageClientMappingConfigName + clientConfigMap.Namespace = storageClusterPeer.Namespace + _, err = controllerutil.CreateOrUpdate(r.ctx, r.Client, clientConfigMap, func() error { + return r.own(storageClusterPeer, clientConfigMap) + }) + if err != nil { + return ctrl.Result{}, err + } return ctrl.Result{}, nil } @@ -160,7 +184,7 @@ func (r *StorageClusterPeerReconciler) reconcileStates(storageClusterPeer *ocsv1 storageClusterPeer.Status.PeerInfo = &ocsv1.PeerInfo{StorageClusterUid: string(ticketData.StorageCluster)} } - ocsClient, err := providerClient.NewProviderClient(r.ctx, storageClusterPeer.Spec.ApiEndpoint, time.Second*10) + ocsClient, err := providerClient.NewProviderClient(r.ctx, storageClusterPeer.Spec.ApiEndpoint, util.OcsClientTimeout) if err != nil { storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed return ctrl.Result{}, fmt.Errorf("failed to create a new provider client: %v", err) @@ -192,3 +216,7 @@ func (r *StorageClusterPeerReconciler) get(obj client.Object) error { key := client.ObjectKeyFromObject(obj) return r.Client.Get(r.ctx, key, obj) } + +func (r *StorageClusterPeerReconciler) own(storageClusterPeer *ocsv1.StorageClusterPeer, obj client.Object) error { + return controllerutil.SetControllerReference(storageClusterPeer, obj, r.Scheme) +} diff --git a/controllers/storagerequest/storagerequest_controller.go b/controllers/storagerequest/storagerequest_controller.go index c2c057ff3b..3ae21cd626 100644 --- a/controllers/storagerequest/storagerequest_controller.go +++ b/controllers/storagerequest/storagerequest_controller.go @@ -371,9 +371,7 @@ func (r *StorageRequestReconciler) reconcileRadosNamespace() error { // add a blockpool name in the label so UI can watch for the rados namespace // that belongs to the particular blockpool addLabel(r.cephRadosNamespace, blockPoolNameLabel, blockPoolName) - r.cephRadosNamespace.Spec = rookCephv1.CephBlockPoolRadosNamespaceSpec{ - BlockPoolName: blockPoolName, - } + r.cephRadosNamespace.Spec.BlockPoolName = blockPoolName return nil }) diff --git a/controllers/util/k8sutil.go b/controllers/util/k8sutil.go index 1acf5f649d..22d7ea471a 100644 --- a/controllers/util/k8sutil.go +++ b/controllers/util/k8sutil.go @@ -3,16 +3,18 @@ package util import ( "context" "fmt" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "os" "strings" + "time" ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1" "github.com/go-logr/logr" configv1 "github.com/openshift/api/config/v1" + "golang.org/x/exp/maps" corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -47,15 +49,19 @@ const ( EnableNFSKey = "ROOK_CSI_ENABLE_NFS" DisableCSIDriverKey = "ROOK_CSI_DISABLE_DRIVER" - // This is the name for the OwnerUID FieldIndex - OwnerUIDIndexName = "ownerUID" + // This is the name for the FieldIndex + OwnerUIDIndexName = "ownerUID" + AnnotationIndexName = "annotation" OdfInfoNamespacedNameClaimName = "odfinfo.odf.openshift.io" ExitCodeThatShouldRestartTheProcess = 42 + //ForbidMirroringLabel is used to forbid mirroring for ceph resources such as CephBlockPool + ForbidMirroringLabel = "ocs.openshift.io/forbid-mirroring" BlockPoolMirroringTargetIDAnnotation = "ocs.openshift.io/mirroring-target-id" RequestMaintenanceModeAnnotation = "ocs.openshift.io/request-maintenance-mode" CephRBDMirrorName = "cephrbdmirror" + OcsClientTimeout = 10 * time.Second ) var podNamespace = os.Getenv(PodNamespaceEnvVar) @@ -167,6 +173,10 @@ func OwnersIndexFieldFunc(obj client.Object) []string { return owners } +func AnnotationIndexFieldFunc(obj client.Object) []string { + return maps.Keys(obj.GetAnnotations()) +} + func GenerateNameForNonResilientCephBlockPoolSC(initData *ocsv1.StorageCluster) string { if initData.Spec.ManagedResources.CephNonResilientPools.StorageClassName != "" { return initData.Spec.ManagedResources.CephNonResilientPools.StorageClassName diff --git a/deploy/csv-templates/ocs-operator.csv.yaml.in b/deploy/csv-templates/ocs-operator.csv.yaml.in index ffa68ff072..71dde54fe0 100644 --- a/deploy/csv-templates/ocs-operator.csv.yaml.in +++ b/deploy/csv-templates/ocs-operator.csv.yaml.in @@ -293,6 +293,12 @@ spec: - patch - update - watch + - apiGroups: + - "" + resources: + - configmaps/finalizers + verbs: + - update - apiGroups: - "" resources: diff --git a/deploy/ocs-operator/manifests/ocs-operator.clusterserviceversion.yaml b/deploy/ocs-operator/manifests/ocs-operator.clusterserviceversion.yaml index 30a1d7cab1..438e2abda6 100644 --- a/deploy/ocs-operator/manifests/ocs-operator.clusterserviceversion.yaml +++ b/deploy/ocs-operator/manifests/ocs-operator.clusterserviceversion.yaml @@ -302,6 +302,12 @@ spec: - patch - update - watch + - apiGroups: + - "" + resources: + - configmaps/finalizers + verbs: + - update - apiGroups: - "" resources: diff --git a/go.mod b/go.mod index ec70120514..de1d1ef4e8 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( github.com/rook/rook/pkg/apis v0.0.0-20241119201302-fc456553b3cc github.com/stretchr/testify v1.9.0 go.uber.org/multierr v1.11.0 + golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c golang.org/x/net v0.31.0 google.golang.org/grpc v1.68.0 gopkg.in/ini.v1 v1.67.0 @@ -122,7 +123,6 @@ require ( go.mongodb.org/mongo-driver v1.16.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.29.0 // indirect - golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect golang.org/x/oauth2 v0.23.0 // indirect golang.org/x/sys v0.27.0 // indirect golang.org/x/term v0.26.0 // indirect diff --git a/main.go b/main.go index 37ea4d8d9e..19ff7324e6 100644 --- a/main.go +++ b/main.go @@ -58,6 +58,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" metrics "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "github.com/red-hat-storage/ocs-operator/v4/controllers/mirroring" "github.com/red-hat-storage/ocs-operator/v4/controllers/ocsinitialization" "github.com/red-hat-storage/ocs-operator/v4/controllers/platform" "github.com/red-hat-storage/ocs-operator/v4/controllers/storagecluster" @@ -234,6 +235,13 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "StorageClusterPeer") os.Exit(1) } + if err = (&mirroring.MirroringReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Mirroring") + os.Exit(1) + } // +kubebuilder:scaffold:builder // Create OCSInitialization CR if it's not present diff --git a/metrics/go.mod b/metrics/go.mod index 3890af5560..be8e160183 100644 --- a/metrics/go.mod +++ b/metrics/go.mod @@ -103,6 +103,7 @@ require ( github.com/x448/float16 v0.8.4 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.29.0 // indirect + golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect golang.org/x/oauth2 v0.23.0 // indirect golang.org/x/sys v0.27.0 // indirect golang.org/x/term v0.26.0 // indirect diff --git a/metrics/vendor/github.com/red-hat-storage/ocs-operator/v4/controllers/util/k8sutil.go b/metrics/vendor/github.com/red-hat-storage/ocs-operator/v4/controllers/util/k8sutil.go index 1acf5f649d..22d7ea471a 100644 --- a/metrics/vendor/github.com/red-hat-storage/ocs-operator/v4/controllers/util/k8sutil.go +++ b/metrics/vendor/github.com/red-hat-storage/ocs-operator/v4/controllers/util/k8sutil.go @@ -3,16 +3,18 @@ package util import ( "context" "fmt" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "os" "strings" + "time" ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1" "github.com/go-logr/logr" configv1 "github.com/openshift/api/config/v1" + "golang.org/x/exp/maps" corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -47,15 +49,19 @@ const ( EnableNFSKey = "ROOK_CSI_ENABLE_NFS" DisableCSIDriverKey = "ROOK_CSI_DISABLE_DRIVER" - // This is the name for the OwnerUID FieldIndex - OwnerUIDIndexName = "ownerUID" + // This is the name for the FieldIndex + OwnerUIDIndexName = "ownerUID" + AnnotationIndexName = "annotation" OdfInfoNamespacedNameClaimName = "odfinfo.odf.openshift.io" ExitCodeThatShouldRestartTheProcess = 42 + //ForbidMirroringLabel is used to forbid mirroring for ceph resources such as CephBlockPool + ForbidMirroringLabel = "ocs.openshift.io/forbid-mirroring" BlockPoolMirroringTargetIDAnnotation = "ocs.openshift.io/mirroring-target-id" RequestMaintenanceModeAnnotation = "ocs.openshift.io/request-maintenance-mode" CephRBDMirrorName = "cephrbdmirror" + OcsClientTimeout = 10 * time.Second ) var podNamespace = os.Getenv(PodNamespaceEnvVar) @@ -167,6 +173,10 @@ func OwnersIndexFieldFunc(obj client.Object) []string { return owners } +func AnnotationIndexFieldFunc(obj client.Object) []string { + return maps.Keys(obj.GetAnnotations()) +} + func GenerateNameForNonResilientCephBlockPoolSC(initData *ocsv1.StorageCluster) string { if initData.Spec.ManagedResources.CephNonResilientPools.StorageClassName != "" { return initData.Spec.ManagedResources.CephNonResilientPools.StorageClassName diff --git a/metrics/vendor/golang.org/x/exp/LICENSE b/metrics/vendor/golang.org/x/exp/LICENSE new file mode 100644 index 0000000000..2a7cf70da6 --- /dev/null +++ b/metrics/vendor/golang.org/x/exp/LICENSE @@ -0,0 +1,27 @@ +Copyright 2009 The Go Authors. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google LLC nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/metrics/vendor/golang.org/x/exp/PATENTS b/metrics/vendor/golang.org/x/exp/PATENTS new file mode 100644 index 0000000000..733099041f --- /dev/null +++ b/metrics/vendor/golang.org/x/exp/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/metrics/vendor/golang.org/x/exp/maps/maps.go b/metrics/vendor/golang.org/x/exp/maps/maps.go new file mode 100644 index 0000000000..ecc0dabb74 --- /dev/null +++ b/metrics/vendor/golang.org/x/exp/maps/maps.go @@ -0,0 +1,94 @@ +// Copyright 2021 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package maps defines various functions useful with maps of any type. +package maps + +// Keys returns the keys of the map m. +// The keys will be in an indeterminate order. +func Keys[M ~map[K]V, K comparable, V any](m M) []K { + r := make([]K, 0, len(m)) + for k := range m { + r = append(r, k) + } + return r +} + +// Values returns the values of the map m. +// The values will be in an indeterminate order. +func Values[M ~map[K]V, K comparable, V any](m M) []V { + r := make([]V, 0, len(m)) + for _, v := range m { + r = append(r, v) + } + return r +} + +// Equal reports whether two maps contain the same key/value pairs. +// Values are compared using ==. +func Equal[M1, M2 ~map[K]V, K, V comparable](m1 M1, m2 M2) bool { + if len(m1) != len(m2) { + return false + } + for k, v1 := range m1 { + if v2, ok := m2[k]; !ok || v1 != v2 { + return false + } + } + return true +} + +// EqualFunc is like Equal, but compares values using eq. +// Keys are still compared with ==. +func EqualFunc[M1 ~map[K]V1, M2 ~map[K]V2, K comparable, V1, V2 any](m1 M1, m2 M2, eq func(V1, V2) bool) bool { + if len(m1) != len(m2) { + return false + } + for k, v1 := range m1 { + if v2, ok := m2[k]; !ok || !eq(v1, v2) { + return false + } + } + return true +} + +// Clear removes all entries from m, leaving it empty. +func Clear[M ~map[K]V, K comparable, V any](m M) { + for k := range m { + delete(m, k) + } +} + +// Clone returns a copy of m. This is a shallow clone: +// the new keys and values are set using ordinary assignment. +func Clone[M ~map[K]V, K comparable, V any](m M) M { + // Preserve nil in case it matters. + if m == nil { + return nil + } + r := make(M, len(m)) + for k, v := range m { + r[k] = v + } + return r +} + +// Copy copies all key/value pairs in src adding them to dst. +// When a key in src is already present in dst, +// the value in dst will be overwritten by the value associated +// with the key in src. +func Copy[M1 ~map[K]V, M2 ~map[K]V, K comparable, V any](dst M1, src M2) { + for k, v := range src { + dst[k] = v + } +} + +// DeleteFunc deletes any key/value pairs from m for which del returns true. +func DeleteFunc[M ~map[K]V, K comparable, V any](m M, del func(K, V) bool) { + for k, v := range m { + if del(k, v) { + delete(m, k) + } + } +} diff --git a/metrics/vendor/modules.txt b/metrics/vendor/modules.txt index 89061c296c..00bd04484d 100644 --- a/metrics/vendor/modules.txt +++ b/metrics/vendor/modules.txt @@ -353,6 +353,9 @@ go.uber.org/zap/zapcore # golang.org/x/crypto v0.29.0 ## explicit; go 1.20 golang.org/x/crypto/pbkdf2 +# golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c +## explicit; go 1.22.0 +golang.org/x/exp/maps # golang.org/x/net v0.31.0 ## explicit; go 1.18 golang.org/x/net/context