Ensure resource profile changes are applied correctly
• Detect a resource profile change by comparing sc.Spec.ResourceProfile
  with sc.Status.LastAppliedResourceProfile. If they differ, a resource
  profile change is in progress (see the Go sketch after the
  changed-files summary below).
• Set the ContinueUpgradeAfterChecksEvenIfNotHealthy flag on the
  cephCluster CR to false if a resource profile change is in progress.
• While the change is being applied, mark the storageCluster phase as
  Progressing. The storageCluster conditions will show "New resource
  profile is being applied".
• Keep checking whether any pod with the new resource profile label
  has been in Pending state for more than 2 minutes. If so, assume the
  resource profile change has failed. Mark the storageCluster phase as
  Error, and the storageCluster conditions will show "New resource
  profile failed to apply, please revert to the last working resource
  profile".
• Check whether the ceph daemons (mgr, mon, osd, mds, rgw) have the
  correct number of running pods, where each pod carries the current
  resource profile label.
• If the above checks pass, the new resource profile has been applied
  successfully. Record the resource profile in the status and mark the
  storageCluster phase as Ready.

Signed-off-by: Malay Kumar Parida <[email protected]>
malayparida2000 committed Dec 4, 2023
1 parent 9504bfb commit 161ff28
Showing 8 changed files with 160 additions and 1 deletion.
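
For orientation, here is a minimal Go sketch of the detection logic described in the commit message; the helper name isResourceProfileChangeInProgress is illustrative only, as the commit itself inlines this comparison (see cephcluster.go and reconcile.go below).

package storagecluster

import ocsv1 "github.com/red-hat-storage/ocs-operator/v4/api/v1"

// isResourceProfileChangeInProgress is a hypothetical helper: a change is in
// progress when the desired profile in the spec differs from the last
// successfully applied profile recorded in the status.
func isResourceProfileChangeInProgress(sc *ocsv1.StorageCluster) bool {
	return sc.Spec.ResourceProfile != sc.Status.LastAppliedResourceProfile
}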
3 changes: 3 additions & 0 deletions api/v1/storagecluster_types.go
@@ -496,6 +496,9 @@ type StorageClusterStatus struct {
// +optional
FailureDomainValues []string `json:"failureDomainValues,omitempty"`

// LastAppliedResourceProfile is the resource profile that was last applied successfully & is currently in use.
LastAppliedResourceProfile string `json:"lastAppliedResourceProfile,omitempty"`

// StorageProviderEndpoint holds endpoint info on Provider cluster which is required
// for consumer to establish connection with the storage providing cluster.
StorageProviderEndpoint string `json:"storageProviderEndpoint,omitempty"`
2 changes: 2 additions & 0 deletions controllers/defaults/defaults.go
@@ -11,6 +11,8 @@ const (
// RackTopologyKey is the node label used to distribute storage nodes
// when there are not enough AZs present across the nodes
RackTopologyKey = "topology.rook.io/rack"
// ODFResourceProfileKey is the label key used to identify the resource profile of the pod
ODFResourceProfileKey = "odf-resource-profile"
// KubeMajorTopologySpreadConstraints is the minimum major kube version to support TSC
// used along with KubeMinorTSC for version comparison
KubeMajorTopologySpreadConstraints = "1"
15 changes: 14 additions & 1 deletion controllers/storagecluster/cephcluster.go
@@ -461,9 +461,13 @@ func newCephCluster(sc *ocsv1.StorageCluster, cephImage string, nodeCount int, s
rookCephv1.KeyOSD: systemNodeCritical,
},
Resources: newCephDaemonResources(sc),
ContinueUpgradeAfterChecksEvenIfNotHealthy: true,
// if resource profile change is in progress, then set this flag to false
ContinueUpgradeAfterChecksEvenIfNotHealthy: sc.Spec.ResourceProfile == sc.Status.LastAppliedResourceProfile,
LogCollector: logCollector,
Labels: rookCephv1.LabelsSpec{
rookCephv1.KeyMgr: rookCephv1.Labels{defaults.ODFResourceProfileKey: sc.Spec.ResourceProfile},
rookCephv1.KeyMon: rookCephv1.Labels{defaults.ODFResourceProfileKey: sc.Spec.ResourceProfile},
rookCephv1.KeyOSD: rookCephv1.Labels{defaults.ODFResourceProfileKey: sc.Spec.ResourceProfile},
rookCephv1.KeyMonitoring: getCephClusterMonitoringLabels(*sc),
},
CSI: rookCephv1.CSIDriverSpec{
@@ -1303,3 +1307,12 @@ func isBluestore(store rookCephv1.OSDStore) bool {

return false
}

func (r *StorageClusterReconciler) getOsdCount(sc *ocsv1.StorageCluster) int {
storageClassDeviceSets := newStorageClassDeviceSets(sc, r.serverVersion)
osdCount := 0
for _, ds := range storageClassDeviceSets {
osdCount += ds.Count
}
return osdCount
}
1 change: 1 addition & 0 deletions controllers/storagecluster/cephfilesystem.go
@@ -41,6 +41,7 @@ func (r *StorageClusterReconciler) newCephFilesystemInstances(initStorageCluster
Resources: defaults.GetProfileDaemonResources("mds", initStorageCluster),
// set PriorityClassName for the MDS pods
PriorityClassName: openshiftUserCritical,
Labels: cephv1.Labels{defaults.ODFResourceProfileKey: initStorageCluster.Spec.ResourceProfile},
},
},
}
1 change: 1 addition & 0 deletions controllers/storagecluster/cephobjectstores.go
@@ -187,6 +187,7 @@ func (r *StorageClusterReconciler) newCephObjectStoreInstances(initData *ocsv1.S
Resources: defaults.GetProfileDaemonResources("rgw", initData),
// set PriorityClassName for the rgw pods
PriorityClassName: openshiftUserCritical,
Labels: cephv1.Labels{defaults.ODFResourceProfileKey: initData.Spec.ResourceProfile},
},
},
},
20 changes: 20 additions & 0 deletions controllers/storagecluster/reconcile.go
@@ -471,6 +471,26 @@ func (r *StorageClusterReconciler) reconcilePhases(
return returnRes, nil
}
}
// Process resource profile changes only if the cluster is not in external, provider, or NooBaa standalone mode, and the resource profile has changed
if !(instance.Spec.ExternalStorage.Enable || instance.Spec.AllowRemoteStorageConsumers || r.IsNoobaaStandalone) &&
(instance.Spec.ResourceProfile != instance.Status.LastAppliedResourceProfile) {
err := r.ensureResourceProfileChangeApplied(instance)
if err != nil {
if err == errResourceProfileChangeApplying {
reason := ocsv1.ReconcileFailed
message := err.Error()
statusutil.SetProgressingCondition(&instance.Status.Conditions, reason, message)
instance.Status.Phase = statusutil.PhaseProgressing
return reconcile.Result{Requeue: true}, nil
} else if err == errResourceProfileChangeFailed {
reason := ocsv1.ReconcileFailed
message := err.Error()
statusutil.SetErrorCondition(&instance.Status.Conditions, reason, message)
instance.Status.Phase = statusutil.PhaseError
}
return reconcile.Result{}, err
}
}
// All component operators are in a happy state.
if r.conditions == nil {
r.Log.Info("No component operator reported negatively.")
96 changes: 96 additions & 0 deletions controllers/storagecluster/resourceprofile.go
@@ -0,0 +1,96 @@
package storagecluster

import (
"errors"
"time"

ocsv1 "github.com/red-hat-storage/ocs-operator/v4/api/v1"
"github.com/red-hat-storage/ocs-operator/v4/controllers/defaults"
"github.com/red-hat-storage/ocs-operator/v4/controllers/util"
corev1 "k8s.io/api/core/v1"
)

const (
resourceProfileChangeApplyingMessage = "New resource profile is being applied"
resourceProfileChangeFailedMessage = "New resource profile failed to apply, please revert to the last working resource profile"
)

var (
errResourceProfileChangeApplying = errors.New(resourceProfileChangeApplyingMessage)
errResourceProfileChangeFailed = errors.New(resourceProfileChangeFailedMessage)
)

func (r *StorageClusterReconciler) ensureResourceProfileChangeApplied(sc *ocsv1.StorageCluster) error {
currentResourceProfile := sc.Spec.ResourceProfile
lastAppliedResourceProfile := sc.Status.LastAppliedResourceProfile
r.Log.Info("Applying new resource profile", "current", currentResourceProfile, "last", lastAppliedResourceProfile)

// If any pod with the current resource profile label has been in Pending state for more than 2 minutes, assume that the resource profile change has failed (the pod is stuck in Pending)
podList, err := util.GetPodsWithLabels(r.ctx, r.Client, sc.Namespace, map[string]string{defaults.ODFResourceProfileKey: currentResourceProfile})
if err != nil {
return err
}
for _, pod := range podList.Items {
if pod.Status.Phase == corev1.PodPending {
if time.Since(pod.CreationTimestamp.Time) < 2*time.Minute {
return errResourceProfileChangeApplying
}
r.Log.Error(errResourceProfileChangeFailed, "Pod is stuck in pending state", "pod", pod.Name)
return errResourceProfileChangeFailed
}
}

// Verify that the expected number of mgr pods with the current resource profile label are running
if err := r.verifyDaemonWithResourceProfile(sc.Namespace, map[string]string{"app": "rook-ceph-mgr", defaults.ODFResourceProfileKey: currentResourceProfile}, getMgrCount(sc)); err != nil {
return err
}

// Verify that the expected number of mon pods with the current resource profile label are running
if err := r.verifyDaemonWithResourceProfile(sc.Namespace, map[string]string{"app": "rook-ceph-mon", defaults.ODFResourceProfileKey: currentResourceProfile}, getMonCount(sc)); err != nil {
return err
}

// Verify that the expected number of osd pods with the current resource profile label are running
if err := r.verifyDaemonWithResourceProfile(sc.Namespace, map[string]string{"app": "rook-ceph-osd", defaults.ODFResourceProfileKey: currentResourceProfile}, r.getOsdCount(sc)); err != nil {
return err
}

// Verify that the expected number of mds pods with the current resource profile label are running
if err := r.verifyDaemonWithResourceProfile(sc.Namespace, map[string]string{"app": "rook-ceph-mds", defaults.ODFResourceProfileKey: currentResourceProfile}, 2*getActiveMetadataServers(sc)); err != nil {
return err
}

// If rgw is not skipped, verify that the expected number of rgw pods with the current resource profile label are running
skiprgw, err := r.PlatformsShouldSkipObjectStore()
if err != nil {
return err
}
if !skiprgw {
if err := r.verifyDaemonWithResourceProfile(sc.Namespace, map[string]string{"app": "rook-ceph-rgw", defaults.ODFResourceProfileKey: currentResourceProfile}, getCephObjectStoreGatewayInstances(sc)); err != nil {
return err
}
}

// If we are here, then all the ceph daemons have the correct count of pods with the new resource profile
// New resource profile has been applied successfully
sc.Status.LastAppliedResourceProfile = currentResourceProfile
if err = r.Client.Status().Update(r.ctx, sc); err != nil {
r.Log.Error(err, "Error updating status after resource profile change")
return err
}
r.Log.Info("Resource profile change applied successfully", "current", currentResourceProfile, "last", lastAppliedResourceProfile)
return nil
}

// verifyDaemonWithResourceProfile verifies that a ceph daemon has the expected number of running pods with the given resource profile label
func (r *StorageClusterReconciler) verifyDaemonWithResourceProfile(namespace string, labelSelector map[string]string, count int) error {
podList, err := util.GetPodsWithLabels(r.ctx, r.Client, namespace, labelSelector)
if err != nil {
return err
}
runningPods := util.GetCountOfRunningPods(podList)
if runningPods != count {
r.Log.Info("running pod count mismatch", "app", labelSelector["app"], "expected", count, "actual", runningPods)
return errResourceProfileChangeApplying
}
return nil
}
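
For reference, a hypothetical wrapper (not part of this commit, assumed to live in the same storagecluster package) that summarizes the expected running-pod counts verified above. The count helpers are the same ones called in the diff; only the wrapper and its name are invented for illustration.

func (r *StorageClusterReconciler) expectedDaemonPodCounts(sc *ocsv1.StorageCluster) map[string]int {
	return map[string]int{
		"rook-ceph-mgr": getMgrCount(sc),
		"rook-ceph-mon": getMonCount(sc),
		"rook-ceph-osd": r.getOsdCount(sc),
		// The diff expects 2x the active MDS count (standby MDS pods included).
		"rook-ceph-mds": 2 * getActiveMetadataServers(sc),
		// Only checked when the platform does not skip the object store.
		"rook-ceph-rgw": getCephObjectStoreGatewayInstances(sc),
	}
}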
23 changes: 23 additions & 0 deletions controllers/util/k8sutil.go
@@ -9,6 +9,7 @@ import (
"github.com/go-logr/logr"
configv1 "github.com/openshift/api/config/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
@@ -95,3 +96,25 @@ func RestartPod(ctx context.Context, kubeClient client.Client, logger *logr.Logg
}
}
}

// GetPodsWithLabels returns all pods in the given namespace that match the given label selector
func GetPodsWithLabels(ctx context.Context, kubeClient client.Client, namespace string, labelSelector map[string]string) (*corev1.PodList, error) {
podList := &corev1.PodList{}
if err := kubeClient.List(ctx, podList, client.InNamespace(namespace), &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labelSelector),
}); err != nil {
return nil, err
}
return podList, nil
}

// GetCountOfRunningPods returns the number of pods in the Running state in the given pod list
func GetCountOfRunningPods(podList *corev1.PodList) int {
count := 0
for _, pod := range podList.Items {
if pod.Status.Phase == corev1.PodRunning {
count++
}
}
return count
}
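
An illustrative, test-style usage of the two new helpers with a controller-runtime fake client. The namespace, the "performance" profile value, and the example scaffolding are assumptions for this sketch and are not part of the commit.

package util_test

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	"github.com/red-hat-storage/ocs-operator/v4/controllers/util"
)

func ExampleGetPodsWithLabels() {
	// A single running OSD pod carrying the resource profile label.
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "rook-ceph-osd-0",
			Namespace: "openshift-storage",
			Labels:    map[string]string{"odf-resource-profile": "performance"},
		},
		Status: corev1.PodStatus{Phase: corev1.PodRunning},
	}
	cl := fake.NewClientBuilder().WithObjects(pod).Build()

	// List pods by label, then count only those in the Running phase.
	podList, err := util.GetPodsWithLabels(context.TODO(), cl, "openshift-storage",
		map[string]string{"odf-resource-profile": "performance"})
	if err != nil {
		panic(err)
	}
	fmt.Println(util.GetCountOfRunningPods(podList))
	// Output: 1
}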
