Skip to content

Commit

Permalink
chore: job logging improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
moshloop committed Aug 27, 2024
1 parent bc54039 commit 5c2233f
Showing 1 changed file with 27 additions and 13 deletions.
40 changes: 27 additions & 13 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import (
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"time"

"github.com/flanksource/commons/logger"
"github.com/flanksource/commons/text"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/echo"
"github.com/flanksource/duty/models"
Expand All @@ -18,6 +21,7 @@ import (
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/semaphore"
"gorm.io/gorm"
)

const (
Expand All @@ -41,14 +45,15 @@ var evictedJobs chan uuid.UUID
// jobs send rows to be deleted by maintaining a circular buffer by status type
func deleteEvictedJobs(ctx context.Context) {
period := ctx.Properties().Duration("job.eviction.period", time.Minute)
ctx = ctx.WithoutTracing().WithName("jobs").WithDBLogger("jobs", logger.Trace1)
ctx.Infof("Cleaning up jobs every %v", period)
for {
items, _, _, _ := lo.BufferWithTimeout(evictedJobs, 32, 5*time.Second)
if len(items) == 0 {
time.Sleep(period)
continue
}
if tx := ctx.FastDB("jobs").Exec("DELETE FROM job_history WHERE id in ?", items); tx.Error != nil {
if tx := ctx.DB().Exec("DELETE FROM job_history WHERE id in ?", items); tx.Error != nil {
ctx.Errorf("Failed to delete job entries: %v", tx.Error)
time.Sleep(1 * time.Minute)
} else {
Expand Down Expand Up @@ -117,6 +122,10 @@ func (j *Job) GetContext() map[string]any {
}
}

func (j *Job) PK() string {
return strings.TrimSuffix(strings.TrimSpace(fmt.Sprintf("%s/%s", j.Name, lo.CoalesceOrEmpty(j.ID, j.ResourceID))), "/")
}

type StatusRing struct {
lock sync.Mutex
rings map[string]*ring.Ring
Expand All @@ -129,7 +138,7 @@ type StatusRing struct {
// populateFromDB syncs the status ring with the existing job histories in db
func (t *StatusRing) populateFromDB(ctx context.Context, name, resourceID string) error {
var existingHistories []models.JobHistory
if err := ctx.FastDB("jobs").Where("name = ?", name).Where("resource_id = ?", resourceID).Order("time_start").Find(&existingHistories).Error; err != nil {
if err := ctx.WithoutTracing().WithDBLogger("jobs", logger.Trace1).DB().Where("name = ?", name).Where("resource_id = ?", resourceID).Order("time_start").Find(&existingHistories).Error; err != nil {
return err
}

Expand Down Expand Up @@ -237,16 +246,20 @@ func (j *JobRuntime) start() {
j.History.ResourceType = j.Job.ResourceType
}
if j.Job.JobHistory && j.Job.Retention.Success > 0 && !j.Job.IgnoreSuccessHistory {
if err := j.History.Persist(j.FastDB("jobs")); err != nil {
if err := j.History.Persist(j.VerboseDB()); err != nil {
j.Warnf("failed to persist history: %v", err)
}
}
}

func (j *JobRuntime) VerboseDB() *gorm.DB {
return j.WithoutTracing().WithDBLogger("jobs", logger.Trace1).DB()
}

func (j *JobRuntime) end() {
j.History.End()
if j.Job.JobHistory && (j.Job.Retention.Success > 0 || len(j.History.Errors) > 0) && !j.Job.IgnoreSuccessHistory {
if err := j.History.Persist(j.FastDB("jobs")); err != nil {
if err := j.History.Persist(j.VerboseDB()); err != nil {
j.Warnf("failed to persist history: %v", err)
}
}
Expand Down Expand Up @@ -285,9 +298,9 @@ func (j *Job) FindHistory(statuses ...string) ([]models.JobHistory, error) {
var items []models.JobHistory
var err error
if len(statuses) == 0 {
err = j.FastDB("jobs").Where("name = ?", j.Name).Order("time_start DESC").Find(&items).Error
err = j.WithoutTracing().WithDBLogger("jobs", logger.Trace1).DB().Where("name = ?", j.Name).Order("time_start DESC").Find(&items).Error
} else {
err = j.FastDB("jobs").Where("name = ? and status in ?", j.Name, statuses).Order("time_start DESC").Find(&items).Error
err = j.WithoutTracing().WithDBLogger("jobs", logger.Trace1).DB().Where("name = ? and status in ?", j.Name, statuses).Order("time_start DESC").Find(&items).Error
}
return items, err
}
Expand All @@ -309,6 +322,7 @@ func (j *Job) SetID(id string) *Job {

func (j *Job) Run() {
ctx, span := j.Context.StartSpan(j.Name)
ctx = ctx.WithName("job." + j.PK())
defer span.End()

r := JobRuntime{
Expand Down Expand Up @@ -338,7 +352,7 @@ func (j *Job) Run() {
if !j.lock.TryLock() {
r.History.Status = models.StatusSkipped
ctx.Tracef("failed to acquire lock")
r.Failf("%s job already running, skipping", r.ID())
r.Failf("job already running, skipping")
return
}
defer j.lock.Unlock()
Expand All @@ -358,7 +372,7 @@ func (j *Job) Run() {

delayPercent := rand.Intn(iterationJitterPercent)
jitterDuration := time.Duration((int64(interval) * int64(delayPercent)) / 100)
j.Context.Logger.V(4).Infof("jitter (%d%%): %s", delayPercent, jitterDuration)
j.Context.Logger.V(4).Infof("jitter %v", jitterDuration)

time.Sleep(jitterDuration)
}
Expand All @@ -385,10 +399,10 @@ func (j *Job) Run() {
}

if err := j.Fn(r); err != nil {
ctx.Tracef("finished duration=%s, error=%s", time.Since(r.History.TimeStart), err)
ctx.Tracef("finished duration=%s, error=%s", text.HumanizeDuration(time.Since(r.History.TimeStart)), err)
r.History.AddErrorWithSkipReportLevel(err.Error(), 1)
} else {
ctx.Tracef("finished duration=%s", time.Since(r.History.TimeStart))
ctx.Tracef("finished duration=%s", text.HumanizeDuration(time.Since(r.History.TimeStart)))
}
}

Expand Down Expand Up @@ -492,11 +506,11 @@ func (j *Job) init() error {
}

if j.ID != "" {
j.Context = j.Context.WithName(fmt.Sprintf("%s.%s", j.Name, j.ID))
j.Context = j.Context.WithName(fmt.Sprintf("%s.%s", strings.ToLower(j.Name), j.ID))
} else if j.ResourceID != "" {
j.Context = j.Context.WithName(fmt.Sprintf("%s.%s", j.Name, j.ResourceID))
j.Context = j.Context.WithName(fmt.Sprintf("%s.%s", strings.ToLower(j.Name), j.ResourceID))
} else {
j.Context = j.Context.WithName(j.Name)
j.Context = j.Context.WithName(strings.ToLower(j.Name))
}

j.Context.Tracef("initalized %v", j.String())
Expand Down

0 comments on commit 5c2233f

Please sign in to comment.