Skip to content

Commit

Permalink
feat: jitter on cron jobs (#953)
Browse files Browse the repository at this point in the history
* feat: jitter on cron jobs

* chore: jitter by percent

* feat: add max jitter duration
  • Loading branch information
adityathebe authored Aug 19, 2024
1 parent 2b3ecd0 commit 4c5b640
Showing 1 changed file with 33 additions and 3 deletions.
36 changes: 33 additions & 3 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"container/ring"
gocontext "context"
"fmt"
"math/rand"
"strconv"
"sync"
"time"
Expand All @@ -26,6 +27,13 @@ const (
ResourceTypeUpstream = "upstream"
)

const (
// iterationJitterPercent sets the maximum percent by which to jitter each subsequent invocation of a periodic job
iterationJitterPercent = 10

maxJitterDuration = time.Minute * 15
)

var evictedJobs chan uuid.UUID

// deleteEvictedJobs deletes job_history rows from the DB every job.eviction.period(1m),
Expand Down Expand Up @@ -488,7 +496,6 @@ func (j *Job) GetResourcedName() string {
}

func (j *Job) AddToScheduler(cronRunner *cron.Cron) error {

echo.RegisterCron(cronRunner)
cronRunner.Start()

Expand All @@ -511,20 +518,43 @@ func (j *Job) AddToScheduler(cronRunner *cron.Cron) error {
j.Context.Infof("skipping scheduling")
return nil
}

j.Context.Logger.Named(j.GetResourcedName()).V(1).Infof("scheduled %s", schedule)
entryID, err := cronRunner.AddJob(schedule, j)

// Attempt to get a fixed interval from the schedule
// to measure the appropriate jitter.
// NOTE: Only works for fixed interval schedules.
parsedSchedule, err := cron.ParseStandard(j.Schedule)
if err != nil {
return fmt.Errorf("failed to parse schedule: %s", err)
}
interval := time.Until(parsedSchedule.Next(time.Now()))
if interval > maxJitterDuration {
interval = maxJitterDuration
}

delayedJob := cron.FuncJob(func() {
delayPercent := rand.Intn(iterationJitterPercent)
jitterDuration := time.Duration((int64(interval) * int64(delayPercent)) / 100)
j.Context.Logger.V(4).Infof("jitter (%d%%): %s", delayPercent, jitterDuration)

time.Sleep(jitterDuration)
j.Run()
})
entryID, err := cronRunner.AddJob(schedule, delayedJob)
if err != nil {
return fmt.Errorf("[%s] failed to schedule job: %s", j.Label(), err)
}
j.entryID = &entryID

if j.RunNow {
// Run in a goroutine since AddToScheduler should be non-blocking
defer func() { go j.Run() }()
}

j.unschedule = func() {
cronRunner.Remove(*j.entryID)
}

return nil
}

Expand Down

0 comments on commit 4c5b640

Please sign in to comment.