Skip to content

Commit

Permalink
fix: use job struct instead of cron.FuncJob
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe authored and moshloop committed Aug 22, 2024
1 parent e488b8e commit 19f46d0
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 23 deletions.
45 changes: 22 additions & 23 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,26 @@ func (j *Job) Run() {
defer j.lock.Unlock()
}

if !j.Context.Properties().On(false, "job.jitter.disable") {
// Attempt to get a fixed interval from the schedule to measure the appropriate jitter.
// NOTE: Only works for fixed interval schedules.
parsedSchedule, err := cron.ParseStandard(j.Schedule)
if err != nil {
r.Failf("failed to parse schedule (%s): %s", j.Schedule, err)
return
}
interval := time.Until(parsedSchedule.Next(time.Now()))
if interval > maxJitterDuration {
interval = maxJitterDuration
}

delayPercent := rand.Intn(iterationJitterPercent)
jitterDuration := time.Duration((int64(interval) * int64(delayPercent)) / 100)
j.Context.Logger.V(4).Infof("jitter (%d%%): %s", delayPercent, jitterDuration)

time.Sleep(jitterDuration)
}

for i, l := range j.Semaphores {
ctx.Logger.V(9).Infof("[%s] acquiring sempahore [%d/%d]", j.ID, i+1, len(j.Semaphores))
if err := l.Acquire(ctx, 1); err != nil {
Expand All @@ -364,8 +384,7 @@ func (j *Job) Run() {
defer cancel()
}

err := j.Fn(r)
if err != nil {
if err := j.Fn(r); err != nil {
ctx.Tracef("finished duration=%s, error=%s", time.Since(r.History.TimeStart), err)
r.History.AddErrorWithSkipReportLevel(err.Error(), 1)
} else {
Expand Down Expand Up @@ -542,27 +561,7 @@ func (j *Job) AddToScheduler(cronRunner *cron.Cron) error {
}
j.Context.Logger.Named(j.GetResourcedName()).V(1).Infof("scheduled %s", schedule)

// Attempt to get a fixed interval from the schedule
// to measure the appropriate jitter.
// NOTE: Only works for fixed interval schedules.
parsedSchedule, err := cron.ParseStandard(j.Schedule)
if err != nil {
return fmt.Errorf("failed to parse schedule: %s", err)
}
interval := time.Until(parsedSchedule.Next(time.Now()))
if interval > maxJitterDuration {
interval = maxJitterDuration
}

delayedJob := cron.FuncJob(func() {
delayPercent := rand.Intn(iterationJitterPercent)
jitterDuration := time.Duration((int64(interval) * int64(delayPercent)) / 100)
j.Context.Logger.V(4).Infof("jitter (%d%%): %s", delayPercent, jitterDuration)

time.Sleep(jitterDuration)
j.Run()
})
entryID, err := cronRunner.AddJob(schedule, delayedJob)
entryID, err := cronRunner.AddJob(schedule, j)
if err != nil {
return fmt.Errorf("[%s] failed to schedule job: %s", j.Label(), err)
}
Expand Down
1 change: 1 addition & 0 deletions tests/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var _ = Describe("Job", Ordered, func() {
_ = context.UpdateProperty(ctx, "test.trace", "true")
_ = context.UpdateProperty(ctx, "test.db.level", "trace")
_ = context.UpdateProperty(ctx, "job.eviction.period", "1s")
_ = context.UpdateProperty(ctx, "job.jitter.disable", "true")

sampleJob.Run()
Expect(sampleJob.Retention.Success).To(Equal(1))
Expand Down

0 comments on commit 19f46d0

Please sign in to comment.