diff --git a/pkg/apis/upgrade.cattle.io/v1/types.go b/pkg/apis/upgrade.cattle.io/v1/types.go index c4a71994..887a92f6 100644 --- a/pkg/apis/upgrade.cattle.io/v1/types.go +++ b/pkg/apis/upgrade.cattle.io/v1/types.go @@ -48,12 +48,13 @@ type PlanSpec struct { Exclusive bool `json:"exclusive,omitempty"` - Window *TimeWindowSpec `json:"window,omitempty"` - Prepare *ContainerSpec `json:"prepare,omitempty"` - Cordon bool `json:"cordon,omitempty"` - Drain *DrainSpec `json:"drain,omitempty"` - Upgrade *ContainerSpec `json:"upgrade,omitempty" wrangler:"required"` - ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"` + Window *TimeWindowSpec `json:"window,omitempty"` + Prepare *ContainerSpec `json:"prepare,omitempty"` + Cordon bool `json:"cordon,omitempty"` + Drain *DrainSpec `json:"drain,omitempty"` + Upgrade *ContainerSpec `json:"upgrade,omitempty" wrangler:"required"` + ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"` + PostCompleteDelay *metav1.Duration `json:"postCompleteDelay,omitempty"` } // PlanStatus represents the resulting state from processing Plan events. diff --git a/pkg/apis/upgrade.cattle.io/v1/zz_generated_deepcopy.go b/pkg/apis/upgrade.cattle.io/v1/zz_generated_deepcopy.go index 7cfe7e71..b379098a 100644 --- a/pkg/apis/upgrade.cattle.io/v1/zz_generated_deepcopy.go +++ b/pkg/apis/upgrade.cattle.io/v1/zz_generated_deepcopy.go @@ -232,6 +232,11 @@ func (in *PlanSpec) DeepCopyInto(out *PlanSpec) { *out = make([]corev1.LocalObjectReference, len(*in)) copy(*out, *in) } + if in.PostCompleteDelay != nil { + in, out := &in.PostCompleteDelay, &out.PostCompleteDelay + *out = new(metav1.Duration) + **out = **in + } return } diff --git a/pkg/upgrade/handle_batch.go b/pkg/upgrade/handle_batch.go index 843e1125..97692e82 100644 --- a/pkg/upgrade/handle_batch.go +++ b/pkg/upgrade/handle_batch.go @@ -7,12 +7,12 @@ import ( "strconv" "time" - "github.com/rancher/system-upgrade-controller/pkg/apis/condition" upgradeapi "github.com/rancher/system-upgrade-controller/pkg/apis/upgrade.cattle.io" upgradejob "github.com/rancher/system-upgrade-controller/pkg/upgrade/job" batchctlv1 "github.com/rancher/wrangler/v3/pkg/generated/controllers/batch/v1" "github.com/sirupsen/logrus" batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -81,13 +81,39 @@ func (ctl *Controller) handleJobs(ctx context.Context) error { } // if the job has failed enqueue-or-delete it depending on the TTL window if upgradejob.ConditionFailed.IsTrue(obj) { - return obj, enqueueOrDelete(jobs, obj, upgradejob.ConditionFailed) + failedTime := upgradejob.ConditionFailed.GetLastTransitionTime(obj) + if failedTime.IsZero() { + return obj, fmt.Errorf("condition %q missing field %q", upgradejob.ConditionFailed, "LastTransitionTime") + } + ctl.recorder.Eventf(plan, corev1.EventTypeWarning, "JobFailed", "Job failed on Node %s", node.Name) + return obj, enqueueOrDelete(jobs, obj, failedTime) } // if the job has completed tag the node then enqueue-or-delete depending on the TTL window if upgradejob.ConditionComplete.IsTrue(obj) { + completeTime := upgradejob.ConditionComplete.GetLastTransitionTime(obj) + if completeTime.IsZero() { + return obj, fmt.Errorf("condition %q missing field %q", upgradejob.ConditionComplete, "LastTransitionTime") + } planLabel := upgradeapi.LabelPlanName(planName) if planHash, ok := obj.Labels[planLabel]; ok { - node.Labels[planLabel] = planHash + var delay time.Duration + if plan.Spec.PostCompleteDelay != nil { + delay = plan.Spec.PostCompleteDelay.Duration + } + // if the job has not been completed for the configured delay, re-enqueue + // it for processing once the delay has elapsed. + // the job's TTLSecondsAfterFinished is guaranteed to be set to a larger value + // than the plan's requested delay. + if interval := time.Now().Sub(completeTime); interval < delay { + logrus.Debugf("Enqueing sync of Job %s/%s in %v", obj.Namespace, obj.Name, delay-interval) + ctl.recorder.Eventf(plan, corev1.EventTypeNormal, "JobCompleteWaiting", "Job completed on Node %s, waiting %s PostCompleteDelay", node.Name, delay) + jobs.EnqueueAfter(obj.Namespace, obj.Name, delay-interval) + } else { + ctl.recorder.Eventf(plan, corev1.EventTypeNormal, "JobComplete", "Job completed on Node %s", node.Name) + node.Labels[planLabel] = planHash + } + // mark the node as schedulable even if the delay has not elapsed, so that + // workloads can resume scheduling. if node.Spec.Unschedulable && (plan.Spec.Cordon || plan.Spec.Drain != nil) { node.Spec.Unschedulable = false } @@ -95,7 +121,7 @@ func (ctl *Controller) handleJobs(ctx context.Context) error { return obj, err } } - return obj, enqueueOrDelete(jobs, obj, upgradejob.ConditionComplete) + return obj, enqueueOrDelete(jobs, obj, completeTime) } // if the job is hasn't failed or completed but the job Node is not on the applying list, consider it running out-of-turn and delete it if i := sort.SearchStrings(plan.Status.Applying, nodeName); i == len(plan.Status.Applying) || @@ -108,12 +134,7 @@ func (ctl *Controller) handleJobs(ctx context.Context) error { return nil } -func enqueueOrDelete(jobController batchctlv1.JobController, job *batchv1.Job, done condition.Cond) error { - lastTransitionTime := done.GetLastTransitionTime(job) - if lastTransitionTime.IsZero() { - return fmt.Errorf("condition %q missing field %q", done, "LastTransitionTime") - } - +func enqueueOrDelete(jobController batchctlv1.JobController, job *batchv1.Job, lastTransitionTime time.Time) error { var ttlSecondsAfterFinished time.Duration if job.Spec.TTLSecondsAfterFinished == nil { diff --git a/pkg/upgrade/job/job.go b/pkg/upgrade/job/job.go index df5aa95e..fa17f260 100644 --- a/pkg/upgrade/job/job.go +++ b/pkg/upgrade/job/job.go @@ -5,6 +5,7 @@ import ( "slices" "strconv" "strings" + "time" "github.com/rancher/system-upgrade-controller/pkg/apis/condition" upgradeapi "github.com/rancher/system-upgrade-controller/pkg/apis/upgrade.cattle.io" @@ -133,9 +134,21 @@ func New(plan *upgradeapiv1.Plan, node *corev1.Node, controllerName string) *bat labelPlanName := upgradeapi.LabelPlanName(plan.Name) nodeHostname := upgradenode.Hostname(node) shortNodeName := strings.SplitN(node.Name, ".", 2)[0] + ttlSecondsAfterFinished := TTLSecondsAfterFinished + + // Ensure that the job's TTLSecondsAfterFinished is at least 1 minute longer than + // the requested post-upgrade delay, so that the controller has time to see that + // it has been completed for the requested duration. + if delay := plan.Spec.PostCompleteDelay; delay != nil { + ttlPostCompleteDelay := delay.Duration + time.Minute + ttlAfterFinished := time.Duration(ttlSecondsAfterFinished) * time.Second + if ttlAfterFinished < ttlPostCompleteDelay { + ttlSecondsAfterFinished = int32(ttlPostCompleteDelay.Seconds()) + } + } jobAnnotations := labels.Set{ - upgradeapi.AnnotationTTLSecondsAfterFinished: strconv.FormatInt(int64(TTLSecondsAfterFinished), 10), + upgradeapi.AnnotationTTLSecondsAfterFinished: strconv.FormatInt(int64(ttlSecondsAfterFinished), 10), } podAnnotations := labels.Set{} @@ -171,7 +184,7 @@ func New(plan *upgradeapiv1.Plan, node *corev1.Node, controllerName string) *bat Spec: batchv1.JobSpec{ PodReplacementPolicy: &PodReplacementPolicy, BackoffLimit: &BackoffLimit, - TTLSecondsAfterFinished: &TTLSecondsAfterFinished, + TTLSecondsAfterFinished: &ttlSecondsAfterFinished, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: podAnnotations, diff --git a/pkg/upgrade/plan/plan.go b/pkg/upgrade/plan/plan.go index 9dd8249c..35e2706e 100644 --- a/pkg/upgrade/plan/plan.go +++ b/pkg/upgrade/plan/plan.go @@ -35,6 +35,7 @@ var ( ErrDrainDeleteConflict = fmt.Errorf("spec.drain cannot specify both deleteEmptydirData and deleteLocalData") ErrDrainPodSelectorNotSelectable = fmt.Errorf("spec.drain.podSelector is not selectable") ErrInvalidWindow = fmt.Errorf("spec.window is invalid") + ErrInvalidDelay = fmt.Errorf("spec.postCompleteDelay must be greater than 0s if set") PollingInterval = func(defaultValue time.Duration) time.Duration { if str, ok := os.LookupEnv("SYSTEM_UPGRADE_PLAN_POLLING_INTERVAL"); ok { @@ -257,5 +258,8 @@ func Validate(plan *upgradeapiv1.Plan) error { return ErrInvalidWindow } } + if delay := plan.Spec.PostCompleteDelay; delay != nil && delay.Duration < 0 { + return ErrInvalidDelay + } return nil }