From 249574c09e10b826e334b1716b9dbe792321449b Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Mon, 19 Aug 2024 11:47:04 +0545 Subject: [PATCH 1/3] feat: jitter on cron jobs --- job/job.go | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/job/job.go b/job/job.go index 94f021d0..034406c2 100644 --- a/job/job.go +++ b/job/job.go @@ -3,7 +3,9 @@ package job import ( "container/ring" gocontext "context" + "crypto/rand" "fmt" + "math/big" "strconv" "sync" "time" @@ -26,6 +28,14 @@ const ( ResourceTypeUpstream = "upstream" ) +const ( + // initialJitterPercent sets by how much the initial scheduling of the job should be delayed + initialJitterPercent = 100 + + // iterationJitterPercent sets the maximum percent by which to jitter each subsequent invocation of a periodic job + iterationJitterPercent = 10 +) + var evictedJobs chan uuid.UUID // deleteEvictedJobs deletes job_history rows from the DB every job.eviction.period(1m), @@ -511,20 +521,30 @@ func (j *Job) AddToScheduler(cronRunner *cron.Cron) error { j.Context.Infof("skipping scheduling") return nil } - j.Context.Logger.Named(j.GetResourcedName()).V(1).Infof("scheduled %s", schedule) - entryID, err := cronRunner.AddJob(schedule, j) + + delayedJob := cron.FuncJob(func() { + randomDelay, _ := rand.Int(rand.Reader, big.NewInt(int64(iterationJitterPercent))) + jitterDuration := time.Duration(randomDelay.Int64()) + j.Context.Logger.Named(j.GetResourcedName()).V(2).Infof("jitter: %s", jitterDuration) + time.Sleep(jitterDuration) + j.Run() + }) + entryID, err := cronRunner.AddJob(schedule, delayedJob) if err != nil { return fmt.Errorf("[%s] failed to schedule job: %s", j.Label(), err) } j.entryID = &entryID + if j.RunNow { // Run in a goroutine since AddToScheduler should be non-blocking defer func() { go j.Run() }() } + j.unschedule = func() { cronRunner.Remove(*j.entryID) } + return nil } From 0ff8c4d0a9b362eee3c89ac26264cbbb40b8a9ab Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Mon, 19 Aug 2024 18:23:01 +0545 Subject: [PATCH 2/3] chore: jitter by percent --- job/job.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/job/job.go b/job/job.go index 034406c2..5866ee72 100644 --- a/job/job.go +++ b/job/job.go @@ -29,9 +29,6 @@ const ( ) const ( - // initialJitterPercent sets by how much the initial scheduling of the job should be delayed - initialJitterPercent = 100 - // iterationJitterPercent sets the maximum percent by which to jitter each subsequent invocation of a periodic job iterationJitterPercent = 10 ) @@ -498,7 +495,6 @@ func (j *Job) GetResourcedName() string { } func (j *Job) AddToScheduler(cronRunner *cron.Cron) error { - echo.RegisterCron(cronRunner) cronRunner.Start() @@ -523,10 +519,19 @@ func (j *Job) AddToScheduler(cronRunner *cron.Cron) error { } j.Context.Logger.Named(j.GetResourcedName()).V(1).Infof("scheduled %s", schedule) + // Attempt to get a fixed interval from the schedule + // to measure the appropriate jitter. + // NOTE: Only works for fixed interval schedules. + parsedSchedule, err := cron.ParseStandard(j.Schedule) + if err != nil { + return fmt.Errorf("failed to parse schedule: %s", err) + } + interval := time.Until(parsedSchedule.Next(time.Now())) + delayedJob := cron.FuncJob(func() { - randomDelay, _ := rand.Int(rand.Reader, big.NewInt(int64(iterationJitterPercent))) - jitterDuration := time.Duration(randomDelay.Int64()) - j.Context.Logger.Named(j.GetResourcedName()).V(2).Infof("jitter: %s", jitterDuration) + delayPercent, _ := rand.Int(rand.Reader, big.NewInt(int64(iterationJitterPercent))) + jitterDuration := time.Duration((int64(interval) * delayPercent.Int64()) / 100) + j.Context.Logger.Named(j.GetResourcedName()).V(2).Infof("jitter (%d%%): %s", delayPercent, jitterDuration) time.Sleep(jitterDuration) j.Run() }) From d30eabcd15ae6faa4cdee244177207e2e202dde2 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Mon, 19 Aug 2024 19:05:33 +0545 Subject: [PATCH 3/3] feat: add max jitter duration --- job/job.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/job/job.go b/job/job.go index 5866ee72..d56df1a6 100644 --- a/job/job.go +++ b/job/job.go @@ -3,9 +3,8 @@ package job import ( "container/ring" gocontext "context" - "crypto/rand" "fmt" - "math/big" + "math/rand" "strconv" "sync" "time" @@ -31,6 +30,8 @@ const ( const ( // iterationJitterPercent sets the maximum percent by which to jitter each subsequent invocation of a periodic job iterationJitterPercent = 10 + + maxJitterDuration = time.Minute * 15 ) var evictedJobs chan uuid.UUID @@ -527,11 +528,15 @@ func (j *Job) AddToScheduler(cronRunner *cron.Cron) error { return fmt.Errorf("failed to parse schedule: %s", err) } interval := time.Until(parsedSchedule.Next(time.Now())) + if interval > maxJitterDuration { + interval = maxJitterDuration + } delayedJob := cron.FuncJob(func() { - delayPercent, _ := rand.Int(rand.Reader, big.NewInt(int64(iterationJitterPercent))) - jitterDuration := time.Duration((int64(interval) * delayPercent.Int64()) / 100) - j.Context.Logger.Named(j.GetResourcedName()).V(2).Infof("jitter (%d%%): %s", delayPercent, jitterDuration) + delayPercent := rand.Intn(iterationJitterPercent) + jitterDuration := time.Duration((int64(interval) * int64(delayPercent)) / 100) + j.Context.Logger.V(4).Infof("jitter (%d%%): %s", delayPercent, jitterDuration) + time.Sleep(jitterDuration) j.Run() })