Skip to content

Commit

Permalink
Enhance MirrorPeer and S3 Secret Handling and other bug fixes
Browse files Browse the repository at this point in the history
- Updated MirrorPeerReconciler to accurately handle storage client references and added deleteMirrorPeer conditional to account for provider mode.
- Introduced annotation support in ObjectBucketClaim for dynamic setting based on MirrorPeer type.
- Added new constants and enums for OBC types and annotations to enhance readability and standardize key usage.
- Enhanced S3 secret synchronization logic to differentiate between client and cluster references, fetching relevant storage cluster info per context.
- Refined ConfigMap client information updates with new GetKey helper and corrected test cases for compatibility.
- Implemented FetchMirrorPeerByName for efficient MirrorPeer retrieval by name.
- Modified utility methods for secret and config map creation to handle namespace variations, ensuring correct retrieval in provider-client contexts.
- Refactored validation and utility methods for enhanced error handling and consistent namespace management.

Signed-off-by: vbadrina <[email protected]>
  • Loading branch information
vbnrh committed Nov 20, 2024
1 parent 4a516d1 commit a3f1f26
Show file tree
Hide file tree
Showing 22 changed files with 772 additions and 231 deletions.
90 changes: 52 additions & 38 deletions addons/agent_mirrorpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ package addons

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"
"strconv"
"time"

Expand Down Expand Up @@ -71,12 +71,34 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

scr, err := utils.GetCurrentStorageClusterRef(&mirrorPeer, r.SpokeClusterName)
hasStorageClientRef, err := utils.IsStorageClientType(ctx, r.SpokeClient, mirrorPeer, true)
logger.Info("MirrorPeer has client reference?", "True/False", hasStorageClientRef)

if err != nil {
logger.Error("Failed to get current storage cluster ref", "error", err)
logger.Error("Failed to check if storage client ref exists", "error", err)
return ctrl.Result{}, err
}

var scr *multiclusterv1alpha1.StorageClusterRef
if hasStorageClientRef {
currentNamespace := os.Getenv("POD_NAMESPACE")
sc, err := utils.GetStorageClusterFromCurrentNamespace(ctx, r.SpokeClient, currentNamespace)
if err != nil {
logger.Error("Failed to fetch StorageCluster for given namespace", "Namespace", currentNamespace)
return ctrl.Result{}, err
}
scr = &multiclusterv1alpha1.StorageClusterRef{
Name: sc.Name,
Namespace: sc.Namespace,
}
} else {
scr, err = utils.GetCurrentStorageClusterRef(&mirrorPeer, r.SpokeClusterName)
if err != nil {
logger.Error("Failed to get current storage cluster ref", "error", err)
return ctrl.Result{}, err
}
}

agentFinalizer := r.SpokeClusterName + "." + SpokeMirrorPeerFinalizer
if len(agentFinalizer) > 63 {
agentFinalizer = fmt.Sprintf("%s.%s", r.SpokeClusterName[0:10], SpokeMirrorPeerFinalizer)
Expand All @@ -92,10 +114,13 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
}
} else {
result, err := r.deleteMirrorPeer(ctx, mirrorPeer, scr)
if err != nil {
return result, err
}
if !hasStorageClientRef {
result, err := r.deleteMirrorPeer(ctx, mirrorPeer, scr)
if err != nil {
return result, err
}
} // TODO Write complete deletion for Provider mode mirrorpeer

err = r.HubClient.Get(ctx, req.NamespacedName, &mirrorPeer)
if err != nil {
if errors.IsNotFound(err) {
Expand All @@ -115,13 +140,6 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

hasStorageClientRef, err := utils.IsStorageClientType(ctx, r.SpokeClient, mirrorPeer, true)

if err != nil {
logger.Error("Failed to check if storage client ref exists", "error", err)
return ctrl.Result{}, err
}

logger.Info("Creating S3 buckets")
err = r.createS3(ctx, mirrorPeer, scr.Namespace, hasStorageClientRef)
if err != nil {
Expand Down Expand Up @@ -179,16 +197,13 @@ func (r *MirrorPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}
if err == nil {
type OnboardingTicket struct {
ID string `json:"id"`
ExpirationDate int64 `json:"expirationDate,string"`
StorageQuotaInGiB uint `json:"storageQuotaInGiB,omitempty"`
}
var ticketData OnboardingTicket
err = json.Unmarshal(token.Data["storagecluster-peer-token"], &ticketData)
logger.Info("Trying to unmarshal onboarding token.")
ticketData, err := UnmarshalOnboardingToken(&token)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to unmarshal onboarding ticket message. %w", err)
logger.Error("Failed to unmarshal the onboarding ticket data")
return ctrl.Result{}, err
}
logger.Info("Successfully unmarshalled onboarding ticket", "ticketData", ticketData)
if ticketData.ExpirationDate > time.Now().Unix() {
logger.Info("Onboarding token has not expired yet. Not renewing it.", "Token", token.Name, "ExpirationDate", ticketData.ExpirationDate)
return ctrl.Result{}, nil
Expand Down Expand Up @@ -314,24 +329,23 @@ func (r *MirrorPeerReconciler) labelRBDStorageClasses(ctx context.Context, stora
}

func (r *MirrorPeerReconciler) createS3(ctx context.Context, mirrorPeer multiclusterv1alpha1.MirrorPeer, scNamespace string, hasStorageClientRef bool) error {
bucketCount := 1
bucketNamespace := utils.GetEnv("ODR_NAMESPACE", scNamespace)
bucketName := utils.GenerateBucketName(mirrorPeer, hasStorageClientRef)
annotations := map[string]string{
utils.MirrorPeerNameAnnotationKey: mirrorPeer.Name,
}
if hasStorageClientRef {
bucketCount = 2
annotations[OBCTypeAnnotationKey] = string(CLIENT)

} else {
annotations[OBCTypeAnnotationKey] = string(CLUSTER)
}
for index := 0; index < bucketCount; index++ {
bucketNamespace := utils.GetEnv("ODR_NAMESPACE", scNamespace)
var bucketName string
if hasStorageClientRef {
bucketName = utils.GenerateBucketName(mirrorPeer, mirrorPeer.Spec.Items[index].StorageClusterRef.Name)
} else {
bucketName = utils.GenerateBucketName(mirrorPeer)
}
operationResult, err := utils.CreateOrUpdateObjectBucketClaim(ctx, r.SpokeClient, bucketName, bucketNamespace)
if err != nil {
return err
}
r.Logger.Info(fmt.Sprintf("ObjectBucketClaim %s was %s in namespace %s", bucketName, operationResult, bucketNamespace))

operationResult, err := utils.CreateOrUpdateObjectBucketClaim(ctx, r.SpokeClient, bucketName, bucketNamespace, annotations)
if err != nil {
return err
}
r.Logger.Info(fmt.Sprintf("ObjectBucketClaim %s was %s in namespace %s", bucketName, operationResult, bucketNamespace))

return nil
}
Expand Down Expand Up @@ -519,7 +533,7 @@ func (r *MirrorPeerReconciler) deleteGreenSecret(ctx context.Context, spokeClust
// deleteS3 deletes the S3 bucket in the storage cluster namespace, each new mirrorpeer generates
// a new bucket, so we do not need to check if the bucket is being used by another mirrorpeer
func (r *MirrorPeerReconciler) deleteS3(ctx context.Context, mirrorPeer multiclusterv1alpha1.MirrorPeer, scNamespace string) error {
bucketName := utils.GenerateBucketName(mirrorPeer)
bucketName := utils.GenerateBucketName(mirrorPeer, false)
bucketNamespace := utils.GetEnv("ODR_NAMESPACE", scNamespace)
noobaaOBC, err := utils.GetObjectBucketClaim(ctx, r.SpokeClient, bucketName, bucketNamespace)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions addons/agent_mirrorpeer_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package addons
import (
"context"
"fmt"
"os"
"testing"

"github.com/kube-object-storage/lib-bucket-provisioner/pkg/apis/objectbucket.io/v1alpha1"
Expand Down Expand Up @@ -126,6 +127,7 @@ func TestMirrorPeerReconcile(t *testing.T) {
ctx := context.TODO()
scheme := mgrScheme
fakeHubClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&mirrorpeer1, &mirrorpeer2).Build()
os.Setenv("POD_NAMESPACE", "test-namespace")
oppositePeerRefsArray := make([][]multiclusterv1alpha1.PeerRef, 0)
// Quick iteration to get peer refs
for _, pr := range mirrorpeer1.Spec.Items {
Expand Down Expand Up @@ -225,7 +227,7 @@ func TestDisableMirroring(t *testing.T) {
},
}

fakeSpokeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&storageCluster).Build()
fakeSpokeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&storageCluster, &odfInfoConfigMap).Build()
r := MirrorPeerReconciler{
HubClient: fakeHubClient,
SpokeClient: fakeSpokeClient,
Expand Down Expand Up @@ -304,7 +306,7 @@ func TestDeleteGreenSecret(t *testing.T) {
}

func TestDeleteS3(t *testing.T) {
bucketName := utils.GenerateBucketName(mirrorPeer)
bucketName := utils.GenerateBucketName(mirrorPeer, false)
ctx := context.TODO()
scheme := mgrScheme
fakeHubClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&mirrorpeer1).Build()
Expand Down
9 changes: 9 additions & 0 deletions addons/constants.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package addons

type OBCTypeValue string

const (
RBDProvisionerTemplate = "%s.rbd.csi.ceph.com"
MaintenanceModeFinalizer = "maintenance.multicluster.odf.openshift.io"
Expand All @@ -10,4 +12,11 @@ const (
StorageIDKey = "storageid"
CephFSProvisionerTemplate = "%s.cephfs.csi.ceph.com"
SpokeMirrorPeerFinalizer = "spoke.multicluster.odf.openshift.io"
OBCTypeAnnotationKey = "multicluster.odf.openshift.io/obc-type"
OBCNameAnnotationKey = "multicluster.odf.openshift.io/obc-name"
)

var (
CLIENT OBCTypeValue = "client"
CLUSTER OBCTypeValue = "cluster"
)
126 changes: 84 additions & 42 deletions addons/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
runtimewait "k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -214,6 +216,43 @@ func runHubManager(ctx context.Context, options AddonAgentOptions, logger *slog.
os.Exit(1)
}
}
func isProviderModeEnabled(ctx context.Context, cl client.Reader, namespace string, logger *slog.Logger) bool {
storageClusterCRD := &metav1.PartialObjectMetadata{}
storageClusterCRD.SetGroupVersionKind(
extv1.SchemeGroupVersion.WithKind("CustomResourceDefinition"),
)
storageClusterCRD.Name = "storageclusters.ocs.openshift.io"
if err := cl.Get(ctx, client.ObjectKeyFromObject(storageClusterCRD), storageClusterCRD); client.IgnoreNotFound(err) != nil {
logger.Error("Failed to find presence of StorageCluster CRD", "Error", err)
return false
}

if storageClusterCRD.UID != "" {
storageClusters := &metav1.PartialObjectMetadataList{}
storageClusters.SetGroupVersionKind(
schema.GroupVersionKind{
Group: "ocs.openshift.io",
Version: "v1",
Kind: "StorageCluster",
},
)
if err := cl.List(ctx, storageClusters, client.InNamespace(namespace), client.Limit(1)); err != nil {
logger.Error("Failed to list StorageCluster CR")
return false
}
if len(storageClusters.Items) < 1 {
logger.Error("StorageCluster CR does not exist")
return false
}
logger.Info("Checking if StorageCluster indicates ODF is deployed in provider mode")
if storageClusters.Items[0].GetAnnotations()["ocs.openshift.io/deployment-mode"] != "provider" {
return false
}
}

logger.Info("Conditions not met. Controllers will be skipped.")
return true
}

func runSpokeManager(ctx context.Context, options AddonAgentOptions, logger *slog.Logger) {
hubConfig, err := GetClientConfig(options.HubKubeconfigFile)
Expand Down Expand Up @@ -251,40 +290,54 @@ func runSpokeManager(ctx context.Context, options AddonAgentOptions, logger *slo

currentNamespace := os.Getenv("POD_NAMESPACE")

if err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
logger.Info("Waiting for MaintenanceMode CRD to be established. MaintenanceMode controller is not running yet.")
// Wait for 45s as it takes time for MaintenanceMode CRD to be created.
return runtimewait.PollUntilContextCancel(ctx, 15*time.Second, true,
func(ctx context.Context) (done bool, err error) {
var crd extv1.CustomResourceDefinition
readErr := mgr.GetAPIReader().Get(ctx, types.NamespacedName{Name: "maintenancemodes.ramendr.openshift.io"}, &crd)
if readErr != nil {
logger.Error("Unable to get MaintenanceMode CRD", readErr)
// Do not initialize err as we want to retry.
// err!=nil or done==true will end polling.
done = false
return
}
if crd.Spec.Group == "ramendr.openshift.io" && crd.Spec.Names.Kind == "MaintenanceMode" {
if err = (&MaintenanceModeReconciler{
Scheme: mgr.GetScheme(),
SpokeClient: mgr.GetClient(),
SpokeClusterName: options.SpokeClusterName,
Logger: logger.With("controller", "MaintenanceModeReconciler"),
}).SetupWithManager(mgr); err != nil {
klog.Error("Unable to create MaintenanceMode controller.", err)
if !isProviderModeEnabled(ctx, mgr.GetAPIReader(), currentNamespace, logger) {
logger.Info("Cluster is not running in provider mode. Setting up blue and maintenance mode controllers")
if err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
logger.Info("Waiting for MaintenanceMode CRD to be established. MaintenanceMode controller is not running yet.")
// Wait for 45s as it takes time for MaintenanceMode CRD to be created.
return runtimewait.PollUntilContextCancel(ctx, 15*time.Second, true,
func(ctx context.Context) (done bool, err error) {
var crd extv1.CustomResourceDefinition
readErr := mgr.GetAPIReader().Get(ctx, types.NamespacedName{Name: "maintenancemodes.ramendr.openshift.io"}, &crd)
if readErr != nil {
logger.Error("Unable to get MaintenanceMode CRD", "Error", readErr)
// Do not initialize err as we want to retry.
// err!=nil or done==true will end polling.
done = false
return
}
if crd.Spec.Group == "ramendr.openshift.io" && crd.Spec.Names.Kind == "MaintenanceMode" {
if err = (&MaintenanceModeReconciler{
Scheme: mgr.GetScheme(),
SpokeClient: mgr.GetClient(),
SpokeClusterName: options.SpokeClusterName,
Logger: logger.With("controller", "MaintenanceModeReconciler"),
}).SetupWithManager(mgr); err != nil {
klog.Error("Unable to create MaintenanceMode controller.", err)
return
}
logger.Info("MaintenanceMode CRD exists and controller is now running")
done = true
return
}
logger.Info("MaintenanceMode CRD exists and controller is now running")
done = true
done = false
return
}
done = false
return
})
})); err != nil {
logger.Error("Failed to poll MaintenanceMode", "error", err)
os.Exit(1)
})
})); err != nil {
logger.Error("Failed to poll MaintenanceMode", "error", err)
os.Exit(1)
}

if err = (&BlueSecretReconciler{
Scheme: mgr.GetScheme(),
HubClient: hubClient,
SpokeClient: mgr.GetClient(),
SpokeClusterName: options.SpokeClusterName,
Logger: logger.With("controller", "BlueSecretReconciler"),
}).SetupWithManager(mgr); err != nil {
logger.Error("Failed to create BlueSecret controller", "controller", "BlueSecret", "error", err)
os.Exit(1)
}
}

if err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
Expand All @@ -302,17 +355,6 @@ func runSpokeManager(ctx context.Context, options AddonAgentOptions, logger *slo
os.Exit(1)
}

if err = (&BlueSecretReconciler{
Scheme: mgr.GetScheme(),
HubClient: hubClient,
SpokeClient: mgr.GetClient(),
SpokeClusterName: options.SpokeClusterName,
Logger: logger.With("controller", "BlueSecretReconciler"),
}).SetupWithManager(mgr); err != nil {
logger.Error("Failed to create BlueSecret controller", "controller", "BlueSecret", "error", err)
os.Exit(1)
}

if err = (&S3SecretReconciler{
Scheme: mgr.GetScheme(),
HubClient: hubClient,
Expand Down
Loading

0 comments on commit a3f1f26

Please sign in to comment.