diff --git a/job/job.go b/job/job.go index f2c169a6..7b255177 100644 --- a/job/job.go +++ b/job/job.go @@ -6,9 +6,12 @@ import ( "fmt" "math/rand" "strconv" + "strings" "sync" "time" + "github.com/flanksource/commons/logger" + "github.com/flanksource/commons/text" "github.com/flanksource/duty/context" "github.com/flanksource/duty/echo" "github.com/flanksource/duty/models" @@ -18,6 +21,7 @@ import ( "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/semaphore" + "gorm.io/gorm" ) const ( @@ -41,6 +45,7 @@ var evictedJobs chan uuid.UUID // jobs send rows to be deleted by maintaining a circular buffer by status type func deleteEvictedJobs(ctx context.Context) { period := ctx.Properties().Duration("job.eviction.period", time.Minute) + ctx = ctx.WithoutTracing().WithName("jobs").WithDBLogger("jobs", logger.Trace1) ctx.Infof("Cleaning up jobs every %v", period) for { items, _, _, _ := lo.BufferWithTimeout(evictedJobs, 32, 5*time.Second) @@ -48,7 +53,7 @@ func deleteEvictedJobs(ctx context.Context) { time.Sleep(period) continue } - if tx := ctx.FastDB("jobs").Exec("DELETE FROM job_history WHERE id in ?", items); tx.Error != nil { + if tx := ctx.DB().Exec("DELETE FROM job_history WHERE id in ?", items); tx.Error != nil { ctx.Errorf("Failed to delete job entries: %v", tx.Error) time.Sleep(1 * time.Minute) } else { @@ -117,6 +122,10 @@ func (j *Job) GetContext() map[string]any { } } +func (j *Job) PK() string { + return strings.TrimSuffix(strings.TrimSpace(fmt.Sprintf("%s/%s", j.Name, lo.CoalesceOrEmpty(j.ID, j.ResourceID))), "/") +} + type StatusRing struct { lock sync.Mutex rings map[string]*ring.Ring @@ -129,7 +138,7 @@ type StatusRing struct { // populateFromDB syncs the status ring with the existing job histories in db func (t *StatusRing) populateFromDB(ctx context.Context, name, resourceID string) error { var existingHistories []models.JobHistory - if err := ctx.FastDB("jobs").Where("name = ?", name).Where("resource_id = ?", resourceID).Order("time_start").Find(&existingHistories).Error; err != nil { + if err := ctx.WithoutTracing().WithDBLogger("jobs", logger.Trace1).DB().Where("name = ?", name).Where("resource_id = ?", resourceID).Order("time_start").Find(&existingHistories).Error; err != nil { return err } @@ -237,16 +246,20 @@ func (j *JobRuntime) start() { j.History.ResourceType = j.Job.ResourceType } if j.Job.JobHistory && j.Job.Retention.Success > 0 && !j.Job.IgnoreSuccessHistory { - if err := j.History.Persist(j.FastDB("jobs")); err != nil { + if err := j.History.Persist(j.VerboseDB()); err != nil { j.Warnf("failed to persist history: %v", err) } } } +func (j *JobRuntime) VerboseDB() *gorm.DB { + return j.WithoutTracing().WithDBLogger("jobs", logger.Trace1).DB() +} + func (j *JobRuntime) end() { j.History.End() if j.Job.JobHistory && (j.Job.Retention.Success > 0 || len(j.History.Errors) > 0) && !j.Job.IgnoreSuccessHistory { - if err := j.History.Persist(j.FastDB("jobs")); err != nil { + if err := j.History.Persist(j.VerboseDB()); err != nil { j.Warnf("failed to persist history: %v", err) } } @@ -285,9 +298,9 @@ func (j *Job) FindHistory(statuses ...string) ([]models.JobHistory, error) { var items []models.JobHistory var err error if len(statuses) == 0 { - err = j.FastDB("jobs").Where("name = ?", j.Name).Order("time_start DESC").Find(&items).Error + err = j.WithoutTracing().WithDBLogger("jobs", logger.Trace1).DB().Where("name = ?", j.Name).Order("time_start DESC").Find(&items).Error } else { - err = j.FastDB("jobs").Where("name = ? and status in ?", j.Name, statuses).Order("time_start DESC").Find(&items).Error + err = j.WithoutTracing().WithDBLogger("jobs", logger.Trace1).DB().Where("name = ? and status in ?", j.Name, statuses).Order("time_start DESC").Find(&items).Error } return items, err } @@ -309,6 +322,7 @@ func (j *Job) SetID(id string) *Job { func (j *Job) Run() { ctx, span := j.Context.StartSpan(j.Name) + ctx = ctx.WithName("job." + j.PK()) defer span.End() r := JobRuntime{ @@ -338,7 +352,7 @@ func (j *Job) Run() { if !j.lock.TryLock() { r.History.Status = models.StatusSkipped ctx.Tracef("failed to acquire lock") - r.Failf("%s job already running, skipping", r.ID()) + r.Failf("job already running, skipping") return } defer j.lock.Unlock() @@ -358,7 +372,7 @@ func (j *Job) Run() { delayPercent := rand.Intn(iterationJitterPercent) jitterDuration := time.Duration((int64(interval) * int64(delayPercent)) / 100) - j.Context.Logger.V(4).Infof("jitter (%d%%): %s", delayPercent, jitterDuration) + j.Context.Logger.V(4).Infof("jitter %v", jitterDuration) time.Sleep(jitterDuration) } @@ -385,10 +399,10 @@ func (j *Job) Run() { } if err := j.Fn(r); err != nil { - ctx.Tracef("finished duration=%s, error=%s", time.Since(r.History.TimeStart), err) + ctx.Tracef("finished duration=%s, error=%s", text.HumanizeDuration(time.Since(r.History.TimeStart)), err) r.History.AddErrorWithSkipReportLevel(err.Error(), 1) } else { - ctx.Tracef("finished duration=%s", time.Since(r.History.TimeStart)) + ctx.Tracef("finished duration=%s", text.HumanizeDuration(time.Since(r.History.TimeStart))) } } @@ -492,11 +506,11 @@ func (j *Job) init() error { } if j.ID != "" { - j.Context = j.Context.WithName(fmt.Sprintf("%s.%s", j.Name, j.ID)) + j.Context = j.Context.WithName(fmt.Sprintf("%s.%s", strings.ToLower(j.Name), j.ID)) } else if j.ResourceID != "" { - j.Context = j.Context.WithName(fmt.Sprintf("%s.%s", j.Name, j.ResourceID)) + j.Context = j.Context.WithName(fmt.Sprintf("%s.%s", strings.ToLower(j.Name), j.ResourceID)) } else { - j.Context = j.Context.WithName(j.Name) + j.Context = j.Context.WithName(strings.ToLower(j.Name)) } j.Context.Tracef("initalized %v", j.String())