From 4c5b640967c14e7f818900b96d57256188c98568 Mon Sep 17 00:00:00 2001
From: Aditya Thebe
Date: Mon, 19 Aug 2024 20:16:48 +0545
Subject: [PATCH] feat: jitter on cron jobs (#953)

* feat: jitter on cron jobs

* chore: jitter by percent

* feat: add max jitter duration
---
 job/job.go | 41 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 38 insertions(+), 3 deletions(-)

diff --git a/job/job.go b/job/job.go
index 94f021d0..d56df1a6 100644
--- a/job/job.go
+++ b/job/job.go
@@ -4,6 +4,7 @@ import (
 	"container/ring"
 	gocontext "context"
 	"fmt"
+	"math/rand"
 	"strconv"
 	"sync"
 	"time"
@@ -26,6 +27,14 @@ const (
 	ResourceTypeUpstream = "upstream"
 )
 
+const (
+	// iterationJitterPercent sets the maximum percent by which to jitter each subsequent invocation of a periodic job
+	iterationJitterPercent = 10
+
+	// maxJitterDuration caps the schedule interval that the jitter percentage is applied to.
+	maxJitterDuration = time.Minute * 15
+)
+
 var evictedJobs chan uuid.UUID
 
 // deleteEvictedJobs deletes job_history rows from the DB every job.eviction.period(1m),
@@ -488,7 +497,6 @@ func (j *Job) GetResourcedName() string {
 }
 
 func (j *Job) AddToScheduler(cronRunner *cron.Cron) error {
-
 	echo.RegisterCron(cronRunner)
 	cronRunner.Start()
 
@@ -511,20 +519,47 @@ func (j *Job) AddToScheduler(cronRunner *cron.Cron) error {
 		j.Context.Infof("skipping scheduling")
 		return nil
 	}
-
 	j.Context.Logger.Named(j.GetResourcedName()).V(1).Infof("scheduled %s", schedule)
-	entryID, err := cronRunner.AddJob(schedule, j)
+
+	// Derive the schedule's period so the jitter can be sized as a fraction of it.
+	// The period is the gap between the next two activations; measuring from "now"
+	// to the next activation would instead depend on when the job is registered.
+	// NOTE: Only exact for fixed interval schedules.
+	parsedSchedule, err := cron.ParseStandard(schedule)
+	if err != nil {
+		return fmt.Errorf("failed to parse schedule: %w", err)
+	}
+	firstRun := parsedSchedule.Next(time.Now())
+	interval := parsedSchedule.Next(firstRun).Sub(firstRun)
+	if interval > maxJitterDuration {
+		interval = maxJitterDuration
+	}
+
+	delayedJob := cron.FuncJob(func() {
+		// rand.Intn excludes its upper bound, so add 1 to make the maximum reachable.
+		delayPercent := rand.Intn(iterationJitterPercent + 1)
+		jitterDuration := time.Duration((int64(interval) * int64(delayPercent)) / 100)
+		j.Context.Logger.V(4).Infof("jitter (%d%%): %s", delayPercent, jitterDuration)
+
+		// Hold this invocation back by the jitter before running the job.
+		time.Sleep(jitterDuration)
+		j.Run()
+	})
+	entryID, err := cronRunner.AddJob(schedule, delayedJob)
 	if err != nil {
 		return fmt.Errorf("[%s] failed to schedule job: %s", j.Label(), err)
 	}
 	j.entryID = &entryID
+
 	if j.RunNow {
 		// Run in a goroutine since AddToScheduler should be non-blocking
 		defer func() { go j.Run() }()
 	}
+
 	j.unschedule = func() {
 		cronRunner.Remove(*j.entryID)
 	}
+
 	return nil
 }
 