feat: Enhance MirrorPeer and DRPolicy controllers for StorageClientType handling

- Updated `MirrorPeerReconciler` to handle `StorageClientType` by adding conditions in `Reconcile` method.
- Introduced `createStorageClusterPeer` function for creating `StorageClusterPeer` objects.
- Added utility functions `fetchClientInfoConfigMap` and `getClientInfoFromConfigMap` for handling client info.
- Modified `processManagedClusterAddon` to utilize new config structure.
- Enhanced `DRPolicyReconciler` to skip certain operations based on `StorageClientType`.
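
For reviewers, a minimal, self-contained sketch of the client-info lookup pattern the new `getKey`/`getClientInfoFromConfigMap` helpers implement. The cluster/client names, the endpoint value, and the ClientInfo JSON field names below are illustrative assumptions inferred from the fields the new code dereferences, not values taken from this commit.

package main

import (
	"encoding/json"
	"fmt"
)

// Assumed shape of the ClientInfo payload stored in the client-info ConfigMap;
// only the fields this commit dereferences are sketched here.
type ProviderInfo struct {
	ProviderManagedClusterName string `json:"providerManagedClusterName"`
	StorageProviderEndpoint    string `json:"storageProviderEndpoint"`
}

type ClientInfo struct {
	ProviderInfo ProviderInfo `json:"providerInfo"`
}

func main() {
	// ConfigMap data is keyed by "<clusterName>/<clientName>", matching ClientConfigMapKeyTemplate ("%s/%s").
	data := map[string]string{
		"cluster-east/storage-client-east": `{"providerInfo":{"providerManagedClusterName":"provider-east","storageProviderEndpoint":"10.0.0.1:50051"}}`,
	}
	key := fmt.Sprintf("%s/%s", "cluster-east", "storage-client-east")
	var ci ClientInfo
	if err := json.Unmarshal([]byte(data[key]), &ci); err != nil {
		panic(err)
	}
	fmt.Println(ci.ProviderInfo.ProviderManagedClusterName, ci.ProviderInfo.StorageProviderEndpoint)
}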

Signed-off-by: vbadrina <[email protected]>
vbnrh committed Jul 22, 2024
1 parent 7cfca25 commit adecfb9
Showing 12 changed files with 273 additions and 18 deletions.
2 changes: 1 addition & 1 deletion addons/agent_mirrorpeer_controller.go
@@ -123,7 +123,7 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

if mirrorPeer.Spec.Type == multiclusterv1alpha1.Async {
if mirrorPeer.Spec.Type == multiclusterv1alpha1.Async && !utils.IsStorageClientType(mirrorPeer.Spec.Items) {
clusterFSIDs := make(map[string]string)
logger.Info("Fetching clusterFSIDs")
err = r.fetchClusterFSIDs(ctx, &mirrorPeer, clusterFSIDs)
4 changes: 3 additions & 1 deletion api/v1alpha1/mirrorpeer_types.go
@@ -35,7 +35,9 @@ const (

// StorageClusterRef holds a reference to a StorageCluster
type StorageClusterRef struct {
Name string `json:"name"`
Name string `json:"name"`

// +kubebuilder:validation:Optional
Namespace string `json:"namespace"`
}

@@ -60,7 +60,6 @@ spec:
type: string
required:
- name
- namespace
type: object
required:
- clusterName
@@ -36,7 +36,7 @@ metadata:
]
capabilities: Basic Install
console.openshift.io/plugins: '["odf-multicluster-console"]'
createdAt: "2024-07-16T05:37:17Z"
createdAt: "2024-07-17T07:39:20Z"
olm.skipRange: ""
operators.openshift.io/infrastructure-features: '["disconnected"]'
operators.operatorframework.io/builder: operator-sdk-v1.34.1
@@ -60,7 +60,6 @@ spec:
type: string
required:
- name
- namespace
type: object
required:
- clusterName
5 changes: 5 additions & 0 deletions controllers/drpolicy_controller.go
@@ -113,6 +113,11 @@ func (r *DRPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, err
}

if utils.IsStorageClientType(mirrorPeer.Spec.Items) {
logger.Info("MirrorPeer contains StorageClient reference. Skipping creation of VolumeReplicationClasses", "MirrorPeer", mirrorPeer.Name)
return ctrl.Result{}, nil
}

if mirrorPeer.Spec.Type == multiclusterv1alpha1.Async {
clusterFSIDs := make(map[string]string)
logger.Info("Fetching cluster FSIDs")
200 changes: 189 additions & 11 deletions controllers/mirrorpeer_controller.go
@@ -18,12 +18,15 @@ package controllers

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"

"github.com/red-hat-storage/odf-multicluster-orchestrator/addons/setup"

ramenv1alpha1 "github.com/ramendr/ramen/api/v1alpha1"
ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
addons "github.com/red-hat-storage/odf-multicluster-orchestrator/addons"
multiclusterv1alpha1 "github.com/red-hat-storage/odf-multicluster-orchestrator/api/v1alpha1"
"github.com/red-hat-storage/odf-multicluster-orchestrator/controllers/utils"
@@ -53,8 +56,11 @@ type MirrorPeerReconciler struct {
Logger *slog.Logger
}

const mirrorPeerFinalizer = "hub.multicluster.odf.openshift.io"
const spokeClusterRoleBindingName = "spoke-clusterrole-bindings"
const (
mirrorPeerFinalizer = "hub.multicluster.odf.openshift.io"
spokeClusterRoleBindingName = "spoke-clusterrole-bindings"
ClientConfigMapKeyTemplate = "%s/%s"
)

//+kubebuilder:rbac:groups=multicluster.odf.openshift.io,resources=mirrorpeers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=multicluster.odf.openshift.io,resources=mirrorpeers/status,verbs=get;update;patch
@@ -241,45 +247,217 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

if utils.IsStorageClientType(mirrorPeer.Spec.Items) {
result, err := createStorageClusterPeer(ctx, r.Client, logger, mirrorPeer)
if err != nil {
logger.Error("Failed to create StorageClusterPeer", "error", err)
return result, err
}
}
return r.updateMirrorPeerStatus(ctx, mirrorPeer)
}

func getKey(clusterName, clientName string) string {
return fmt.Sprintf(ClientConfigMapKeyTemplate, clusterName, clientName)
}

func createStorageClusterPeer(ctx context.Context, client client.Client, logger *slog.Logger, mirrorPeer multiclusterv1alpha1.MirrorPeer) (ctrl.Result, error) {
logger = logger.With("MirrorPeer", mirrorPeer.Name)
clientInfoMap, err := fetchClientInfoConfigMap(ctx, client)
if err != nil {
if k8serrors.IsNotFound(err) {
logger.Info("Client info ConfigMap not found. Requeuing the request")
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
}
items := mirrorPeer.Spec.Items
clientInfo := make([]ClientInfo, 0)

for _, item := range items {
ci, err := getClientInfoFromConfigMap(clientInfoMap.Data, getKey(item.ClusterName, item.StorageClusterRef.Name))
if err != nil {
return ctrl.Result{}, err
}
clientInfo = append(clientInfo, ci)
}

for i := range items {
var storageClusterPeerName string
var oppositeClient ClientInfo
currentClient := clientInfo[i]
// Provider A's StorageClusterPeer carries Provider B's endpoint and onboarding ticket, and vice versa
if i == 0 {
oppositeClient = clientInfo[1]
storageClusterPeerName = getStorageClusterPeerName(oppositeClient.ProviderInfo.ProviderManagedClusterName)
} else {
oppositeClient = clientInfo[0]
storageClusterPeerName = getStorageClusterPeerName(oppositeClient.ProviderInfo.ProviderManagedClusterName)
}

storageClusterPeer := ocsv1.StorageClusterPeer{
ObjectMeta: metav1.ObjectMeta{
Name: storageClusterPeerName,
// The Provider A namespace in which its storage resources exist
Namespace: currentClient.ProviderInfo.NamespacedName.Namespace,
},
Spec: ocsv1.StorageClusterPeerSpec{
// Endpoint for StorageClusterPeer in Provider A will be the Provider B's endpoint and vice versa
APIServerEndpoint: oppositeClient.ProviderInfo.StorageProviderEndpoint,
// The same cross-wiring applies to the onboarding ticket
OnboardingTicket: fetchOnboardingTicket(oppositeClient),
},
}
storageClusterPeerJson, err := json.Marshal(storageClusterPeer)
if err != nil {
logger.Error("Failed to marshal StorageClusterPeer to JSON", "StorageClusterPeer", storageClusterPeerName)
return ctrl.Result{}, err
}

ownerRef := metav1.OwnerReference{
APIVersion: mirrorPeer.APIVersion,
Kind: mirrorPeer.Kind,
Name: mirrorPeer.Name,
UID: mirrorPeer.UID,
}

// The ManifestWork created for Provider A is named storageclusterpeer-{ProviderA}, since that is the cluster where the manifests are applied
// Provider names are unique, so there is only one ManifestWork per provider cluster
manifestWorkName := fmt.Sprintf("storageclusterpeer-%s", currentClient.ProviderInfo.ProviderManagedClusterName)

// The namespace of Provider A is where this ManifestWork will be created on the hub
namespace := currentClient.ProviderInfo.ProviderManagedClusterName

operationResult, err := utils.CreateOrUpdateManifestWork(ctx, client, manifestWorkName, namespace, storageClusterPeerJson, ownerRef)
if err != nil {
return ctrl.Result{}, err
}

logger.Info(fmt.Sprintf("ManifestWork was %s for StorageClusterPeer %s", operationResult, storageClusterPeerName))
}

return ctrl.Result{}, nil
}

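// NOTE: fetchOnboardingTicket is still a stub in this commit; it panics until onboarding-ticket retrieval is implemented.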
func fetchOnboardingTicket(clientInfo ClientInfo) string {
panic("unimplemented")
}

func getStorageClusterPeerName(providerClusterName string) string {
// Provider A's StorageClusterPeer is named {ProviderB}-peer
return fmt.Sprintf("%s-peer", providerClusterName)
}

func fetchClientInfoConfigMap(ctx context.Context, c client.Client) (*corev1.ConfigMap, error) {
currentNamespace := os.Getenv("POD_NAMESPACE")
if currentNamespace == "" {
return nil, fmt.Errorf("cannot detect the current namespace")
}
clientInfoMap, err := utils.FetchConfigMap(ctx, c, ClientInfoConfigMapName, currentNamespace)
if err != nil {
return nil, err
}
return clientInfoMap, nil
}

type ManagedClusterAddonConfig struct {
// Namespace on the managed cluster where the addon will be installed
InstallNamespace string

// Name of the ManagedClusterAddOn
Name string

// Namespace on the hub where the ManagedClusterAddOn is created; it corresponds to the managed cluster where the addon is deployed
Namespace string
}

// Helper function to extract and unmarshal ClientInfo from ConfigMap
func getClientInfoFromConfigMap(clientInfoMap map[string]string, key string) (ClientInfo, error) {
clientInfoJSON, ok := clientInfoMap[key]
if !ok {
return ClientInfo{}, fmt.Errorf("client info for %s not found in ConfigMap", key)
}

var clientInfo ClientInfo
if err := json.Unmarshal([]byte(clientInfoJSON), &clientInfo); err != nil {
return ClientInfo{}, fmt.Errorf("failed to unmarshal client info for %s: %v", key, err)
}

return clientInfo, nil
}

func getConfig(ctx context.Context, c client.Client, mp multiclusterv1alpha1.MirrorPeer) ([]ManagedClusterAddonConfig, error) {
managedClusterAddonsConfig := make([]ManagedClusterAddonConfig, 0)
if utils.IsStorageClientType(mp.Spec.Items) {
clientInfoMap, err := fetchClientInfoConfigMap(ctx, c)
if err != nil {
return []ManagedClusterAddonConfig{}, err
}
for _, item := range mp.Spec.Items {
clientName := item.StorageClusterRef.Name
clientInfo, err := getClientInfoFromConfigMap(clientInfoMap.Data, getKey(item.ClusterName, clientName))
if err != nil {
return []ManagedClusterAddonConfig{}, err
}
config := ManagedClusterAddonConfig{
Name: setup.TokenExchangeName,
Namespace: clientInfo.ProviderInfo.ProviderManagedClusterName,
InstallNamespace: clientInfo.ProviderInfo.NamespacedName.Namespace,
}
managedClusterAddonsConfig = append(managedClusterAddonsConfig, config)
}
} else {
for _, item := range mp.Spec.Items {
managedClusterAddonsConfig = append(managedClusterAddonsConfig, ManagedClusterAddonConfig{
Name: setup.TokenExchangeName,
Namespace: item.ClusterName,
InstallNamespace: item.StorageClusterRef.Namespace,
})
}
}
return managedClusterAddonsConfig, nil
}

// processManagedClusterAddon creates a ManagedClusterAddOn for each peer ref in the MirrorPeer;
// the resources get an owner reference to the MirrorPeer so the garbage collector cleans them up when the MirrorPeer is deleted
func (r *MirrorPeerReconciler) processManagedClusterAddon(ctx context.Context, mirrorPeer multiclusterv1alpha1.MirrorPeer) error {
logger := r.Logger.With("MirrorPeer", mirrorPeer.Name)
logger.Info("Processing ManagedClusterAddons for MirrorPeer")

for _, item := range mirrorPeer.Spec.Items {
logger.Info("Handling ManagedClusterAddon for cluster", "ClusterName", item.ClusterName)
addonConfigs, err := getConfig(ctx, r.Client, mirrorPeer)
if err != nil {
return fmt.Errorf("failed to get ManagedClusterAddon config: %w", err)
}
for _, config := range addonConfigs {
logger.Info("Handling ManagedClusterAddon for cluster", "ClusterName", config.Namespace)

var managedClusterAddOn addonapiv1alpha1.ManagedClusterAddOn
namespacedName := types.NamespacedName{
Name: setup.TokenExchangeName,
Namespace: item.ClusterName,
Name: config.Name,
Namespace: config.Namespace,
}

err := r.Client.Get(ctx, namespacedName, &managedClusterAddOn)
if err != nil {
if k8serrors.IsNotFound(err) {
logger.Info("ManagedClusterAddon not found, will create a new one", "ClusterName", item.ClusterName)
logger.Info("ManagedClusterAddon not found, will create a new one", "ClusterName", config.Namespace)

annotations := make(map[string]string)
annotations[utils.DRModeAnnotationKey] = string(mirrorPeer.Spec.Type)
managedClusterAddOn = addonapiv1alpha1.ManagedClusterAddOn{
ObjectMeta: metav1.ObjectMeta{
Name: setup.TokenExchangeName,
Namespace: item.ClusterName,
Namespace: config.Namespace,
Annotations: annotations,
},
}
}
}

_, err = controllerutil.CreateOrUpdate(ctx, r.Client, &managedClusterAddOn, func() error {
managedClusterAddOn.Spec.InstallNamespace = item.StorageClusterRef.Namespace
managedClusterAddOn.Spec.InstallNamespace = config.InstallNamespace
if err := controllerutil.SetOwnerReference(&mirrorPeer, &managedClusterAddOn, r.Scheme); err != nil {
logger.Error("Failed to set owner reference on ManagedClusterAddon", "error", err, "ClusterName", item.ClusterName)
logger.Error("Failed to set owner reference on ManagedClusterAddon", "error", err, "ClusterName", config.Namespace)
return err
}
return nil
@@ -290,7 +468,7 @@ func (r *MirrorPeerReconciler) processManagedClusterAddon(ctx context.Context, m
return err
}

logger.Info("Successfully reconciled ManagedClusterAddOn", "ClusterName", item.ClusterName)
logger.Info("Successfully reconciled ManagedClusterAddOn", "ClusterName", config.Namespace)
}

logger.Info("Successfully processed all ManagedClusterAddons for MirrorPeer")
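A standalone sketch of the naming convention the new `createStorageClusterPeer` loop follows: each provider's StorageClusterPeer is named after its peer provider and is delivered through a ManifestWork created in the provider's own managed-cluster namespace on the hub. The cluster names here are hypothetical.

package main

import "fmt"

// Mirrors getStorageClusterPeerName from this commit: Provider A's StorageClusterPeer is named "{ProviderB}-peer".
func getStorageClusterPeerName(providerClusterName string) string {
	return fmt.Sprintf("%s-peer", providerClusterName)
}

func main() {
	providers := []string{"provider-a", "provider-b"} // hypothetical managed-cluster names
	for i, self := range providers {
		other := providers[1-i]
		fmt.Printf("on %s: StorageClusterPeer %q delivered via ManifestWork %q in hub namespace %q\n",
			self, getStorageClusterPeerName(other), "storageclusterpeer-"+self, self)
	}
}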
22 changes: 22 additions & 0 deletions controllers/utils/configmap.go
@@ -0,0 +1,22 @@
package utils

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// FetchConfigMap fetches a ConfigMap with a given name from a given namespace
func FetchConfigMap(ctx context.Context, c client.Client, name, namespace string) (*corev1.ConfigMap, error) {
configMap := &corev1.ConfigMap{}
err := c.Get(ctx, client.ObjectKey{
Name: name,
Namespace: namespace,
}, configMap)
if err != nil {
return nil, fmt.Errorf("failed to fetch ConfigMap %s in namespace %s: %v", name, namespace, err)
}
return configMap, nil
}
45 changes: 45 additions & 0 deletions controllers/utils/manifestwork.go
@@ -0,0 +1,45 @@
package utils

import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
workv1 "open-cluster-management.io/api/work/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

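// CreateOrUpdateManifestWork wraps objJson as the single manifest of a ManifestWork with the given name and namespace,
// attaches ownerRef at creation time, and creates the ManifestWork or updates its spec in place.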
func CreateOrUpdateManifestWork(ctx context.Context, c client.Client, name string, namespace string, objJson []byte, ownerRef metav1.OwnerReference) (controllerutil.OperationResult, error) {
mw := workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
OwnerReferences: []metav1.OwnerReference{
ownerRef,
},
},
}

operationResult, err := controllerutil.CreateOrUpdate(ctx, c, &mw, func() error {
mw.Spec = workv1.ManifestWorkSpec{
Workload: workv1.ManifestsTemplate{
Manifests: []workv1.Manifest{
{
RawExtension: runtime.RawExtension{
Raw: objJson,
},
},
},
},
}
return nil
})

if err != nil {
return operationResult, fmt.Errorf("failed to create or update ManifestWork %s in namespace %s: %w", name, namespace, err)
}

return operationResult, nil
}
4 changes: 4 additions & 0 deletions controllers/utils/peer_ref.go
@@ -33,3 +33,7 @@ func GetPeerRefForSpokeCluster(mp *multiclusterv1alpha1.MirrorPeer, spokeCluster
}
return nil, fmt.Errorf("PeerRef for cluster %s under mirrorpeer %s not found", spokeClusterName, mp.Name)
}

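// IsStorageClientType reports whether both peer refs omit a StorageCluster namespace,
// which is treated as the signal that the MirrorPeer references StorageClients rather than full StorageClusters.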
func IsStorageClientType(peerRefs []multiclusterv1alpha1.PeerRef) bool {
return peerRefs[0].StorageClusterRef.Namespace == "" && peerRefs[1].StorageClusterRef.Namespace == ""
}
