Create additional storageclasses for cephfs if defined in storagecluster
If the user has defined additional storageclasses for cephfs in the
storagecluster, we create those additional storageclasses. For each of
them we create a data pool and a cephfssvg. In the storageclass we set
the pool name, and the clusterID is taken from the status of the
corresponding cephfssvg. The code for creating svgs has been refactored
to make it simpler.
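
For illustration, a spec that exercises this path might look like the
sketch below. The AdditionalCephFilesystemStorageClasses field names are
taken from this change; the intermediate struct type names and all values
are assumed examples, not a definitive API reference.

// assumed imports:
//   ocsv1  "github.com/red-hat-storage/ocs-operator/api/v4/v1"
//   cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
sc := &ocsv1.StorageCluster{
	Spec: ocsv1.StorageClusterSpec{
		ManagedResources: ocsv1.ManagedResourcesSpec{ // type name assumed
			CephFilesystems: ocsv1.ManageCephFilesystems{ // type name assumed
				AdditionalCephFilesystemStorageClasses: []ocsv1.AdditionalCephFilesystemStorageClass{
					{
						// name of the extra cephfs storageclass to create
						StorageClassName: "ocs-storagecluster-cephfs-ssd",
						// data pool backing it; unset fields fall back to
						// the cluster defaults via generateDefaultPoolSpec
						DataPoolSpec: cephv1.PoolSpec{DeviceClass: "ssd"},
					},
				},
			},
		},
	},
}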

Signed-off-by: Malay Kumar Parida <[email protected]>
malayparida2000 committed Mar 6, 2024
1 parent f97b851 commit 9ab1e2f
Showing 4 changed files with 186 additions and 86 deletions.
226 changes: 144 additions & 82 deletions controllers/storagecluster/cephfilesystem.go
@@ -3,6 +3,7 @@ package storagecluster
import (
"context"
"fmt"
"reflect"

ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
"github.com/red-hat-storage/ocs-operator/v4/controllers/defaults"
@@ -11,7 +12,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -47,19 +47,46 @@ func (r *StorageClusterReconciler) newCephFilesystemInstances(initStorageCluster
}

// not in provider mode
// standalone deployment that isn't in provider cluster will not have storageProfile
// we need to define default dataPool, if storageProfile is set this will be overridden.
if !initStorageCluster.Spec.AllowRemoteStorageConsumers {
// standalone deployment that isn't in provider cluster will not
// have storageProfile, we need to define default dataPool, if
// storageProfile is set this will be overridden.
ret.Spec.DataPools = []cephv1.NamedPoolSpec{
{
PoolSpec: cephv1.PoolSpec{
DeviceClass: generateDeviceClass(initStorageCluster),
Replicated: generateCephReplicatedSpec(initStorageCluster, "data"),
FailureDomain: initStorageCluster.Status.FailureDomain,
},
PoolSpec: initStorageCluster.Spec.ManagedResources.CephFilesystems.DataPoolSpec,
},
}
// Append additional pools from specified additional storage classes
for _, storageClass := range initStorageCluster.Spec.ManagedResources.CephFilesystems.AdditionalCephFilesystemStorageClasses {
ret.Spec.DataPools = append(ret.Spec.DataPools, cephv1.NamedPoolSpec{
PoolSpec: storageClass.DataPoolSpec,
})
}

// Iterate over each pool specification and set default values if necessary
defaultPoolSpec := generateDefaultPoolSpec(initStorageCluster)
for i := range ret.Spec.DataPools {
pool := &ret.Spec.DataPools[i] // Get a pointer to the pool spec
// Set default device class if not specified
if pool.PoolSpec.DeviceClass == "" {
pool.PoolSpec.DeviceClass = defaultPoolSpec.DeviceClass
}
// Set default replication settings if not specified
if reflect.DeepEqual(pool.PoolSpec.Replicated, cephv1.ReplicatedSpec{}) {
if pool.PoolSpec.Replicated.Size == 0 {
pool.PoolSpec.Replicated.Size = defaultPoolSpec.Replicated.Size
}
if pool.PoolSpec.Replicated.ReplicasPerFailureDomain == 0 {
pool.PoolSpec.Replicated.ReplicasPerFailureDomain = defaultPoolSpec.Replicated.ReplicasPerFailureDomain
}
if pool.PoolSpec.Replicated.TargetSizeRatio == 0 {
pool.PoolSpec.Replicated.TargetSizeRatio = defaultPoolSpec.Replicated.TargetSizeRatio
}
}
// Set default failure domain if not specified
if pool.PoolSpec.FailureDomain == "" {
pool.PoolSpec.FailureDomain = defaultPoolSpec.FailureDomain
}
}
} else {
// Load all StorageProfile objects in the StorageCluster's namespace
storageProfiles := &ocsv1.StorageProfileList{}
@@ -108,6 +135,40 @@ func (r *StorageClusterReconciler) newCephFilesystemInstances(initStorageCluster
return []*cephv1.CephFilesystem{ret}, nil
}

// getSubVolumeGroupsForFilesystem returns the subVolumeGroups for the given filesystem
func getSubVolumeGroupsForFilesystem(filesystem *cephv1.CephFilesystem) []cephv1.CephFilesystemSubVolumeGroup {
// Default value of "distributed" option for pinning in the CephFilesystemSubVolumeGroup CR
var defaultPinningValue = 1
subVolumeGroupSpec := cephv1.CephFilesystemSubVolumeGroupSpec{
FilesystemName: filesystem.Name,
Pinning: cephv1.CephFilesystemSubVolumeGroupSpecPinning{
Distributed: &defaultPinningValue,
},
}
defaultSubVolumeGroup := cephv1.CephFilesystemSubVolumeGroup{
ObjectMeta: metav1.ObjectMeta{
Name: defaultSubvolumeGroupName,
Namespace: filesystem.Namespace,
},
Spec: subVolumeGroupSpec,
}
subVolumeGroups := []cephv1.CephFilesystemSubVolumeGroup{defaultSubVolumeGroup}
for i := range filesystem.Spec.DataPools {
if i == 0 {
continue // Skip the first data pool; the default svg already covers it
}
svg := cephv1.CephFilesystemSubVolumeGroup{
ObjectMeta: metav1.ObjectMeta{
Name: generateNameForAdditionalCephFilesystemStorageClassSubvolumeGroup(filesystem.Name, i),
Namespace: filesystem.Namespace,
},
Spec: subVolumeGroupSpec,
}
subVolumeGroups = append(subVolumeGroups, svg)
}
return subVolumeGroups
}

// ensureCreated ensures that cephFilesystem resources exist in the desired
// state.
func (obj *ocsCephFilesystems) ensureCreated(r *StorageClusterReconciler, instance *ocsv1.StorageCluster) (reconcile.Result, error) {
@@ -149,10 +210,10 @@ func (obj *ocsCephFilesystems) ensureCreated(r *StorageClusterReconciler, instan
return reconcile.Result{}, err
}
}
// create default csi subvolumegroup for the filesystem
// create subVolumeGroups for the filesystem
// skip for the ocs provider mode
if !instance.Spec.AllowRemoteStorageConsumers {
err = r.createDefaultSubvolumeGroup(cephFilesystem.Name, cephFilesystem.Namespace, cephFilesystem.ObjectMeta.OwnerReferences)
err = r.ensureCreateCephFilesystemSubVolumeGroups(cephFilesystem)
if err != nil {
return reconcile.Result{}, err
}
@@ -162,79 +223,39 @@ func (obj *ocsCephFilesystems) ensureCreated(r *StorageClusterReconciler, instan
return reconcile.Result{}, nil
}

func (r *StorageClusterReconciler) createDefaultSubvolumeGroup(filesystemName, filesystemNamespace string, ownerReferences []metav1.OwnerReference) error {

existingsvg := &cephv1.CephFilesystemSubVolumeGroup{}
svgName := generateNameForCephSubvolumeGroup(filesystemName)
err := r.Client.Get(r.ctx, types.NamespacedName{Name: svgName, Namespace: filesystemNamespace}, existingsvg)
if err == nil {
if existingsvg.DeletionTimestamp != nil {
r.Log.Info("Unable to restore subvolumegroup because it is marked for deletion.", "subvolumegroup", klog.KRef(filesystemNamespace, existingsvg.Name))
return fmt.Errorf("failed to restore subvolumegroup %s because it is marked for deletion", existingsvg.Name)
}
}

cephFilesystemSubVolumeGroup := &cephv1.CephFilesystemSubVolumeGroup{
ObjectMeta: metav1.ObjectMeta{
Name: svgName,
Namespace: filesystemNamespace,
OwnerReferences: ownerReferences,
},
}

// Default value of "distributed" option for pinning in the CephFilesystemSubVolumeGroup CR
defaultPinningValue := 1
mutateFn := func() error {
cephFilesystemSubVolumeGroup.Spec = cephv1.CephFilesystemSubVolumeGroupSpec{
Name: defaultSubvolumeGroupName,
FilesystemName: filesystemName,
Pinning: cephv1.CephFilesystemSubVolumeGroupSpecPinning{
Distributed: &defaultPinningValue,
},
func (r *StorageClusterReconciler) ensureCreateCephFilesystemSubVolumeGroups(filesystem *cephv1.CephFilesystem) error {
subVolumeGroups := getSubVolumeGroupsForFilesystem(filesystem)
for _, subVolumeGroup := range subVolumeGroups {
existing := cephv1.CephFilesystemSubVolumeGroup{}
err := r.Client.Get(context.TODO(), types.NamespacedName{Name: subVolumeGroup.Name, Namespace: subVolumeGroup.Namespace}, &existing)
switch {
case err == nil:
if existing.DeletionTimestamp != nil {
r.Log.Info("Unable to restore CephFileSystemSubVolumeGroup because it is marked for deletion.", "CephFileSystemSubVolumeGroup", klog.KRef(existing.Namespace, existing.Name))
return fmt.Errorf("failed to restore initialization object %s because it is marked for deletion", existing.Name)
}
if !reflect.DeepEqual(existing.Spec, subVolumeGroup.Spec) || !reflect.DeepEqual(existing.ObjectMeta.OwnerReferences, subVolumeGroup.ObjectMeta.OwnerReferences) {
r.Log.Info("Restoring original CephFilesystemSubVolumeGroup.", "CephFileSystemSubVolumeGroup", klog.KRef(subVolumeGroup.Namespace, subVolumeGroup.Name))
existing.ObjectMeta.OwnerReferences = subVolumeGroup.ObjectMeta.OwnerReferences
existing.Spec = subVolumeGroup.Spec
err = r.Client.Update(context.TODO(), &existing)
if err != nil {
r.Log.Error(err, "Unable to update CephFileSystemSubVolumeGroup.", "CephFileSystemSubVolumeGroup", klog.KRef(subVolumeGroup.Namespace, subVolumeGroup.Name))
return err
}
}
case errors.IsNotFound(err):
r.Log.Info("Creating CephFileSystemSubVolumeGroup.", "CephFileSystemSubVolumeGroup", klog.KRef(subVolumeGroup.Namespace, subVolumeGroup.Name))
err = r.Client.Create(context.TODO(), &subVolumeGroup)
if err != nil {
r.Log.Error(err, "Unable to create CephFileSystemSubVolumeGroup.", "CephFileSystemSubVolumeGroup", klog.KRef(subVolumeGroup.Namespace, subVolumeGroup.Name))
return err
}
}
return nil
}
_, err = ctrl.CreateOrUpdate(r.ctx, r.Client, cephFilesystemSubVolumeGroup, mutateFn)
if err != nil {
r.Log.Error(err, "Could not create/update default csi cephFilesystemSubVolumeGroup.", "cephFilesystemSubVolumeGroup", klog.KRef(cephFilesystemSubVolumeGroup.Namespace, cephFilesystemSubVolumeGroup.Name))
return err
}
return nil
}

func (r *StorageClusterReconciler) deleteDefaultSubvolumeGroup(filesystemName, filesystemNamespace string) error {
existingsvg := &cephv1.CephFilesystemSubVolumeGroup{}
svgName := generateNameForCephSubvolumeGroup(filesystemName)
err := r.Client.Get(r.ctx, types.NamespacedName{Name: svgName, Namespace: filesystemNamespace}, existingsvg)
if err != nil {
if errors.IsNotFound(err) {
r.Log.Info("Uninstall: csi subvolumegroup not found.", "Subvolumegroup", klog.KRef(filesystemNamespace, svgName))
return nil
}
r.Log.Error(err, "Uninstall: Unable to retrieve subvolumegroup.", "subvolumegroup", klog.KRef(filesystemNamespace, svgName))
return fmt.Errorf("uninstall: Unable to retrieve csi subvolumegroup : %v", err)
}

if existingsvg.GetDeletionTimestamp().IsZero() {
r.Log.Info("Uninstall: Deleting subvolumegroup.", "subvolumegroup", klog.KRef(filesystemNamespace, existingsvg.Name))
err = r.Client.Delete(r.ctx, existingsvg)
if err != nil {
r.Log.Error(err, "Uninstall: Failed to delete subvolumegroup.", "subvolumegroup", klog.KRef(filesystemNamespace, existingsvg.Name))
return fmt.Errorf("uninstall: Failed to delete subvolumegroup %v: %v", existingsvg.Name, err)
}
}

err = r.Client.Get(r.ctx, types.NamespacedName{Name: svgName, Namespace: filesystemNamespace}, existingsvg)
if err != nil {
if errors.IsNotFound(err) {
r.Log.Info("Uninstall: subvolumegroup is deleted.", "subvolumegroup", klog.KRef(filesystemNamespace, existingsvg.Name))
return nil
}
}
r.Log.Error(err, "Uninstall: Waiting for subvolumegroup to be deleted.", "subvolumegroup", klog.KRef(filesystemNamespace, existingsvg.Name))
return fmt.Errorf("uninstall: Waiting for subvolumegroup %v to be deleted", existingsvg.Name)
}

// ensureDeleted deletes the CephFilesystems owned by the StorageCluster
func (obj *ocsCephFilesystems) ensureDeleted(r *StorageClusterReconciler, sc *ocsv1.StorageCluster) (reconcile.Result, error) {
foundCephFilesystem := &cephv1.CephFilesystem{}
@@ -257,10 +278,8 @@ func (obj *ocsCephFilesystems) ensureDeleted(r *StorageClusterReconciler, sc *oc
// delete csi subvolume group for particular filesystem
// skip for the ocs provider mode
if !sc.Spec.AllowRemoteStorageConsumers {
cephSVGName := generateNameForCephSubvolumeGroup(cephFilesystem.Name)
err = r.deleteDefaultSubvolumeGroup(cephFilesystem.Name, cephFilesystem.Namespace)
err = r.ensureDeleteCephFilesystemSubVolumeGroups(cephFilesystem)
if err != nil {
r.Log.Error(err, "Uninstall: unable to delete subvolumegroup", "subvolumegroup", klog.KRef(cephFilesystem.Namespace, cephSVGName))
return reconcile.Result{}, err
}
}
@@ -287,6 +306,40 @@ func (obj *ocsCephFilesystems) ensureDeleted(r *StorageClusterReconciler, sc *oc
return reconcile.Result{}, nil
}

func (r *StorageClusterReconciler) ensureDeleteCephFilesystemSubVolumeGroups(filesystem *cephv1.CephFilesystem) error {
subVolumeGroups := getSubVolumeGroupsForFilesystem(filesystem)
for _, subVolumeGroup := range subVolumeGroups {
existing := cephv1.CephFilesystemSubVolumeGroup{}
err := r.Client.Get(r.ctx, types.NamespacedName{Name: subVolumeGroup.Name, Namespace: subVolumeGroup.Namespace}, &existing)
if err != nil {
if errors.IsNotFound(err) {
r.Log.Info("Uninstall: CephFileSystemSubVolumeGroup not found.", "CephFileSystemSubVolumeGroup", klog.KRef(subVolumeGroup.Namespace, subVolumeGroup.Name))
continue
}
r.Log.Error(err, "Uninstall: Unable to retrieve CephFileSystemSubVolumeGroup.", "CephFileSystemSubVolumeGroup", klog.KRef(subVolumeGroup.Namespace, subVolumeGroup.Name))
return fmt.Errorf("uninstall: Unable to retrieve CephFileSystemSubVolumeGroup %v: %v", subVolumeGroup.Name, err)
}
if subVolumeGroup.GetDeletionTimestamp().IsZero() {
r.Log.Info("Uninstall: Deleting cephFilesystemSubVolumeGroup.", "CephFileSystemSubVolumeGroup", klog.KRef(subVolumeGroup.Namespace, subVolumeGroup.Name))
err = r.Client.Delete(r.ctx, &subVolumeGroup)
if err != nil {
r.Log.Error(err, "Uninstall: Failed to delete CephFileSystemSubVolumeGroup.", "CephFileSystemSubVolumeGroup", klog.KRef(subVolumeGroup.Namespace, subVolumeGroup.Name))
return fmt.Errorf("uninstall: Failed to delete CephFileSystemSubVolumeGroup %v: %v", subVolumeGroup.Name, err)
}
}
err = r.Client.Get(r.ctx, types.NamespacedName{Name: subVolumeGroup.Name, Namespace: subVolumeGroup.Namespace}, &existing)
if err != nil {
if errors.IsNotFound(err) {
r.Log.Info("Uninstall: CephFilesystemSubVolumeGroup is deleted.", "CephFileSystemSubVolumeGroup", klog.KRef(subVolumeGroup.Namespace, subVolumeGroup.Name))
continue
}
}
r.Log.Error(err, "Uninstall: Waiting for CephFileSystemSubVolumeGroup to be deleted.", "CephFileSystemSubVolumeGroup", klog.KRef(subVolumeGroup.Namespace, subVolumeGroup.Name))
return fmt.Errorf("uninstall: Waiting for CephFileSystemSubVolumeGroup %v to be deleted", subVolumeGroup.Name)
}
return nil
}

func getActiveMetadataServers(sc *ocsv1.StorageCluster) int {
activeMds := sc.Spec.ManagedResources.CephFilesystems.ActiveMetadataServers
if activeMds != 0 {
@@ -295,3 +348,12 @@ func getActiveMetadataServers(sc *ocsv1.StorageCluster) int {

return defaults.CephFSActiveMetadataServers
}

// generateDefaultPoolSpec returns the default pool spec derived from the storagecluster
func generateDefaultPoolSpec(sc *ocsv1.StorageCluster) cephv1.PoolSpec {
return cephv1.PoolSpec{
DeviceClass: generateDeviceClass(sc),
Replicated: generateCephReplicatedSpec(sc, "data"),
FailureDomain: sc.Status.FailureDomain,
}
}
5 changes: 2 additions & 3 deletions controllers/storagecluster/generate.go
@@ -166,7 +166,6 @@ func generateStorageQuotaName(storageClassName, quotaName string) string {
return fmt.Sprintf("%s-%s", storageClassName, quotaName)
}

// generateNameForCephSubvolumeGroup function generates a name for CephFilesystemSubVolumeGroup
func generateNameForCephSubvolumeGroup(filesystemName string) string {
return fmt.Sprintf("%s-%s", filesystemName, defaultSubvolumeGroupName)
func generateNameForAdditionalCephFilesystemStorageClassSubvolumeGroup(filesystemName string, index int) string {
return fmt.Sprintf("%s-%s%d", filesystemName, "data", index)
}
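
A quick, self-contained sketch of what the renamed helper produces; the
filesystem name below is only an example.

package main

import "fmt"

// svgName mirrors generateNameForAdditionalCephFilesystemStorageClassSubvolumeGroup
func svgName(filesystemName string, index int) string {
	return fmt.Sprintf("%s-%s%d", filesystemName, "data", index)
}

func main() {
	// index 0 belongs to the default pool, so additional storageclasses
	// start at index 1
	fmt.Println(svgName("ocs-storagecluster-cephfilesystem", 1))
	// prints: ocs-storagecluster-cephfilesystem-data1
}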
2 changes: 1 addition & 1 deletion controllers/storagecluster/reconcile.go
@@ -108,7 +108,7 @@ var validTopologyLabelKeys = []string{
}

// +kubebuilder:rbac:groups=ocs.openshift.io,resources=*,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=ceph.rook.io,resources=cephclusters;cephblockpools;cephfilesystems;cephnfses;cephobjectstores;cephobjectstoreusers;cephrbdmirrors;cephblockpoolradosnamespaces,verbs=*
// +kubebuilder:rbac:groups=ceph.rook.io,resources=cephclusters;cephblockpools;cephfilesystems;cephfilesystemsubvolumegroups;cephnfses;cephobjectstores;cephobjectstoreusers;cephrbdmirrors;cephblockpoolradosnamespaces,verbs=*
// +kubebuilder:rbac:groups=noobaa.io,resources=noobaas,verbs=*
// +kubebuilder:rbac:groups=storage.k8s.io,resources=storageclasses,verbs=*
// +kubebuilder:rbac:groups=core,resources=pods;services;serviceaccounts;endpoints;persistentvolumes;persistentvolumeclaims;events;configmaps;secrets;nodes,verbs=*
39 changes: 39 additions & 0 deletions controllers/storagecluster/storageclasses.go
@@ -43,6 +43,7 @@ type StorageClassConfiguration struct {
reconcileStrategy ReconcileStrategy
disable bool
isClusterExternal bool
isProviderMode bool
}

type ocsStorageClass struct{}
@@ -184,6 +185,26 @@ func (r *StorageClusterReconciler) createStorageClasses(sccs []StorageClassConfi
skippedSC = append(skippedSC, sc.Name)
continue
}
// skip for the ocs provider mode
if !scc.isProviderMode {
subVolumeGroups := getSubVolumeGroupsForFilesystem(&cephFilesystem)
for _, subVolumeGroup := range subVolumeGroups {
cephFsSvg := cephv1.CephFilesystemSubVolumeGroup{}
err = r.Client.Get(context.TODO(), types.NamespacedName{Name: subVolumeGroup.Name, Namespace: subVolumeGroup.Namespace}, &cephFsSvg)
if err != nil || cephFsSvg.Status == nil || cephFsSvg.Status.Phase != cephv1.ConditionType(util.PhaseReady) {
r.Log.Info("Waiting for CephFilesystemSubVolumeGroup to be Ready. Skip reconciling StorageClass",
"CephFilesystemSubVolumeGroup", klog.KRef(key.Namespace, key.Name),
"StorageClass", klog.KRef("", sc.Name),
)
skippedSC = append(skippedSC, sc.Name)
continue
}
if sc.Parameters["pool"] == cephFsSvg.Name {
sc.Parameters["clusterID"] = cephFsSvg.Status.Info["clusterID"]

}
}
}
case strings.Contains(sc.Name, "-nfs") || strings.Contains(sc.Provisioner, nfsDriverName):
// wait for CephNFS to be ready
cephNFS := cephv1.CephNFS{}
@@ -273,7 +294,21 @@ func newCephFilesystemStorageClassConfiguration(initData *ocsv1.StorageCluster)
reconcileStrategy: ReconcileStrategy(managementSpec.ReconcileStrategy),
disable: managementSpec.DisableStorageClass,
isClusterExternal: initData.Spec.ExternalStorage.Enable,
isProviderMode: initData.Spec.AllowRemoteStorageConsumers,
}
}

func newAdditionalCephFilesystemStorageClassConfiguration(initData *ocsv1.StorageCluster, additionalCephFilesystemStorageClass []ocsv1.AdditionalCephFilesystemStorageClass) []StorageClassConfiguration {
var additionalCephFilesystemStorageClassConfigs []StorageClassConfiguration
for i, additionalCephFilesystemStorageClass := range additionalCephFilesystemStorageClass {
additionalCephFilesystemStorageClassConfig := newCephFilesystemStorageClassConfiguration(initData)
meta := &additionalCephFilesystemStorageClassConfig.storageClass.ObjectMeta
meta.Name = additionalCephFilesystemStorageClass.StorageClassName
// i+1 is used for the pool name as the 0th index is already used by the default pool
additionalCephFilesystemStorageClassConfig.storageClass.Parameters["pool"] = generateNameForAdditionalCephFilesystemStorageClassSubvolumeGroup(generateNameForCephFilesystem(initData), i+1)
additionalCephFilesystemStorageClassConfigs = append(additionalCephFilesystemStorageClassConfigs, additionalCephFilesystemStorageClassConfig)
}
return additionalCephFilesystemStorageClassConfigs
}

// newCephBlockPoolStorageClassConfiguration generates configuration options for a Ceph Block Pool StorageClass.
@@ -464,6 +499,10 @@ func (r *StorageClusterReconciler) newStorageClassConfigurations(initData *ocsv1
if initData.Spec.NFS != nil && initData.Spec.NFS.Enable {
ret = append(ret, newCephNFSStorageClassConfiguration(initData))
}
additionalCephFilesystemStorageClasses := initData.Spec.ManagedResources.CephFilesystems.AdditionalCephFilesystemStorageClasses
if len(additionalCephFilesystemStorageClasses) > 0 {
ret = append(ret, newAdditionalCephFilesystemStorageClassConfiguration(initData, additionalCephFilesystemStorageClasses)...)
}
// OBC storageclass will be returned only in TWO conditions,
// a. either 'externalStorage' is enabled
// OR
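
Taken together, the storageclasses.go changes wire each additional
storageclass to its pool and subvolumegroup roughly like the sketch below;
the names and the clusterID value are illustrative placeholders, not real
output.

// Parameters the reconciler ends up with on the additional cephfs
// StorageClass at index 1 (names are examples):
params := map[string]string{
	// matches the generated subvolumegroup/pool name for that index
	"pool": "ocs-storagecluster-cephfilesystem-data1",
	// copied from cephFsSvg.Status.Info["clusterID"] once the
	// subvolumegroup reports Ready; placeholder value here
	"clusterID": "<from-subvolumegroup-status>",
}
_ = params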
