Skip to content

Commit

Permalink
add logic for mirroring controller
Browse files Browse the repository at this point in the history
Signed-off-by: Rewant Soni <[email protected]>
  • Loading branch information
rewantsoni committed Nov 6, 2024
1 parent c2f49f9 commit 9220d80
Show file tree
Hide file tree
Showing 7 changed files with 426 additions and 4 deletions.
386 changes: 386 additions & 0 deletions controllers/mirroring/mirroring_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,386 @@
/*
Copyright 2020 Red Hat OpenShift Container Storage.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mirroring

import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"slices"
"time"

ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
ocsv1alpha1 "github.com/red-hat-storage/ocs-operator/api/v4/v1alpha1"
providerClient "github.com/red-hat-storage/ocs-operator/services/provider/api/v4/client"
"github.com/red-hat-storage/ocs-operator/v4/controllers/storageclusterpeer"
controllers "github.com/red-hat-storage/ocs-operator/v4/controllers/storageconsumer"
"github.com/red-hat-storage/ocs-operator/v4/controllers/util"

"github.com/go-logr/logr"
rookCephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
)

const (
rBDMirrorName = "rbd-mirror"
// internalKey is a special key for client-mapping-config to establish mirroring between blockpools for internal mode
internalKey = "internal"

mirroringFinalizer = "mirroring.ocs.openshift.io"
)

// MirroringReconciler reconciles a Mirroring fields for Ceph Object(s)
// nolint:revive
type MirroringReconciler struct {
client.Client
Scheme *runtime.Scheme

log logr.Logger
ctx context.Context
clientMappingConfig *corev1.ConfigMap
storageClusterPeer *ocsv1.StorageClusterPeer
mapStorageConsumerNameToObj map[string]*ocsv1alpha1.StorageConsumer
}

// SetupWithManager sets up the controller with the Manager.
func (r *MirroringReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.ConfigMap{}, builder.WithPredicates(util.PrefixNamePredicate(storageclusterpeer.ClientMappingConfigPrefix))).
Watches(&ocsv1.StorageClusterPeer{}, &handler.EnqueueRequestForObject{}).
Watches(&ocsv1alpha1.StorageConsumer{}, &handler.EnqueueRequestForObject{}).
Complete(r)
}

//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers;storageconsumers,verbs=get;list;watch
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch
//+kubebuilder:rbac:groups=core,resources=configmaps/finalizers,verbs=update
//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;delete

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *MirroringReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
var err error
r.ctx = ctx
r.log = log.FromContext(ctx, "Mirroring Controller", request)
r.log.Info("Reconciling Mirroring Controller.")

r.clientMappingConfig = &corev1.ConfigMap{}
r.clientMappingConfig.Name = request.Name
r.clientMappingConfig.Namespace = request.Namespace

if err = r.get(r.clientMappingConfig); err != nil {
if k8serrors.IsNotFound(err) {
r.log.Info(fmt.Sprintf("ConfigMap %s not found. Ignoring since object must be deleted.", r.clientMappingConfig.Name))
return ctrl.Result{}, nil
}
r.log.Error(err, "Failed to get ConfigMap.", "ConfigMap", r.clientMappingConfig.Name)
return ctrl.Result{}, err
}

if len(r.clientMappingConfig.Data) < 1 {
return ctrl.Result{}, nil
}

// Find the StorageClusterPeer from OwnerRef
var storageClusterPeerName string
for _, ownerRef := range r.clientMappingConfig.OwnerReferences {
if ownerRef.Kind == "StorageClusterPeer" {
storageClusterPeerName = ownerRef.Name
break
}
}
if storageClusterPeerName == "" {
return ctrl.Result{}, fmt.Errorf("failed to find storageClusterPeer attached to client mapping")
}

// Fetch the StorageClusterPeer instance
r.storageClusterPeer = &ocsv1.StorageClusterPeer{}
r.storageClusterPeer.Name = storageClusterPeerName
r.storageClusterPeer.Namespace = request.Namespace

if err = r.get(r.storageClusterPeer); err != nil {
if k8serrors.IsNotFound(err) {
r.log.Info("StorageClusterPeer resource not found. Ignoring since object must be deleted.")
return ctrl.Result{}, nil
}
r.log.Error(err, "Failed to get StorageClusterPeer.")
return ctrl.Result{}, err
}

storageConsumerList := &ocsv1alpha1.StorageConsumerList{}
err = r.list(storageConsumerList, client.InNamespace(r.storageClusterPeer.Namespace))
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to list StorageConsumer objects: %w", err)
}
for i := range storageConsumerList.Items {
r.mapStorageConsumerNameToObj[storageConsumerList.Items[i].Name] = &storageConsumerList.Items[i]
}

return r.reconcilePhases()
}

func (r *MirroringReconciler) reconcilePhases() (ctrl.Result, error) {
if r.storageClusterPeer.Status.State != ocsv1.StorageClusterPeerStatePeered {
return ctrl.Result{}, fmt.Errorf("waiting for StorageClusterPeer to be in Peered state")
}

ocsClient, err := r.newExternalClusterClient()
if err != nil {
return ctrl.Result{}, err
}
defer ocsClient.Close()

// marked for deletion
if !r.clientMappingConfig.GetDeletionTimestamp().IsZero() {
if res, err := r.disableBlockPoolMirroring(); err != nil || !res.IsZero() {
return res, err
}

if controllerutil.RemoveFinalizer(r.storageClusterPeer, mirroringFinalizer) {
r.log.Info("removing finalizer from ClientMappingConfig.", "ClientMappingConfig", r.clientMappingConfig.Name)
if err := r.update(r.storageClusterPeer); err != nil {
r.log.Info("Failed to remove finalizer from ClientMappingConfig", "ClientMappingConfig", r.clientMappingConfig.Name)
return ctrl.Result{}, fmt.Errorf("failed to remove finalizer from ClientMappingConfig: %v", err)
}
}
}

if controllerutil.AddFinalizer(r.storageClusterPeer, mirroringFinalizer) {
r.log.Info("Finalizer not found for ClientMappingConfig. Adding finalizer.", "ClientMappingConfig", r.clientMappingConfig.Name)
if err := r.update(r.clientMappingConfig); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to update ClientMappingConfig: %v", err)
}
}

err = r.reconcileRBDMirrorDaemon()
if err != nil {
return ctrl.Result{}, err
}

if res, err := r.reconcileBlockPoolMirroring(ocsClient); err != nil || !res.IsZero() {
return res, err
}

var peerClientIDs []string

for localClientID, peerClientID := range r.clientMappingConfig.Data {
if localClientID == internalKey {
continue
}
if r.mapStorageConsumerNameToObj[localClientID] == nil {
return ctrl.Result{}, fmt.Errorf("failed to find StorageConsumer %s", localClientID)
}
peerClientIDs = append(peerClientIDs, peerClientID)
}

response, err := ocsClient.GetClientsInfo(r.ctx, r.storageClusterPeer.Status.PeerInfo.StorageClusterUid, peerClientIDs)
if err != nil {
return ctrl.Result{}, err
}

mapPeerClientIDToRadosNamespace := map[string]string{}
for i := range response.ClientsInfo {
mapPeerClientIDToRadosNamespace[response.ClientsInfo[i].ClientID] = response.ClientsInfo[i].RadosNamespace
}

for localClientID, peerClientID := range r.clientMappingConfig.Data {
if localClientID == internalKey {
continue
}
radosNamespaceList := &rookCephv1.CephBlockPoolRadosNamespaceList{}
err = r.list(
radosNamespaceList,
client.InNamespace(r.storageClusterPeer.Namespace),
client.MatchingLabels{controllers.StorageConsumerNameLabel: r.mapStorageConsumerNameToObj[localClientID].Name},
)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to list radosnamespace(s) for StorageConsumer %s", localClientID)
}
for i := range radosNamespaceList.Items {
rns := &radosNamespaceList.Items[i]
rns.Spec.Mirroring = &rookCephv1.RadosNamespaceMirroring{
RemoteNamespace: ptr.To(mapPeerClientIDToRadosNamespace[peerClientID]),
Mode: "image",
}
err := r.update(rns)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to update radosnamespace %s", rns.Name)
}
}
}
return ctrl.Result{}, nil
}

func (r *MirroringReconciler) newExternalClusterClient() (*providerClient.OCSProviderClient, error) {
ocsProviderClient, err := providerClient.NewProviderClient(
r.ctx, r.storageClusterPeer.Spec.ApiEndpoint, time.Second*10)
if err != nil {
return nil, fmt.Errorf("failed to create a new provider client: %v", err)
}
return ocsProviderClient, nil
}

func (r *MirroringReconciler) disableBlockPoolMirroring() (ctrl.Result, error) {

cephBlockPoolsList, err := r.listCephBlockPools()
if err != nil {
return ctrl.Result{}, err
}

for i := range cephBlockPoolsList.Items {
cephBlockPool := &cephBlockPoolsList.Items[i]

cephBlockPool.Spec.Mirroring.Enabled = true
cephBlockPool.Spec.Mirroring.Mode = "image"
cephBlockPool.Spec.Mirroring.Peers.SecretNames = []string{}

err := r.update(cephBlockPool)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to disable mirroring for CephBlockPool %q: %v", cephBlockPool.Name, err)
}
}
return ctrl.Result{}, nil
}

func (r *MirroringReconciler) reconcileRBDMirrorDaemon() error {
rbdMirror := &rookCephv1.CephRBDMirror{}
rbdMirror.Name = rBDMirrorName
rbdMirror.Namespace = r.storageClusterPeer.Namespace

_, err := ctrl.CreateOrUpdate(r.ctx, r.Client, rbdMirror, func() error {
if err := r.own(rbdMirror); err != nil {
return err
}
rbdMirror.Spec.Count = 1
return nil
})
if err != nil {
r.log.Error(err, "Failed to create/update the CephRBDMirror", "CephRBDMirror", rbdMirror)
return err
}

return nil
}

func (r *MirroringReconciler) reconcileBlockPoolMirroring(ocsClient *providerClient.OCSProviderClient) (ctrl.Result, error) {

cephBlockPoolsList, err := r.listCephBlockPools()
if err != nil {
return ctrl.Result{}, err
}

var blockPoolsList []string
blockPoolNameMap := map[string]*rookCephv1.CephBlockPool{}
//enable mirroring for blockpools
for i := range cephBlockPoolsList.Items {
blockPoolsList = append(blockPoolsList, cephBlockPoolsList.Items[i].Name)
blockPoolNameMap[cephBlockPoolsList.Items[i].Name] = &cephBlockPoolsList.Items[i]
cephBlockPool := cephBlockPoolsList.Items[i]
cephBlockPool.Spec.Mirroring.Enabled = true
cephBlockPool.Spec.Mirroring.Mode = "image"
err := r.update(&cephBlockPool)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to enable mirroring for CephBlockPool %v: %w", cephBlockPool.Name, err)
}
}

// fetch BlockPoolsInfo
response, err := ocsClient.GetBlockPoolsInfo(r.ctx, r.storageClusterPeer.Status.PeerInfo.StorageClusterUid, blockPoolsList)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get CephBlockPool(s) info from Peer: %w", err)
}

for i := range response.BlockPoolsInfo {
blockPoolName := response.BlockPoolsInfo[i].BlockPoolName

bootstrapSecret := &corev1.Secret{}
bootstrapSecret.Name = fmt.Sprintf("%s-%s", "peer", blockPoolName)
bootstrapSecret.Namespace = r.storageClusterPeer.Namespace

_, err = ctrl.CreateOrUpdate(r.ctx, r.Client, bootstrapSecret, func() error {
if err = r.own(bootstrapSecret); err != nil {
return err
}
bootstrapSecret.Data = map[string][]byte{
"pool": []byte(blockPoolName),
"token": []byte(response.BlockPoolsInfo[i].MirroringToken),
}
return nil
})
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to create/update bootstrap secret: %w", err)
}

cephBlockPool := blockPoolNameMap[blockPoolName]
if cephBlockPool.Spec.Mirroring.Peers == nil {
cephBlockPool.Spec.Mirroring.Peers = &rookCephv1.MirroringPeerSpec{SecretNames: []string{}}
}
if !slices.Contains(cephBlockPool.Spec.Mirroring.Peers.SecretNames, bootstrapSecret.Name) {
cephBlockPool.Spec.Mirroring.Peers.SecretNames = append(cephBlockPool.Spec.Mirroring.Peers.SecretNames, bootstrapSecret.Name)
}
err := r.update(cephBlockPool)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to update bootstrap secret ref on CephBlockPool %q: %v", cephBlockPool.Name, err)
}
}

return ctrl.Result{}, nil
}

func (r *MirroringReconciler) get(obj client.Object) error {
key := client.ObjectKeyFromObject(obj)
return r.Client.Get(r.ctx, key, obj)
}

func (r *MirroringReconciler) list(obj client.ObjectList, listOptions ...client.ListOption) error {
return r.Client.List(r.ctx, obj, listOptions...)
}

func (r *MirroringReconciler) update(obj client.Object, opts ...client.UpdateOption) error {
return r.Client.Update(r.ctx, obj, opts...)
}

func (r *MirroringReconciler) own(obj client.Object) error {
return controllerutil.SetControllerReference(r.storageClusterPeer, obj, r.Scheme)
}

func (r *MirroringReconciler) listCephBlockPools() (*rookCephv1.CephBlockPoolList, error) {

selector := labels.NewSelector()
blackListRequirement, err := labels.NewRequirement(util.CephBlockPoolForbidMirroringLabel, selection.NotEquals, []string{"true"})
if err != nil {
return nil, err
}
selector = selector.Add(*blackListRequirement)

cephBlockPoolsList := &rookCephv1.CephBlockPoolList{}
err = r.list(cephBlockPoolsList, client.InNamespace(r.storageClusterPeer.Namespace), client.MatchingLabelsSelector{Selector: selector})
if err != nil {
return nil, fmt.Errorf("failed to list CephBlockPools: %w", err)
}
return cephBlockPoolsList, nil
}
Loading

0 comments on commit 9220d80

Please sign in to comment.