Skip to content

Commit

Permalink
feat: Enhance MirrorPeer and DRPolicy controllers for StorageClientTy…
Browse files Browse the repository at this point in the history
…pe handling

- Updated `MirrorPeerReconciler` to handle `StorageClientType` by adding conditions in `Reconcile` method.
- Introduced `createStorageClusterPeer` function for creating `StorageClusterPeer` objects.
- Added utility functions `fetchClientInfoConfigMap` and `getClientInfoFromConfigMap` for handling client info.
- Modified `processManagedClusterAddon` to utilize new config structure.
- Enhanced `DRPolicyReconciler` to skip certain operations based on `StorageClientType`.

Signed-off-by: vbadrina <[email protected]>
  • Loading branch information
vbnrh committed Jul 17, 2024
1 parent 0e61c75 commit d3f909d
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 14 deletions.
2 changes: 1 addition & 1 deletion addons/agent_mirrorpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

if mirrorPeer.Spec.Type == multiclusterv1alpha1.Async {
if mirrorPeer.Spec.Type == multiclusterv1alpha1.Async && !utils.IsStorageClientType(mirrorPeer.Spec.Items) {
clusterFSIDs := make(map[string]string)
logger.Info("Fetching clusterFSIDs")
err = r.fetchClusterFSIDs(ctx, &mirrorPeer, clusterFSIDs)
Expand Down
5 changes: 5 additions & 0 deletions controllers/drpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func (r *DRPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, err
}

if utils.IsStorageClientType(mirrorPeer.Spec.Items) {
logger.Info("MirrorPeer contains StorageClient reference. Skipping creation of VolumeReplicationClasses", "MirrorPeer", mirrorPeer.Name)
return ctrl.Result{}, nil
}

if mirrorPeer.Spec.Type == multiclusterv1alpha1.Async {
clusterFSIDs := make(map[string]string)
logger.Info("Fetching cluster FSIDs")
Expand Down
185 changes: 176 additions & 9 deletions controllers/mirrorpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ package controllers

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"

"github.com/red-hat-storage/odf-multicluster-orchestrator/addons/setup"

ramenv1alpha1 "github.com/ramendr/ramen/api/v1alpha1"
ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
addons "github.com/red-hat-storage/odf-multicluster-orchestrator/addons"
multiclusterv1alpha1 "github.com/red-hat-storage/odf-multicluster-orchestrator/api/v1alpha1"
"github.com/red-hat-storage/odf-multicluster-orchestrator/controllers/utils"
Expand Down Expand Up @@ -55,6 +58,26 @@ type MirrorPeerReconciler struct {

const mirrorPeerFinalizer = "hub.multicluster.odf.openshift.io"
const spokeClusterRoleBindingName = "spoke-clusterrole-bindings"
const ClientInfoConfigMapName = "odf-client-info"

// Temp
type ProviderInfo struct {
Version string `json:"version"`
DeploymentType string `json:"deploymentType"`
StorageSystemName string `json:"storageSystemName"`
ProviderManagedClusterName string `json:"providerManagedClusterName"`
NamespacedName types.NamespacedName `json:"namespacedName"`
StorageProviderEndpoint string `json:"storageProviderEndpoint"`
CephClusterFSID string `json:"cephClusterFSID"`
}

type ClientInfo struct {
ClusterID string `json:"clusterId"`
Name string `json:"name"`
ProviderInfo ProviderInfo `json:"providerInfo,omitempty"`
}

// Temp

//+kubebuilder:rbac:groups=multicluster.odf.openshift.io,resources=mirrorpeers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=multicluster.odf.openshift.io,resources=mirrorpeers/status,verbs=get;update;patch
Expand Down Expand Up @@ -240,45 +263,189 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

if utils.IsStorageClientType(mirrorPeer.Spec.Items) {
result, err := createStorageClusterPeer(ctx, r.Client, logger, mirrorPeer)
if err != nil {
logger.Error("Failed to create StorageClusterPeer", "error", err)
return result, err
}
}
return r.updateMirrorPeerStatus(ctx, mirrorPeer)
}

func createStorageClusterPeer(ctx context.Context, client client.Client, logger *slog.Logger, mirrorPeer multiclusterv1alpha1.MirrorPeer) (ctrl.Result, error) {
logger = logger.With("MirrorPeer", mirrorPeer.Name)
clientInfoMap, err := fetchClientInfoConfigMap(ctx, client)
if err != nil {
if k8serrors.IsNotFound(err) {
logger.Info("Client info config map not found. Retrying request another time...")
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
}
items := mirrorPeer.Spec.Items
for _, item := range items {
clientInfo, err := getClientInfoFromConfigMap(clientInfoMap.Data, item.StorageClusterRef.Name)
if err != nil {
return ctrl.Result{}, err
}

storageClusterPeerName := getStorageClusterPeerName(items[0].StorageClusterRef.Name, items[1].StorageClusterRef.Name)
storageClusterPeer := ocsv1.StorageClusterPeer{
ObjectMeta: metav1.ObjectMeta{
Name: storageClusterPeerName,
Namespace: clientInfo.ProviderInfo.NamespacedName.Namespace,
},
Spec: ocsv1.StorageClusterPeerSpec{
APIServerEndpoint: clientInfo.ProviderInfo.StorageProviderEndpoint,
OnboardingTicket: fetchOnboardingTicket(clientInfo),
},
}
storageClusterPeerJson, err := json.Marshal(storageClusterPeer)
if err != nil {
logger.Error("Failed to marshal StorageClusterPeer to JSON", "StorageClusterPeer", storageClusterPeerName)
return ctrl.Result{}, err
}

ownerRef := metav1.OwnerReference{
APIVersion: mirrorPeer.APIVersion,
Kind: mirrorPeer.Kind,
Name: mirrorPeer.Name,
UID: mirrorPeer.UID,
}

manifestWorkName := fmt.Sprintf("storageclusterpeer-%s", clientInfo.Name)
namespace := clientInfo.ProviderInfo.ProviderManagedClusterName

operationResult, err := utils.CreateOrUpdateManifestWork(ctx, client, manifestWorkName, namespace, storageClusterPeerJson, ownerRef)
if err != nil {
return ctrl.Result{}, err
}

logger.Info(fmt.Sprintf("ManifestWork was %s for StorageClusterPeer %s", operationResult, storageClusterPeerName))
}

return ctrl.Result{}, nil
}

func fetchOnboardingTicket(clientInfo ClientInfo) string {
panic("unimplemented")
}

func getStorageClusterPeerName(c1, c2 string) string {
return fmt.Sprintf("%s-%s-peer", c1, c2)
}

func fetchClientInfoConfigMap(ctx context.Context, c client.Client) (*corev1.ConfigMap, error) {
currentNamespace := os.Getenv("POD_NAMESPACE")
if currentNamespace == "" {
return nil, fmt.Errorf("cannot detect the current namespace")
}
clientInfoMap, err := utils.FetchConfigMap(ctx, c, ClientInfoConfigMapName, currentNamespace)
if err != nil {
return nil, err
}
return clientInfoMap, nil
}

type ManagedClusterAddonConfig struct {
// Namespace on the managedCluster where it will be deployed
InstallNamespace string

// Name of the MCA
Name string

// Namespace on the hub where MCA will be created, it represents the Managed cluster where the addons will be deployed
Namespace string
}

// Helper function to extract and unmarshal ClientInfo from ConfigMap
func getClientInfoFromConfigMap(clientInfoMap map[string]string, clientName string) (ClientInfo, error) {
clientInfoJSON, ok := clientInfoMap[clientName]
if !ok {
return ClientInfo{}, fmt.Errorf("client info for %s not found in ConfigMap", clientName)
}

var clientInfo ClientInfo
if err := json.Unmarshal([]byte(clientInfoJSON), &clientInfo); err != nil {
return ClientInfo{}, fmt.Errorf("failed to unmarshal client info for %s: %v", clientName, err)
}

return clientInfo, nil
}

func getConfig(ctx context.Context, c client.Client, mp multiclusterv1alpha1.MirrorPeer) ([]ManagedClusterAddonConfig, error) {
managedClusterAddonsConfig := make([]ManagedClusterAddonConfig, 0)
if utils.IsStorageClientType(mp.Spec.Items) {
clientInfoMap, err := fetchClientInfoConfigMap(ctx, c)
if err != nil {
return []ManagedClusterAddonConfig{}, err
}
for _, item := range mp.Spec.Items {
clientName := item.ClusterName
clientInfo, err := getClientInfoFromConfigMap(clientInfoMap.Data, clientName)
if err != nil {
return []ManagedClusterAddonConfig{}, err
}
config := ManagedClusterAddonConfig{
Name: setup.TokenExchangeName,
Namespace: clientInfo.ProviderInfo.ProviderManagedClusterName,
InstallNamespace: clientInfo.ProviderInfo.NamespacedName.Namespace,
}
managedClusterAddonsConfig = append(managedClusterAddonsConfig, config)
}
} else {
for _, item := range mp.Spec.Items {
managedClusterAddonsConfig = append(managedClusterAddonsConfig, ManagedClusterAddonConfig{
Name: setup.TokenExchangeName,
Namespace: item.ClusterName,
InstallNamespace: item.StorageClusterRef.Namespace,
})
}
}
return managedClusterAddonsConfig, nil
}

// processManagedClusterAddon creates an addon for the cluster management in all the peer refs,
// the resources gets an owner ref of the mirrorpeer to let the garbage collector handle it if the mirrorpeer gets deleted
func (r *MirrorPeerReconciler) processManagedClusterAddon(ctx context.Context, mirrorPeer multiclusterv1alpha1.MirrorPeer) error {
logger := r.Logger.With("MirrorPeer", mirrorPeer.Name)
logger.Info("Processing ManagedClusterAddons for MirrorPeer")

for _, item := range mirrorPeer.Spec.Items {
logger.Info("Handling ManagedClusterAddon for cluster", "ClusterName", item.ClusterName)
addonConfigs, err := getConfig(ctx, r.Client, mirrorPeer)
if err != nil {
return fmt.Errorf("failed to get managedclusteraddon config %w", err)
}
for _, config := range addonConfigs {
logger.Info("Handling ManagedClusterAddon for cluster", "ClusterName", config.Namespace)

var managedClusterAddOn addonapiv1alpha1.ManagedClusterAddOn
namespacedName := types.NamespacedName{
Name: setup.TokenExchangeName,
Namespace: item.ClusterName,
Name: config.Name,
Namespace: config.Namespace,
}

err := r.Client.Get(ctx, namespacedName, &managedClusterAddOn)
if err != nil {
if k8serrors.IsNotFound(err) {
logger.Info("ManagedClusterAddon not found, will create a new one", "ClusterName", item.ClusterName)
logger.Info("ManagedClusterAddon not found, will create a new one", "ClusterName", config.Namespace)

annotations := make(map[string]string)
annotations[utils.DRModeAnnotationKey] = string(mirrorPeer.Spec.Type)
managedClusterAddOn = addonapiv1alpha1.ManagedClusterAddOn{
ObjectMeta: metav1.ObjectMeta{
Name: setup.TokenExchangeName,
Namespace: item.ClusterName,
Namespace: config.Namespace,
Annotations: annotations,
},
}
}
}

_, err = controllerutil.CreateOrUpdate(ctx, r.Client, &managedClusterAddOn, func() error {
managedClusterAddOn.Spec.InstallNamespace = item.StorageClusterRef.Namespace
managedClusterAddOn.Spec.InstallNamespace = config.InstallNamespace
if err := controllerutil.SetOwnerReference(&mirrorPeer, &managedClusterAddOn, r.Scheme); err != nil {
logger.Error("Failed to set owner reference on ManagedClusterAddon", "error", err, "ClusterName", item.ClusterName)
logger.Error("Failed to set owner reference on ManagedClusterAddon", "error", err, "ClusterName", config.Namespace)
return err
}
return nil
Expand All @@ -289,7 +456,7 @@ func (r *MirrorPeerReconciler) processManagedClusterAddon(ctx context.Context, m
return err
}

logger.Info("Successfully reconciled ManagedClusterAddOn", "ClusterName", item.ClusterName)
logger.Info("Successfully reconciled ManagedClusterAddOn", "ClusterName", config.Namespace)
}

logger.Info("Successfully processed all ManagedClusterAddons for MirrorPeer")
Expand Down
22 changes: 22 additions & 0 deletions controllers/utils/configmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package utils

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// FetchConfigMap fetches a ConfigMap with a given name from a given namespace
func FetchConfigMap(ctx context.Context, c client.Client, name, namespace string) (*corev1.ConfigMap, error) {
configMap := &corev1.ConfigMap{}
err := c.Get(ctx, client.ObjectKey{
Name: name,
Namespace: namespace,
}, configMap)
if err != nil {
return nil, fmt.Errorf("failed to fetch ConfigMap %s in namespace %s: %v", name, namespace, err)
}
return configMap, nil
}
45 changes: 45 additions & 0 deletions controllers/utils/manifestwork.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package utils

import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
workv1 "open-cluster-management.io/api/work/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

func CreateOrUpdateManifestWork(ctx context.Context, c client.Client, name string, namespace string, objJson []byte, ownerRef metav1.OwnerReference) (controllerutil.OperationResult, error) {
mw := workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
OwnerReferences: []metav1.OwnerReference{
ownerRef,
},
},
}

operationResult, err := controllerutil.CreateOrUpdate(ctx, c, &mw, func() error {
mw.Spec = workv1.ManifestWorkSpec{
Workload: workv1.ManifestsTemplate{
Manifests: []workv1.Manifest{
{
RawExtension: runtime.RawExtension{
Raw: objJson,
},
},
},
},
}
return nil
})

if err != nil {
return operationResult, fmt.Errorf("failed to create and update ManifestWork %s for namespace %s. error %w", name, namespace, err)
}

return operationResult, nil
}
7 changes: 7 additions & 0 deletions controllers/utils/peer_ref.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,10 @@ func GetPeerRefForSpokeCluster(mp *multiclusterv1alpha1.MirrorPeer, spokeCluster
}
return nil, fmt.Errorf("PeerRef for cluster %s under mirrorpeer %s not found", spokeClusterName, mp.Name)
}

func IsStorageClientType(peerRefs []multiclusterv1alpha1.PeerRef) bool {
if peerRefs[0].StorageClusterRef.Namespace == "" && peerRefs[1].StorageClusterRef.Namespace == "" {
return true
}
return false
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ replace (
github.com/openshift/hive => github.com/openshift/hive v1.1.17-0.20220223000051-b1c8fa5853b1
github.com/openshift/hive/apis => github.com/openshift/hive/apis v0.0.0-20220221165319-b389a65758da
github.com/portworx/sched-ops => github.com/portworx/sched-ops v0.20.4-openstorage-rc3
github.com/red-hat-storage/ocs-operator/api/v4 => github.com/rewantsoni/ocs-operator/api/v4 v4.0.0-20240701052137-de69df292a5d
github.com/terraform-providers/terraform-provider-aws => github.com/openshift/terraform-provider-aws v1.60.1-0.20211215220004-24df6d73af46
github.com/terraform-providers/terraform-provider-ignition/v2 => github.com/community-terraform-providers/terraform-provider-ignition/v2 v2.1.0
sigs.k8s.io/cluster-api-provider-aws => github.com/openshift/cluster-api-provider-aws v0.2.1-0.20210121023454-5ffc5f422a80
Expand Down
7 changes: 3 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,8 @@ github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg78
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/go-test/deep v1.0.2/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM=
github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg=
github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
Expand Down Expand Up @@ -711,8 +710,8 @@ github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3c
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/ramendr/ramen/api v0.0.0-20240409201920-10024cae3bfd h1:TsDaQqfb1BcR78JWXBmUyj6Qx4By5loUZ95CxmA/6zo=
github.com/ramendr/ramen/api v0.0.0-20240409201920-10024cae3bfd/go.mod h1:PCb0ODjdi4eYuxY/nSw+/rQqmzkmBVqGNoDr2JXdlKE=
github.com/red-hat-storage/ocs-operator/api/v4 v4.0.0-20240327160100-bbe9d9d49462 h1:84M7EBnmBISt2LcoyYPWsw+A3/7BGXp6Mh3sjUH5vIg=
github.com/red-hat-storage/ocs-operator/api/v4 v4.0.0-20240327160100-bbe9d9d49462/go.mod h1:uySjux/lY0DpC+VXof4ly2SlS7QUocTm2CH4sU8ICeg=
github.com/rewantsoni/ocs-operator/api/v4 v4.0.0-20240701052137-de69df292a5d h1:zXBbu5hpZmW4i3heppui4pqbvubZF/WsBiuP6ZekNKE=
github.com/rewantsoni/ocs-operator/api/v4 v4.0.0-20240701052137-de69df292a5d/go.mod h1:kbtILVV15bhm4UFehDYhezjZIvbeaZQ/4vQdv2gagdk=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc=
Expand Down

0 comments on commit d3f909d

Please sign in to comment.