Skip to content

Commit

Permalink
feat: Enhance MirrorPeer and DRPolicy controllers for StorageClientTy…
Browse files Browse the repository at this point in the history
…pe handling

- Updated `MirrorPeerReconciler` to handle `StorageClientType` by adding conditions in `Reconcile` method.
- Introduced `createStorageClusterPeer` function for creating `StorageClusterPeer` objects.
- Added utility functions `fetchClientInfoConfigMap` and `getClientInfoFromConfigMap` for handling client info.
- Modified `processManagedClusterAddon` to utilize new config structure.
- Enhanced `DRPolicyReconciler` to skip certain operations based on `StorageClientType`.

Signed-off-by: vbadrina <[email protected]>
  • Loading branch information
vbnrh committed Jul 17, 2024
1 parent 0e61c75 commit d5eb008
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 14 deletions.
2 changes: 1 addition & 1 deletion addons/agent_mirrorpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

if mirrorPeer.Spec.Type == multiclusterv1alpha1.Async {
if mirrorPeer.Spec.Type == multiclusterv1alpha1.Async && !utils.IsStorageClientType(mirrorPeer.Spec.Items) {
clusterFSIDs := make(map[string]string)
logger.Info("Fetching clusterFSIDs")
err = r.fetchClusterFSIDs(ctx, &mirrorPeer, clusterFSIDs)
Expand Down
5 changes: 5 additions & 0 deletions controllers/drpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func (r *DRPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, err
}

if utils.IsStorageClientType(mirrorPeer.Spec.Items) {
logger.Info("MirrorPeer contains StorageClient reference. Skipping creation of VolumeReplicationClasses", "MirrorPeer", mirrorPeer.Name)
return ctrl.Result{}, nil
}

if mirrorPeer.Spec.Type == multiclusterv1alpha1.Async {
clusterFSIDs := make(map[string]string)
logger.Info("Fetching cluster FSIDs")
Expand Down
205 changes: 196 additions & 9 deletions controllers/mirrorpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ package controllers

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"

"github.com/red-hat-storage/odf-multicluster-orchestrator/addons/setup"

ramenv1alpha1 "github.com/ramendr/ramen/api/v1alpha1"
ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
addons "github.com/red-hat-storage/odf-multicluster-orchestrator/addons"
multiclusterv1alpha1 "github.com/red-hat-storage/odf-multicluster-orchestrator/api/v1alpha1"
"github.com/red-hat-storage/odf-multicluster-orchestrator/controllers/utils"
Expand Down Expand Up @@ -55,6 +58,26 @@ type MirrorPeerReconciler struct {

const mirrorPeerFinalizer = "hub.multicluster.odf.openshift.io"
const spokeClusterRoleBindingName = "spoke-clusterrole-bindings"
const ClientInfoConfigMapName = "odf-client-info"

// Temp
type ProviderInfo struct {
Version string `json:"version"`
DeploymentType string `json:"deploymentType"`
StorageSystemName string `json:"storageSystemName"`
ProviderManagedClusterName string `json:"providerManagedClusterName"`
NamespacedName types.NamespacedName `json:"namespacedName"`
StorageProviderEndpoint string `json:"storageProviderEndpoint"`
CephClusterFSID string `json:"cephClusterFSID"`
}

type ClientInfo struct {
ClusterID string `json:"clusterId"`
Name string `json:"name"`
ProviderInfo ProviderInfo `json:"providerInfo,omitempty"`
}

// Temp

//+kubebuilder:rbac:groups=multicluster.odf.openshift.io,resources=mirrorpeers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=multicluster.odf.openshift.io,resources=mirrorpeers/status,verbs=get;update;patch
Expand Down Expand Up @@ -240,45 +263,209 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

if utils.IsStorageClientType(mirrorPeer.Spec.Items) {
result, err := createStorageClusterPeer(ctx, r.Client, logger, mirrorPeer)
if err != nil {
logger.Error("Failed to create StorageClusterPeer", "error", err)
return result, err
}
}
return r.updateMirrorPeerStatus(ctx, mirrorPeer)
}

func createStorageClusterPeer(ctx context.Context, client client.Client, logger *slog.Logger, mirrorPeer multiclusterv1alpha1.MirrorPeer) (ctrl.Result, error) {
logger = logger.With("MirrorPeer", mirrorPeer.Name)
clientInfoMap, err := fetchClientInfoConfigMap(ctx, client)
if err != nil {
if k8serrors.IsNotFound(err) {
logger.Info("Client info config map not found. Retrying request another time...")
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
}
items := mirrorPeer.Spec.Items
clientInfo := make([]ClientInfo, 0)

for _, item := range items {
ci, err := getClientInfoFromConfigMap(clientInfoMap.Data, item.StorageClusterRef.Name)
if err != nil {
return ctrl.Result{}, err
}
clientInfo = append(clientInfo, ci)
}

for i := range items {
var storageClusterPeerName string
var ci ClientInfo

// Provider A StorageClusterPeer contains info of Provider B endpoint and ticket, hence this
if i == 0 {
ci := clientInfo[1]
storageClusterPeerName = getStorageClusterPeerName(ci.ProviderInfo.ProviderManagedClusterName)
} else {
ci = clientInfo[0]
storageClusterPeerName = getStorageClusterPeerName(ci.ProviderInfo.ProviderManagedClusterName)
}

storageClusterPeer := ocsv1.StorageClusterPeer{
ObjectMeta: metav1.ObjectMeta{
Name: storageClusterPeerName,
// This provider A namespace on which the Storage object exists
Namespace: clientInfo[i].ProviderInfo.NamespacedName.Namespace,
},
Spec: ocsv1.StorageClusterPeerSpec{
APIServerEndpoint: ci.ProviderInfo.StorageProviderEndpoint,
OnboardingTicket: fetchOnboardingTicket(ci),
},
}
storageClusterPeerJson, err := json.Marshal(storageClusterPeer)
if err != nil {
logger.Error("Failed to marshal StorageClusterPeer to JSON", "StorageClusterPeer", storageClusterPeerName)
return ctrl.Result{}, err
}

ownerRef := metav1.OwnerReference{
APIVersion: mirrorPeer.APIVersion,
Kind: mirrorPeer.Kind,
Name: mirrorPeer.Name,
UID: mirrorPeer.UID,
}

// ManifestWork created for Provider A will be called storageclusterpeer-{ProviderB} since that is what it is connecting to
// Provider names are unique hence only 1 ManifestWork per ProviderCluster
manifestWorkName := fmt.Sprintf("storageclusterpeer-%s", ci.ProviderInfo.ProviderManagedClusterName)
namespace := ci.ProviderInfo.ProviderManagedClusterName

operationResult, err := utils.CreateOrUpdateManifestWork(ctx, client, manifestWorkName, namespace, storageClusterPeerJson, ownerRef)
if err != nil {
return ctrl.Result{}, err
}

logger.Info(fmt.Sprintf("ManifestWork was %s for StorageClusterPeer %s", operationResult, storageClusterPeerName))
}

return ctrl.Result{}, nil
}

func fetchOnboardingTicket(clientInfo ClientInfo) string {
panic("unimplemented")
}

func getStorageClusterPeerName(providerClusterName string) string {
// Provider A will have SCP named {ProviderB}-peer
return fmt.Sprintf("%s-peer", providerClusterName)
}

func fetchClientInfoConfigMap(ctx context.Context, c client.Client) (*corev1.ConfigMap, error) {
currentNamespace := os.Getenv("POD_NAMESPACE")
if currentNamespace == "" {
return nil, fmt.Errorf("cannot detect the current namespace")
}
clientInfoMap, err := utils.FetchConfigMap(ctx, c, ClientInfoConfigMapName, currentNamespace)
if err != nil {
return nil, err
}
return clientInfoMap, nil
}

type ManagedClusterAddonConfig struct {
// Namespace on the managedCluster where it will be deployed
InstallNamespace string

// Name of the MCA
Name string

// Namespace on the hub where MCA will be created, it represents the Managed cluster where the addons will be deployed
Namespace string
}

// Helper function to extract and unmarshal ClientInfo from ConfigMap
func getClientInfoFromConfigMap(clientInfoMap map[string]string, clientName string) (ClientInfo, error) {
clientInfoJSON, ok := clientInfoMap[clientName]
if !ok {
return ClientInfo{}, fmt.Errorf("client info for %s not found in ConfigMap", clientName)
}

var clientInfo ClientInfo
if err := json.Unmarshal([]byte(clientInfoJSON), &clientInfo); err != nil {
return ClientInfo{}, fmt.Errorf("failed to unmarshal client info for %s: %v", clientName, err)
}

return clientInfo, nil
}

func getConfig(ctx context.Context, c client.Client, mp multiclusterv1alpha1.MirrorPeer) ([]ManagedClusterAddonConfig, error) {
managedClusterAddonsConfig := make([]ManagedClusterAddonConfig, 0)
if utils.IsStorageClientType(mp.Spec.Items) {
clientInfoMap, err := fetchClientInfoConfigMap(ctx, c)
if err != nil {
return []ManagedClusterAddonConfig{}, err
}
for _, item := range mp.Spec.Items {
clientName := item.ClusterName
clientInfo, err := getClientInfoFromConfigMap(clientInfoMap.Data, clientName)
if err != nil {
return []ManagedClusterAddonConfig{}, err
}
config := ManagedClusterAddonConfig{
Name: setup.TokenExchangeName,
Namespace: clientInfo.ProviderInfo.ProviderManagedClusterName,
InstallNamespace: clientInfo.ProviderInfo.NamespacedName.Namespace,
}
managedClusterAddonsConfig = append(managedClusterAddonsConfig, config)
}
} else {
for _, item := range mp.Spec.Items {
managedClusterAddonsConfig = append(managedClusterAddonsConfig, ManagedClusterAddonConfig{
Name: setup.TokenExchangeName,
Namespace: item.ClusterName,
InstallNamespace: item.StorageClusterRef.Namespace,
})
}
}
return managedClusterAddonsConfig, nil
}

// processManagedClusterAddon creates an addon for the cluster management in all the peer refs,
// the resources gets an owner ref of the mirrorpeer to let the garbage collector handle it if the mirrorpeer gets deleted
func (r *MirrorPeerReconciler) processManagedClusterAddon(ctx context.Context, mirrorPeer multiclusterv1alpha1.MirrorPeer) error {
logger := r.Logger.With("MirrorPeer", mirrorPeer.Name)
logger.Info("Processing ManagedClusterAddons for MirrorPeer")

for _, item := range mirrorPeer.Spec.Items {
logger.Info("Handling ManagedClusterAddon for cluster", "ClusterName", item.ClusterName)
addonConfigs, err := getConfig(ctx, r.Client, mirrorPeer)
if err != nil {
return fmt.Errorf("failed to get managedclusteraddon config %w", err)
}
for _, config := range addonConfigs {
logger.Info("Handling ManagedClusterAddon for cluster", "ClusterName", config.Namespace)

var managedClusterAddOn addonapiv1alpha1.ManagedClusterAddOn
namespacedName := types.NamespacedName{
Name: setup.TokenExchangeName,
Namespace: item.ClusterName,
Name: config.Name,
Namespace: config.Namespace,
}

err := r.Client.Get(ctx, namespacedName, &managedClusterAddOn)
if err != nil {
if k8serrors.IsNotFound(err) {
logger.Info("ManagedClusterAddon not found, will create a new one", "ClusterName", item.ClusterName)
logger.Info("ManagedClusterAddon not found, will create a new one", "ClusterName", config.Namespace)

annotations := make(map[string]string)
annotations[utils.DRModeAnnotationKey] = string(mirrorPeer.Spec.Type)
managedClusterAddOn = addonapiv1alpha1.ManagedClusterAddOn{
ObjectMeta: metav1.ObjectMeta{
Name: setup.TokenExchangeName,
Namespace: item.ClusterName,
Namespace: config.Namespace,
Annotations: annotations,
},
}
}
}

_, err = controllerutil.CreateOrUpdate(ctx, r.Client, &managedClusterAddOn, func() error {
managedClusterAddOn.Spec.InstallNamespace = item.StorageClusterRef.Namespace
managedClusterAddOn.Spec.InstallNamespace = config.InstallNamespace
if err := controllerutil.SetOwnerReference(&mirrorPeer, &managedClusterAddOn, r.Scheme); err != nil {
logger.Error("Failed to set owner reference on ManagedClusterAddon", "error", err, "ClusterName", item.ClusterName)
logger.Error("Failed to set owner reference on ManagedClusterAddon", "error", err, "ClusterName", config.Namespace)
return err
}
return nil
Expand All @@ -289,7 +476,7 @@ func (r *MirrorPeerReconciler) processManagedClusterAddon(ctx context.Context, m
return err
}

logger.Info("Successfully reconciled ManagedClusterAddOn", "ClusterName", item.ClusterName)
logger.Info("Successfully reconciled ManagedClusterAddOn", "ClusterName", config.Namespace)
}

logger.Info("Successfully processed all ManagedClusterAddons for MirrorPeer")
Expand Down
22 changes: 22 additions & 0 deletions controllers/utils/configmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package utils

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// FetchConfigMap fetches a ConfigMap with a given name from a given namespace
func FetchConfigMap(ctx context.Context, c client.Client, name, namespace string) (*corev1.ConfigMap, error) {
configMap := &corev1.ConfigMap{}
err := c.Get(ctx, client.ObjectKey{
Name: name,
Namespace: namespace,
}, configMap)
if err != nil {
return nil, fmt.Errorf("failed to fetch ConfigMap %s in namespace %s: %v", name, namespace, err)
}
return configMap, nil
}
45 changes: 45 additions & 0 deletions controllers/utils/manifestwork.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package utils

import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
workv1 "open-cluster-management.io/api/work/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

func CreateOrUpdateManifestWork(ctx context.Context, c client.Client, name string, namespace string, objJson []byte, ownerRef metav1.OwnerReference) (controllerutil.OperationResult, error) {
mw := workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
OwnerReferences: []metav1.OwnerReference{
ownerRef,
},
},
}

operationResult, err := controllerutil.CreateOrUpdate(ctx, c, &mw, func() error {
mw.Spec = workv1.ManifestWorkSpec{
Workload: workv1.ManifestsTemplate{
Manifests: []workv1.Manifest{
{
RawExtension: runtime.RawExtension{
Raw: objJson,
},
},
},
},
}
return nil
})

if err != nil {
return operationResult, fmt.Errorf("failed to create and update ManifestWork %s for namespace %s. error %w", name, namespace, err)
}

return operationResult, nil
}
7 changes: 7 additions & 0 deletions controllers/utils/peer_ref.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,10 @@ func GetPeerRefForSpokeCluster(mp *multiclusterv1alpha1.MirrorPeer, spokeCluster
}
return nil, fmt.Errorf("PeerRef for cluster %s under mirrorpeer %s not found", spokeClusterName, mp.Name)
}

func IsStorageClientType(peerRefs []multiclusterv1alpha1.PeerRef) bool {
if peerRefs[0].StorageClusterRef.Namespace == "" && peerRefs[1].StorageClusterRef.Namespace == "" {
return true
}
return false
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ replace (
github.com/openshift/hive => github.com/openshift/hive v1.1.17-0.20220223000051-b1c8fa5853b1
github.com/openshift/hive/apis => github.com/openshift/hive/apis v0.0.0-20220221165319-b389a65758da
github.com/portworx/sched-ops => github.com/portworx/sched-ops v0.20.4-openstorage-rc3
github.com/red-hat-storage/ocs-operator/api/v4 => github.com/rewantsoni/ocs-operator/api/v4 v4.0.0-20240701052137-de69df292a5d
github.com/terraform-providers/terraform-provider-aws => github.com/openshift/terraform-provider-aws v1.60.1-0.20211215220004-24df6d73af46
github.com/terraform-providers/terraform-provider-ignition/v2 => github.com/community-terraform-providers/terraform-provider-ignition/v2 v2.1.0
sigs.k8s.io/cluster-api-provider-aws => github.com/openshift/cluster-api-provider-aws v0.2.1-0.20210121023454-5ffc5f422a80
Expand Down
Loading

0 comments on commit d5eb008

Please sign in to comment.