From f5bbdddb318ed91a67b7feb513fd871f32981250 Mon Sep 17 00:00:00 2001 From: Rory Z <16801068+Rory-Z@users.noreply.github.com> Date: Sun, 7 Jan 2024 17:35:43 +0800 Subject: [PATCH] feat: if updated sts/rs equal current sts/rs, don't create new resource Signed-off-by: Rory Z <16801068+Rory-Z@users.noreply.github.com> --- controllers/apps/v2beta1/add_emqx_core.go | 57 +++++++++++----------- controllers/apps/v2beta1/add_emqx_repl.go | 58 +++++++++++------------ 2 files changed, 56 insertions(+), 59 deletions(-) diff --git a/controllers/apps/v2beta1/add_emqx_core.go b/controllers/apps/v2beta1/add_emqx_core.go index e5b829d22..640397dad 100644 --- a/controllers/apps/v2beta1/add_emqx_core.go +++ b/controllers/apps/v2beta1/add_emqx_core.go @@ -29,6 +29,7 @@ type addCore struct { func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ innerReq.RequesterInterface) subResult { logger := log.FromContext(ctx) preSts := getNewStatefulSet(instance) + preStsHash := preSts.Labels[appsv2beta1.LabelsPodTemplateHashKey] updateSts, _, _ := getStateFulSetList(ctx, a.Client, instance) patchCalculateFunc := func(storage, new *appsv1.StatefulSet) *patch.PatchResult { @@ -49,6 +50,14 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i _ = ctrl.SetControllerReference(instance, preSts, a.Scheme) if err := a.Handler.Create(preSts); err != nil { if k8sErrors.IsAlreadyExists(emperror.Cause(err)) { + // Sometimes the updated statefulSet never becomes ready because the EMQX node cannot be started. + // If the EMQX CR spec is then rolled back, the EMQX operator controller tries to create a new statefulSet. + // That statefulSet would be identical to the existing one, so we do not need to create it; we only update the EMQX status.
+ if preStsHash == instance.Status.CoreNodesStatus.CurrentRevision { + _ = a.updateEMQXStatus(ctx, instance, "RevertStatefulSet", "Revert to current statefulSet", preStsHash) + return subResult{} + } + if instance.Status.CoreNodesStatus.CollisionCount == nil { instance.Status.CoreNodesStatus.CollisionCount = pointer.Int32(0) } @@ -58,21 +67,7 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i } return subResult{err: emperror.Wrap(err, "failed to create statefulSet")} } - // Update EMQX status - _ = retry.RetryOnConflict(retry.DefaultRetry, func() error { - _ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance) - instance.Status.SetCondition(metav1.Condition{ - Type: appsv2beta1.CoreNodesProgressing, - Status: metav1.ConditionTrue, - Reason: "CreateNewStatefulSet", - Message: "Create new statefulSet", - }) - instance.Status.RemoveCondition(appsv2beta1.Ready) - instance.Status.RemoveCondition(appsv2beta1.Available) - instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady) - instance.Status.CoreNodesStatus.UpdateRevision = preSts.Labels[appsv2beta1.LabelsPodTemplateHashKey] - return a.Client.Status().Update(ctx, instance) - }) + _ = a.updateEMQXStatus(ctx, instance, "CreateNewStatefulSet", "Create new statefulSet", preStsHash) return subResult{} } @@ -95,24 +90,28 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i }); err != nil { return subResult{err: emperror.Wrap(err, "failed to update statefulSet")} } - // Update EMQX status - _ = retry.RetryOnConflict(retry.DefaultRetry, func() error { - _ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance) - instance.Status.SetCondition(metav1.Condition{ - Type: appsv2beta1.CoreNodesProgressing, - Status: metav1.ConditionTrue, - Reason: "CreateNewStatefulSet", - Message: "Create new statefulSet", - }) - instance.Status.RemoveCondition(appsv2beta1.Ready) - instance.Status.RemoveCondition(appsv2beta1.Available) - 
instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady) - return a.Client.Status().Update(ctx, instance) - }) + _ = a.updateEMQXStatus(ctx, instance, "UpdateStatefulSet", "Update exist statefulSet", preStsHash) } return subResult{} } +func (a *addCore) updateEMQXStatus(ctx context.Context, instance *appsv2beta1.EMQX, reason, message, podTemplateHash string) error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + _ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance) + instance.Status.SetCondition(metav1.Condition{ + Type: appsv2beta1.CoreNodesProgressing, + Status: metav1.ConditionTrue, + Reason: reason, + Message: message, + }) + instance.Status.RemoveCondition(appsv2beta1.Ready) + instance.Status.RemoveCondition(appsv2beta1.Available) + instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady) + instance.Status.CoreNodesStatus.UpdateRevision = podTemplateHash + return a.Client.Status().Update(ctx, instance) + }) +} + func getNewStatefulSet(instance *appsv2beta1.EMQX) *appsv1.StatefulSet { svcPorts, _ := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data) diff --git a/controllers/apps/v2beta1/add_emqx_repl.go b/controllers/apps/v2beta1/add_emqx_repl.go index 327af13fc..b2640e071 100644 --- a/controllers/apps/v2beta1/add_emqx_repl.go +++ b/controllers/apps/v2beta1/add_emqx_repl.go @@ -35,6 +35,7 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i logger := log.FromContext(ctx) preRs := getNewReplicaSet(instance) + preRsHash := preRs.Labels[appsv2beta1.LabelsPodTemplateHashKey] updateRs, _, _ := getReplicaSetList(ctx, a.Client, instance) patchCalculateFunc := func(storage, new *appsv1.ReplicaSet) *patch.PatchResult { @@ -56,6 +57,14 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i _ = ctrl.SetControllerReference(instance, preRs, a.Scheme) if err := a.Handler.Create(preRs); err != nil { if k8sErrors.IsAlreadyExists(emperror.Cause(err)) { + // 
Sometimes the updated replicaSet never becomes ready because the EMQX node cannot be started. + // If the EMQX CR spec is then rolled back, the EMQX operator controller tries to create a new replicaSet. + // That replicaSet would be identical to the existing one, so we do not need to create it; we only update the EMQX status. + if preRsHash == instance.Status.ReplicantNodesStatus.CurrentRevision { + _ = a.updateEMQXStatus(ctx, instance, "RevertReplicaSet", "Revert to current replicaSet", preRsHash) + return subResult{} + } + if instance.Status.ReplicantNodesStatus.CollisionCount == nil { instance.Status.ReplicantNodesStatus.CollisionCount = pointer.Int32(0) } @@ -65,22 +74,7 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i } return subResult{err: emperror.Wrap(err, "failed to create replicaSet")} } - - // Update EMQX status - _ = retry.RetryOnConflict(retry.DefaultRetry, func() error { - _ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance) - instance.Status.SetCondition(metav1.Condition{ - Type: appsv2beta1.ReplicantNodesProgressing, - Status: metav1.ConditionTrue, - Reason: "CreateNewReplicaSet", - Message: "Create new replicaSet", - }) - instance.Status.RemoveCondition(appsv2beta1.Ready) - instance.Status.RemoveCondition(appsv2beta1.Available) - instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady) - instance.Status.ReplicantNodesStatus.UpdateRevision = preRs.Labels[appsv2beta1.LabelsPodTemplateHashKey] - return a.Client.Status().Update(ctx, instance) - }) + _ = a.updateEMQXStatus(ctx, instance, "CreateReplicaSet", "Create new replicaSet", preRsHash) return subResult{} } @@ -103,24 +97,28 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i }); err != nil { return subResult{err: emperror.Wrap(err, "failed to update replicaSet")} } - // Update EMQX status - _ = retry.RetryOnConflict(retry.DefaultRetry, func() error { - _ = a.Client.Get(ctx, 
client.ObjectKeyFromObject(instance), instance) - instance.Status.SetCondition(metav1.Condition{ - Type: appsv2beta1.ReplicantNodesProgressing, - Status: metav1.ConditionTrue, - Reason: "CreateNewReplicaSet", - Message: "Create new replicaSet", - }) - instance.Status.RemoveCondition(appsv2beta1.Ready) - instance.Status.RemoveCondition(appsv2beta1.Available) - instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady) - return a.Client.Status().Update(ctx, instance) - }) + _ = a.updateEMQXStatus(ctx, instance, "UpdateReplicaSet", "Update exist replicaSet", preRsHash) } return subResult{} } +func (a *addRepl) updateEMQXStatus(ctx context.Context, instance *appsv2beta1.EMQX, reason, message, podTemplateHash string) error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + _ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance) + instance.Status.SetCondition(metav1.Condition{ + Type: appsv2beta1.ReplicantNodesProgressing, + Status: metav1.ConditionTrue, + Reason: reason, + Message: message, + }) + instance.Status.RemoveCondition(appsv2beta1.Ready) + instance.Status.RemoveCondition(appsv2beta1.Available) + instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady) + instance.Status.ReplicantNodesStatus.UpdateRevision = podTemplateHash + return a.Client.Status().Update(ctx, instance) + }) +} + func getNewReplicaSet(instance *appsv2beta1.EMQX) *appsv1.ReplicaSet { svcPorts, _ := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data)