Skip to content

Commit

Permalink
feat: if updated sts/rs equal current sts/rs, don't create new resource
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Jan 7, 2024
1 parent e72e717 commit f5bbddd
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 59 deletions.
57 changes: 28 additions & 29 deletions controllers/apps/v2beta1/add_emqx_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type addCore struct {
func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ innerReq.RequesterInterface) subResult {
logger := log.FromContext(ctx)
preSts := getNewStatefulSet(instance)
preStsHash := preSts.Labels[appsv2beta1.LabelsPodTemplateHashKey]
updateSts, _, _ := getStateFulSetList(ctx, a.Client, instance)

patchCalculateFunc := func(storage, new *appsv1.StatefulSet) *patch.PatchResult {
Expand All @@ -49,6 +50,14 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
_ = ctrl.SetControllerReference(instance, preSts, a.Scheme)
if err := a.Handler.Create(preSts); err != nil {
if k8sErrors.IsAlreadyExists(emperror.Cause(err)) {
// Sometimes the updated statefulSet will not be ready, because the EMQX node can not be started.
// And then we will rollback EMQX CR spec, the EMQX operator controller will create a new statefulSet.
// But the new statefulSet will be the same as the previous one, so we didn't need to create it, just change the EMQX status.
if preStsHash == instance.Status.CoreNodesStatus.CurrentRevision {
_ = a.updateEMQXStatus(ctx, instance, "RevertStatefulSet", "Revert to current statefulSet", preStsHash)
return subResult{}
}

if instance.Status.CoreNodesStatus.CollisionCount == nil {
instance.Status.CoreNodesStatus.CollisionCount = pointer.Int32(0)
}
Expand All @@ -58,21 +67,7 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
}
return subResult{err: emperror.Wrap(err, "failed to create statefulSet")}
}
// Update EMQX status
_ = retry.RetryOnConflict(retry.DefaultRetry, func() error {
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.CoreNodesProgressing,
Status: metav1.ConditionTrue,
Reason: "CreateNewStatefulSet",
Message: "Create new statefulSet",
})
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady)
instance.Status.CoreNodesStatus.UpdateRevision = preSts.Labels[appsv2beta1.LabelsPodTemplateHashKey]
return a.Client.Status().Update(ctx, instance)
})
_ = a.updateEMQXStatus(ctx, instance, "CreateNewStatefulSet", "Create new statefulSet", preStsHash)
return subResult{}
}

Expand All @@ -95,24 +90,28 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
}); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update statefulSet")}
}
// Update EMQX status
_ = retry.RetryOnConflict(retry.DefaultRetry, func() error {
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.CoreNodesProgressing,
Status: metav1.ConditionTrue,
Reason: "CreateNewStatefulSet",
Message: "Create new statefulSet",
})
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady)
return a.Client.Status().Update(ctx, instance)
})
_ = a.updateEMQXStatus(ctx, instance, "UpdateStatefulSet", "Update exist statefulSet", preStsHash)
}
return subResult{}
}

func (a *addCore) updateEMQXStatus(ctx context.Context, instance *appsv2beta1.EMQX, reason, message, podTemplateHash string) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.CoreNodesProgressing,
Status: metav1.ConditionTrue,
Reason: reason,
Message: message,
})
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady)
instance.Status.CoreNodesStatus.UpdateRevision = podTemplateHash
return a.Client.Status().Update(ctx, instance)
})
}

func getNewStatefulSet(instance *appsv2beta1.EMQX) *appsv1.StatefulSet {
svcPorts, _ := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data)

Expand Down
58 changes: 28 additions & 30 deletions controllers/apps/v2beta1/add_emqx_repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i

logger := log.FromContext(ctx)
preRs := getNewReplicaSet(instance)
preRsHash := preRs.Labels[appsv2beta1.LabelsPodTemplateHashKey]
updateRs, _, _ := getReplicaSetList(ctx, a.Client, instance)

patchCalculateFunc := func(storage, new *appsv1.ReplicaSet) *patch.PatchResult {
Expand All @@ -56,6 +57,14 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
_ = ctrl.SetControllerReference(instance, preRs, a.Scheme)
if err := a.Handler.Create(preRs); err != nil {
if k8sErrors.IsAlreadyExists(emperror.Cause(err)) {
// Sometimes the updated replicaSet will not be ready, because the EMQX node can not be started.
// And then we will rollback EMQX CR spec, the EMQX operator controller will create a new replicaSet.
// But the new replicaSet will be the same as the previous one, so we didn't need to create it, just change the EMQX status.
if preRsHash == instance.Status.ReplicantNodesStatus.CurrentRevision {
_ = a.updateEMQXStatus(ctx, instance, "RevertReplicaSet", "Revert to current replicaSet", preRsHash)
return subResult{}
}

if instance.Status.ReplicantNodesStatus.CollisionCount == nil {
instance.Status.ReplicantNodesStatus.CollisionCount = pointer.Int32(0)
}
Expand All @@ -65,22 +74,7 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
}
return subResult{err: emperror.Wrap(err, "failed to create replicaSet")}
}

// Update EMQX status
_ = retry.RetryOnConflict(retry.DefaultRetry, func() error {
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.ReplicantNodesProgressing,
Status: metav1.ConditionTrue,
Reason: "CreateNewReplicaSet",
Message: "Create new replicaSet",
})
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady)
instance.Status.ReplicantNodesStatus.UpdateRevision = preRs.Labels[appsv2beta1.LabelsPodTemplateHashKey]
return a.Client.Status().Update(ctx, instance)
})
_ = a.updateEMQXStatus(ctx, instance, "CreateReplicaSet", "Create new replicaSet", preRsHash)
return subResult{}
}

Expand All @@ -103,24 +97,28 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
}); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update replicaSet")}
}
// Update EMQX status
_ = retry.RetryOnConflict(retry.DefaultRetry, func() error {
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.ReplicantNodesProgressing,
Status: metav1.ConditionTrue,
Reason: "CreateNewReplicaSet",
Message: "Create new replicaSet",
})
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady)
return a.Client.Status().Update(ctx, instance)
})
_ = a.updateEMQXStatus(ctx, instance, "UpdateReplicaSet", "Update exist replicaSet", preRsHash)
}
return subResult{}
}

func (a *addRepl) updateEMQXStatus(ctx context.Context, instance *appsv2beta1.EMQX, reason, message, podTemplateHash string) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.ReplicantNodesProgressing,
Status: metav1.ConditionTrue,
Reason: reason,
Message: message,
})
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady)
instance.Status.ReplicantNodesStatus.UpdateRevision = podTemplateHash
return a.Client.Status().Update(ctx, instance)
})
}

func getNewReplicaSet(instance *appsv2beta1.EMQX) *appsv1.ReplicaSet {
svcPorts, _ := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data)

Expand Down

0 comments on commit f5bbddd

Please sign in to comment.