diff --git a/job/job.go b/job/job.go index 94f021d0..034406c2 100644 --- a/job/job.go +++ b/job/job.go @@ -3,7 +3,9 @@ package job import ( "container/ring" gocontext "context" + "crypto/rand" "fmt" + "math/big" "strconv" "sync" "time" @@ -26,6 +28,14 @@ const ( ResourceTypeUpstream = "upstream" ) +const ( + // initialJitterPercent sets by how much the initial scheduling of the job should be delayed + initialJitterPercent = 100 + + // iterationJitterPercent sets the maximum percent by which to jitter each subsequent invocation of a periodic job + iterationJitterPercent = 10 +) + var evictedJobs chan uuid.UUID // deleteEvictedJobs deletes job_history rows from the DB every job.eviction.period(1m), @@ -511,20 +521,30 @@ func (j *Job) AddToScheduler(cronRunner *cron.Cron) error { j.Context.Infof("skipping scheduling") return nil } - j.Context.Logger.Named(j.GetResourcedName()).V(1).Infof("scheduled %s", schedule) - entryID, err := cronRunner.AddJob(schedule, j) + + delayedJob := cron.FuncJob(func() { + randomDelay, _ := rand.Int(rand.Reader, big.NewInt(int64(iterationJitterPercent))) + jitterDuration := time.Duration(randomDelay.Int64()) + j.Context.Logger.Named(j.GetResourcedName()).V(2).Infof("jitter: %s", jitterDuration) + time.Sleep(jitterDuration) + j.Run() + }) + entryID, err := cronRunner.AddJob(schedule, delayedJob) if err != nil { return fmt.Errorf("[%s] failed to schedule job: %s", j.Label(), err) } j.entryID = &entryID + if j.RunNow { // Run in a goroutine since AddToScheduler should be non-blocking defer func() { go j.Run() }() } + j.unschedule = func() { cronRunner.Remove(*j.entryID) } + return nil }