Skip to content

Commit

Permalink
Add support for Spec.PostCompleteDelay to set delay after job completion
Browse files Browse the repository at this point in the history
Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Nov 1, 2024
1 parent 3cbc05c commit 91815cb
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 18 deletions.
13 changes: 7 additions & 6 deletions pkg/apis/upgrade.cattle.io/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ type PlanSpec struct {

Exclusive bool `json:"exclusive,omitempty"`

Window *TimeWindowSpec `json:"window,omitempty"`
Prepare *ContainerSpec `json:"prepare,omitempty"`
Cordon bool `json:"cordon,omitempty"`
Drain *DrainSpec `json:"drain,omitempty"`
Upgrade *ContainerSpec `json:"upgrade,omitempty" wrangler:"required"`
ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
Window *TimeWindowSpec `json:"window,omitempty"`
Prepare *ContainerSpec `json:"prepare,omitempty"`
Cordon bool `json:"cordon,omitempty"`
Drain *DrainSpec `json:"drain,omitempty"`
Upgrade *ContainerSpec `json:"upgrade,omitempty" wrangler:"required"`
ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
PostCompleteDelay *metav1.Duration `json:"postCompleteDelay,omitempty"`
}

// PlanStatus represents the resulting state from processing Plan events.
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/upgrade.cattle.io/v1/zz_generated_deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 31 additions & 10 deletions pkg/upgrade/handle_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"strconv"
"time"

"github.com/rancher/system-upgrade-controller/pkg/apis/condition"
upgradeapi "github.com/rancher/system-upgrade-controller/pkg/apis/upgrade.cattle.io"
upgradejob "github.com/rancher/system-upgrade-controller/pkg/upgrade/job"
batchctlv1 "github.com/rancher/wrangler/v3/pkg/generated/controllers/batch/v1"
"github.com/sirupsen/logrus"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -81,21 +81,47 @@ func (ctl *Controller) handleJobs(ctx context.Context) error {
}
// if the job has failed enqueue-or-delete it depending on the TTL window
if upgradejob.ConditionFailed.IsTrue(obj) {
return obj, enqueueOrDelete(jobs, obj, upgradejob.ConditionFailed)
failedTime := upgradejob.ConditionFailed.GetLastTransitionTime(obj)
if failedTime.IsZero() {
return obj, fmt.Errorf("condition %q missing field %q", upgradejob.ConditionFailed, "LastTransitionTime")
}
ctl.recorder.Eventf(plan, corev1.EventTypeWarning, "JobFailed", "Job failed on Node %s", node.Name)
return obj, enqueueOrDelete(jobs, obj, failedTime)
}
// if the job has completed tag the node then enqueue-or-delete depending on the TTL window
if upgradejob.ConditionComplete.IsTrue(obj) {
completeTime := upgradejob.ConditionComplete.GetLastTransitionTime(obj)
if completeTime.IsZero() {
return obj, fmt.Errorf("condition %q missing field %q", upgradejob.ConditionComplete, "LastTransitionTime")
}
planLabel := upgradeapi.LabelPlanName(planName)
if planHash, ok := obj.Labels[planLabel]; ok {
node.Labels[planLabel] = planHash
var delay time.Duration
if plan.Spec.PostCompleteDelay != nil {
delay = plan.Spec.PostCompleteDelay.Duration
}
// if the job has not been completed for the configured delay, re-enqueue
// it for processing once the delay has elapsed.
// the job's TTLSecondsAfterFinished is guaranteed to be set to a larger value
// than the plan's requested delay.
if interval := time.Now().Sub(completeTime); interval < delay {
logrus.Debugf("Enqueing sync of Job %s/%s in %v", obj.Namespace, obj.Name, delay-interval)
ctl.recorder.Eventf(plan, corev1.EventTypeNormal, "JobCompleteWaiting", "Job completed on Node %s, waiting %s PostCompleteDelay", node.Name, delay)
jobs.EnqueueAfter(obj.Namespace, obj.Name, delay-interval)
} else {
ctl.recorder.Eventf(plan, corev1.EventTypeNormal, "JobComplete", "Job completed on Node %s", node.Name)
node.Labels[planLabel] = planHash
}
// mark the node as schedulable even if the delay has not elapsed, so that
// workloads can resume scheduling.
if node.Spec.Unschedulable && (plan.Spec.Cordon || plan.Spec.Drain != nil) {
node.Spec.Unschedulable = false
}
if node, err = nodes.Update(node); err != nil {
return obj, err
}
}
return obj, enqueueOrDelete(jobs, obj, upgradejob.ConditionComplete)
return obj, enqueueOrDelete(jobs, obj, completeTime)
}
// if the job is hasn't failed or completed but the job Node is not on the applying list, consider it running out-of-turn and delete it
if i := sort.SearchStrings(plan.Status.Applying, nodeName); i == len(plan.Status.Applying) ||
Expand All @@ -108,12 +134,7 @@ func (ctl *Controller) handleJobs(ctx context.Context) error {
return nil
}

func enqueueOrDelete(jobController batchctlv1.JobController, job *batchv1.Job, done condition.Cond) error {
lastTransitionTime := done.GetLastTransitionTime(job)
if lastTransitionTime.IsZero() {
return fmt.Errorf("condition %q missing field %q", done, "LastTransitionTime")
}

func enqueueOrDelete(jobController batchctlv1.JobController, job *batchv1.Job, lastTransitionTime time.Time) error {
var ttlSecondsAfterFinished time.Duration

if job.Spec.TTLSecondsAfterFinished == nil {
Expand Down
17 changes: 15 additions & 2 deletions pkg/upgrade/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"slices"
"strconv"
"strings"
"time"

"github.com/rancher/system-upgrade-controller/pkg/apis/condition"
upgradeapi "github.com/rancher/system-upgrade-controller/pkg/apis/upgrade.cattle.io"
Expand Down Expand Up @@ -133,9 +134,21 @@ func New(plan *upgradeapiv1.Plan, node *corev1.Node, controllerName string) *bat
labelPlanName := upgradeapi.LabelPlanName(plan.Name)
nodeHostname := upgradenode.Hostname(node)
shortNodeName := strings.SplitN(node.Name, ".", 2)[0]
ttlSecondsAfterFinished := TTLSecondsAfterFinished

// Ensure that the job's TTLSecondsAfterFinished is at least 1 minute longer than
// the requested post-upgrade delay, so that the controller has time to see that
// it has been completed for the requested duration.
if delay := plan.Spec.PostCompleteDelay; delay != nil {
ttlPostCompleteDelay := delay.Duration + time.Minute
ttlAfterFinished := time.Duration(ttlSecondsAfterFinished) * time.Second
if ttlAfterFinished < ttlPostCompleteDelay {
ttlSecondsAfterFinished = int32(ttlPostCompleteDelay.Seconds())
}
}

jobAnnotations := labels.Set{
upgradeapi.AnnotationTTLSecondsAfterFinished: strconv.FormatInt(int64(TTLSecondsAfterFinished), 10),
upgradeapi.AnnotationTTLSecondsAfterFinished: strconv.FormatInt(int64(ttlSecondsAfterFinished), 10),
}
podAnnotations := labels.Set{}

Expand Down Expand Up @@ -171,7 +184,7 @@ func New(plan *upgradeapiv1.Plan, node *corev1.Node, controllerName string) *bat
Spec: batchv1.JobSpec{
PodReplacementPolicy: &PodReplacementPolicy,
BackoffLimit: &BackoffLimit,
TTLSecondsAfterFinished: &TTLSecondsAfterFinished,
TTLSecondsAfterFinished: &ttlSecondsAfterFinished,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: podAnnotations,
Expand Down
4 changes: 4 additions & 0 deletions pkg/upgrade/plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var (
ErrDrainDeleteConflict = fmt.Errorf("spec.drain cannot specify both deleteEmptydirData and deleteLocalData")
ErrDrainPodSelectorNotSelectable = fmt.Errorf("spec.drain.podSelector is not selectable")
ErrInvalidWindow = fmt.Errorf("spec.window is invalid")
ErrInvalidDelay = fmt.Errorf("spec.postCompleteDelay must be greater than 0s if set")

PollingInterval = func(defaultValue time.Duration) time.Duration {
if str, ok := os.LookupEnv("SYSTEM_UPGRADE_PLAN_POLLING_INTERVAL"); ok {
Expand Down Expand Up @@ -257,5 +258,8 @@ func Validate(plan *upgradeapiv1.Plan) error {
return ErrInvalidWindow
}
}
if delay := plan.Spec.PostCompleteDelay; delay != nil && delay.Duration < 0 {
return ErrInvalidDelay
}
return nil
}

0 comments on commit 91815cb

Please sign in to comment.