diff --git a/job/job.go b/job/job.go index 782150a6..ec12a086 100644 --- a/job/job.go +++ b/job/job.go @@ -35,7 +35,7 @@ const ( maxJitterDuration = time.Minute * 15 ) -var evictedJobs chan uuid.UUID +var EvictedJobs chan uuid.UUID // deleteEvictedJobs deletes job_history rows from the DB every job.eviction.period(1m), // jobs send rows to be deleted by maintaining a circular buffer by status type @@ -43,7 +43,7 @@ func deleteEvictedJobs(ctx context.Context) { period := ctx.Properties().Duration("job.eviction.period", time.Minute) ctx.Infof("Cleaning up jobs every %v", period) for { - items, _, _, _ := lo.BufferWithTimeout(evictedJobs, 32, 5*time.Second) + items, _, _, _ := lo.BufferWithTimeout(EvictedJobs, 32, 5*time.Second) if len(items) == 0 { time.Sleep(period) continue @@ -97,7 +97,7 @@ type Job struct { LastJob *models.JobHistory initialized bool unschedule func() - StatusRing StatusRing + statusRing StatusRing // Semaphores control concurrent execution of related jobs. // They are acquired sequentially and released in reverse order. @@ -140,7 +140,7 @@ func (t *StatusRing) populateFromDB(ctx context.Context, name, resourceID string return nil } -func newStatusRing(r Retention, singleton bool, evicted chan uuid.UUID) StatusRing { +func NewStatusRing(r Retention, singleton bool, evicted chan uuid.UUID) StatusRing { return StatusRing{ lock: sync.Mutex{}, retention: r, @@ -179,9 +179,6 @@ type Retention struct { // Failed is the number of unsuccessful job history to retain Failed int - - // Data ...? - Data bool } func (r Retention) Count(status string) int { @@ -191,11 +188,6 @@ func (r Retention) Count(status string) int { return r.Success } -func (r Retention) WithData() Retention { - r.Data = true - return r -} - func (r Retention) String() string { return fmt.Sprintf("success=%d, failed=%d", r.Success, r.Failed) } @@ -250,7 +242,7 @@ func (j *JobRuntime) end() { j.Warnf("failed to persist history: %v", err) } } - j.Job.StatusRing.Add(j.History) + j.Job.statusRing.Add(j.History) j.Context.Counter("job", "name", j.Job.Name, "id", j.Job.ResourceID, "resource", j.Job.ResourceType, "status", j.History.Status).Add(1) j.Context.Histogram("job_duration", context.LongLatencyBuckets, "name", j.Job.Name, "id", j.Job.ResourceID, "resource", j.Job.ResourceType, "status", j.History.Status).Since(j.History.TimeStart) @@ -429,8 +421,8 @@ func (j *Job) GetPropertyInt(property string, def int) int { } func (j *Job) init() error { - if evictedJobs == nil { - evictedJobs = make(chan uuid.UUID, 1000) + if EvictedJobs == nil { + EvictedJobs = make(chan uuid.UUID, 1000) go deleteEvictedJobs(j.Context) } @@ -501,8 +493,8 @@ func (j *Job) init() error { j.Context.Tracef("initalized %v", j.String()) - j.StatusRing = newStatusRing(j.Retention, j.Singleton, evictedJobs) - if err := j.StatusRing.populateFromDB(j.Context, j.Name, j.ResourceID); err != nil { + j.statusRing = NewStatusRing(j.Retention, j.Singleton, EvictedJobs) + if err := j.statusRing.populateFromDB(j.Context, j.Name, j.ResourceID); err != nil { return fmt.Errorf("error populating status ring: %w", err) } diff --git a/job/job_test.go b/job/job_test.go index 8a369b7d..79f4c0d0 100644 --- a/job/job_test.go +++ b/job/job_test.go @@ -56,7 +56,7 @@ var _ = Describe("StatusRing", Label("slow"), func() { for i := range cases { td := cases[i] eg.Go(func() error { - sr := newStatusRing(td, false, ch) + sr := NewStatusRing(td, false, ch) for i := 0; i < loops; i++ { sr.Add(&models.JobHistory{ID: uuid.New(), Status: string(models.StatusSuccess)}) sr.Add(&models.JobHistory{ID: uuid.New(), Status: string(models.StatusFailed)}) diff --git a/upstream/agent_status_ring.go b/upstream/agent_status_ring.go new file mode 100644 index 00000000..be9da105 --- /dev/null +++ b/upstream/agent_status_ring.go @@ -0,0 +1,62 @@ +package upstream + +import ( + "fmt" + "sync" + + "github.com/flanksource/duty/context" + "github.com/flanksource/duty/job" + "github.com/flanksource/duty/models" + "github.com/google/uuid" +) + +// StatusRingManager manages status rings for agent jobs. +type StatusRingManager interface { + // Add adds the given history to the corresponding status ring of the agent. + // if the status ring doesn't exist then it creates a new one. + Add(ctx context.Context, agentID string, history models.JobHistory) +} + +type simpleStatusRingManager struct { + m sync.Mutex + evicted chan uuid.UUID + statusRings map[string]*job.StatusRing +} + +func (t *simpleStatusRingManager) Add(ctx context.Context, agentID string, history models.JobHistory) { + ring := t.getOrCreateRing(ctx, agentID, history) + ring.Add(&history) +} + +func (t *simpleStatusRingManager) key(agentID string, history models.JobHistory) string { + return fmt.Sprintf("%s-%s-%s", agentID, history.Name, history.ResourceID) +} + +func (t *simpleStatusRingManager) getOrCreateRing(ctx context.Context, agentID string, history models.JobHistory) *job.StatusRing { + t.m.Lock() + defer t.m.Unlock() + + key := t.key(agentID, history) + if ring, ok := t.statusRings[key]; ok { + return ring + } + + // By default use a balanced retention + retention := job.RetentionBalanced + + // Use retention from the properties if available + dummyJob := job.NewJob(ctx, history.Name, "", nil) + retention.Success = dummyJob.GetPropertyInt("retention.success", retention.Success) + retention.Failed = dummyJob.GetPropertyInt("retention.failed", retention.Failed) + + ring := job.NewStatusRing(retention, false, t.evicted) + t.statusRings[key] = &ring + return &ring +} + +func NewStatusRingStore(evicted chan uuid.UUID) StatusRingManager { + return &simpleStatusRingManager{ + evicted: evicted, + statusRings: make(map[string]*job.StatusRing), + } +} diff --git a/upstream/controllers.go b/upstream/controllers.go index 26ea5d38..76338f31 100644 --- a/upstream/controllers.go +++ b/upstream/controllers.go @@ -9,11 +9,9 @@ import ( "github.com/flanksource/commons/logger" "github.com/flanksource/duty/api" "github.com/flanksource/duty/context" - "github.com/flanksource/duty/job" "github.com/flanksource/duty/models" "github.com/labstack/echo/v4" "github.com/patrickmn/go-cache" - "github.com/robfig/cron/v3" "go.opentelemetry.io/otel/attribute" ) @@ -66,8 +64,8 @@ func AgentAuthMiddleware(agentCache *cache.Cache) func(echo.HandlerFunc) echo.Ha } } -// PushHandler returns an echo handler that saves the push data from agents. -func NewPushHandler(jobCrons *cron.Cron) echo.HandlerFunc { +// NewPushHandler returns an echo handler that saves the push data from agents. +func NewPushHandler(ringManager StatusRingManager) echo.HandlerFunc { return func(c echo.Context) error { ctx := c.Request().Context().(context.Context) @@ -97,9 +95,7 @@ func NewPushHandler(jobCrons *cron.Cron) echo.HandlerFunc { return api.WriteError(c, err) } - if err := addJobHistoryToRing(ctx, req.JobHistory, jobCrons); err != nil { - logger.Errorf("failed to add job histories to status ring: %v", err) - } + addJobHistoryToRing(ctx, agentID.String(), req.JobHistory, ringManager) histogram.Label(StatusLabel, StatusOK) req.AddMetrics(ctx.Counter("push_queue_create_handler_records", AgentLabel, agentID.String(), "table", "")) @@ -112,27 +108,14 @@ func NewPushHandler(jobCrons *cron.Cron) echo.HandlerFunc { } } -func addJobHistoryToRing(ctx context.Context, histories []models.JobHistory, jobCron *cron.Cron) error { - if jobCron == nil { - return nil - } - - jobMap := map[string]*job.StatusRing{} - for _, e := range jobCron.Entries() { - if j, ok := e.Job.(*job.Job); ok { - jobMap[j.Name] = &j.StatusRing - } +func addJobHistoryToRing(ctx context.Context, agentID string, histories []models.JobHistory, ringManager StatusRingManager) { + if ringManager == nil { + return } for _, history := range histories { - if j, ok := jobMap[history.Name]; ok { - j.Add(&history) - } else { - ctx.Warnf("job history %s didn't have a corresponding job entry", history.Name) - } + ringManager.Add(ctx, agentID, history) } - - return nil } // PushHandler returns an echo handler that deletes the push data from the upstream.