Skip to content

Commit

Permalink
feat: status ring manager
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Aug 27, 2024
1 parent ec82d72 commit 2750a6e
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 42 deletions.
26 changes: 9 additions & 17 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ const (
maxJitterDuration = time.Minute * 15
)

var evictedJobs chan uuid.UUID
var EvictedJobs chan uuid.UUID

// deleteEvictedJobs deletes job_history rows from the DB every job.eviction.period(1m),
// jobs send rows to be deleted by maintaining a circular buffer by status type
func deleteEvictedJobs(ctx context.Context) {
period := ctx.Properties().Duration("job.eviction.period", time.Minute)
ctx.Infof("Cleaning up jobs every %v", period)
for {
items, _, _, _ := lo.BufferWithTimeout(evictedJobs, 32, 5*time.Second)
items, _, _, _ := lo.BufferWithTimeout(EvictedJobs, 32, 5*time.Second)
if len(items) == 0 {
time.Sleep(period)
continue
Expand Down Expand Up @@ -97,7 +97,7 @@ type Job struct {
LastJob *models.JobHistory
initialized bool
unschedule func()
StatusRing StatusRing
statusRing StatusRing

// Semaphores control concurrent execution of related jobs.
// They are acquired sequentially and released in reverse order.
Expand Down Expand Up @@ -140,7 +140,7 @@ func (t *StatusRing) populateFromDB(ctx context.Context, name, resourceID string
return nil
}

func newStatusRing(r Retention, singleton bool, evicted chan uuid.UUID) StatusRing {
func NewStatusRing(r Retention, singleton bool, evicted chan uuid.UUID) StatusRing {
return StatusRing{
lock: sync.Mutex{},
retention: r,
Expand Down Expand Up @@ -179,9 +179,6 @@ type Retention struct {

// Failed is the number of unsuccessful job history to retain
Failed int

// Data ...?
Data bool
}

func (r Retention) Count(status string) int {
Expand All @@ -191,11 +188,6 @@ func (r Retention) Count(status string) int {
return r.Success
}

// WithData returns a copy of the retention with its Data flag set to true.
// The receiver is passed by value, so the caller's Retention is not modified.
func (r Retention) WithData() Retention {
	withData := r
	withData.Data = true
	return withData
}

// String formats the retention as "success=<n>, failed=<n>" using the
// Success and Failed counts.
func (r Retention) String() string {
	success, failed := r.Success, r.Failed
	return fmt.Sprintf("success=%d, failed=%d", success, failed)
}
Expand Down Expand Up @@ -250,7 +242,7 @@ func (j *JobRuntime) end() {
j.Warnf("failed to persist history: %v", err)
}
}
j.Job.StatusRing.Add(j.History)
j.Job.statusRing.Add(j.History)

j.Context.Counter("job", "name", j.Job.Name, "id", j.Job.ResourceID, "resource", j.Job.ResourceType, "status", j.History.Status).Add(1)
j.Context.Histogram("job_duration", context.LongLatencyBuckets, "name", j.Job.Name, "id", j.Job.ResourceID, "resource", j.Job.ResourceType, "status", j.History.Status).Since(j.History.TimeStart)
Expand Down Expand Up @@ -429,8 +421,8 @@ func (j *Job) GetPropertyInt(property string, def int) int {
}

func (j *Job) init() error {
if evictedJobs == nil {
evictedJobs = make(chan uuid.UUID, 1000)
if EvictedJobs == nil {
EvictedJobs = make(chan uuid.UUID, 1000)
go deleteEvictedJobs(j.Context)
}

Expand Down Expand Up @@ -501,8 +493,8 @@ func (j *Job) init() error {

j.Context.Tracef("initalized %v", j.String())

j.StatusRing = newStatusRing(j.Retention, j.Singleton, evictedJobs)
if err := j.StatusRing.populateFromDB(j.Context, j.Name, j.ResourceID); err != nil {
j.statusRing = NewStatusRing(j.Retention, j.Singleton, EvictedJobs)
if err := j.statusRing.populateFromDB(j.Context, j.Name, j.ResourceID); err != nil {
return fmt.Errorf("error populating status ring: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var _ = Describe("StatusRing", Label("slow"), func() {
for i := range cases {
td := cases[i]
eg.Go(func() error {
sr := newStatusRing(td, false, ch)
sr := NewStatusRing(td, false, ch)
for i := 0; i < loops; i++ {
sr.Add(&models.JobHistory{ID: uuid.New(), Status: string(models.StatusSuccess)})
sr.Add(&models.JobHistory{ID: uuid.New(), Status: string(models.StatusFailed)})
Expand Down
62 changes: 62 additions & 0 deletions upstream/agent_status_ring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package upstream

import (
"fmt"
"sync"

"github.com/flanksource/duty/context"
"github.com/flanksource/duty/job"
"github.com/flanksource/duty/models"
"github.com/google/uuid"
)

// StatusRingManager manages the status rings that track job histories
// reported by agents.
type StatusRingManager interface {
	// Add records the given job history in the status ring that corresponds
	// to the agent. If no such ring exists yet, one is created first.
	Add(ctx context.Context, agentID string, history models.JobHistory)
}

// simpleStatusRingManager is an in-memory StatusRingManager that keeps one
// status ring per key in a map.
type simpleStatusRingManager struct {
	// m is held by getOrCreateRing while it reads or writes statusRings.
	m sync.Mutex

	// evicted is the channel handed to every status ring created by this manager.
	evicted chan uuid.UUID

	// statusRings holds the rings created so far, indexed by the string
	// produced by key().
	statusRings map[string]*job.StatusRing
}

// Add fetches the status ring for the given agent and job history, creating
// it when it does not exist yet, and passes the history to that ring's Add.
func (t *simpleStatusRingManager) Add(ctx context.Context, agentID string, history models.JobHistory) {
	// history is received by value, so the ring is handed a pointer to this
	// call's own copy rather than to the caller's variable.
	t.getOrCreateRing(ctx, agentID, history).Add(&history)
}

// key builds the map key that identifies the status ring for a job history
// pushed by an agent. The key is composed of the agent ID, the job name and
// the job's resource ID.
//
// Each component is formatted with %q so that it is individually quoted and
// escaped. A bare "-" separator is ambiguous because the components may
// contain "-" themselves: name "sync-a" with resource "b" and name "sync"
// with resource "a-b" would otherwise map to the same ring.
func (t *simpleStatusRingManager) key(agentID string, history models.JobHistory) string {
	return fmt.Sprintf("%q-%q-%q", agentID, history.Name, history.ResourceID)
}

// getOrCreateRing returns the status ring registered under the key derived
// from agentID and history. When no ring is registered yet, it creates one,
// stores it in the map and returns it.
//
// The manager's mutex is held for the whole call, covering both the lookup
// and the insertion into statusRings.
func (t *simpleStatusRingManager) getOrCreateRing(ctx context.Context, agentID string, history models.JobHistory) *job.StatusRing {
	t.m.Lock()
	defer t.m.Unlock()

	ringKey := t.key(agentID, history)
	if existing, found := t.statusRings[ringKey]; found {
		return existing
	}

	// Start from the balanced retention preset.
	retention := job.RetentionBalanced

	// A throwaway job named after the history is used only to read the
	// "retention.success" / "retention.failed" properties; the preset values
	// serve as the defaults passed to the lookup.
	propertySource := job.NewJob(ctx, history.Name, "", nil)
	retention.Success = propertySource.GetPropertyInt("retention.success", retention.Success)
	retention.Failed = propertySource.GetPropertyInt("retention.failed", retention.Failed)

	created := job.NewStatusRing(retention, false, t.evicted)
	t.statusRings[ringKey] = &created
	return &created
}

// NewStatusRingStore returns a StatusRingManager backed by an in-memory map
// that starts out empty. The evicted channel is passed on to every status
// ring the manager creates.
func NewStatusRingStore(evicted chan uuid.UUID) StatusRingManager {
	manager := new(simpleStatusRingManager)
	manager.evicted = evicted
	manager.statusRings = map[string]*job.StatusRing{}
	return manager
}
31 changes: 7 additions & 24 deletions upstream/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ import (
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/api"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/job"
"github.com/flanksource/duty/models"
"github.com/labstack/echo/v4"
"github.com/patrickmn/go-cache"
"github.com/robfig/cron/v3"
"go.opentelemetry.io/otel/attribute"
)

Expand Down Expand Up @@ -66,8 +64,8 @@ func AgentAuthMiddleware(agentCache *cache.Cache) func(echo.HandlerFunc) echo.Ha
}
}

// PushHandler returns an echo handler that saves the push data from agents.
func NewPushHandler(jobCrons *cron.Cron) echo.HandlerFunc {
// NewPushHandler returns an echo handler that saves the push data from agents.
func NewPushHandler(ringManager StatusRingManager) echo.HandlerFunc {
return func(c echo.Context) error {
ctx := c.Request().Context().(context.Context)

Expand Down Expand Up @@ -97,9 +95,7 @@ func NewPushHandler(jobCrons *cron.Cron) echo.HandlerFunc {
return api.WriteError(c, err)
}

if err := addJobHistoryToRing(ctx, req.JobHistory, jobCrons); err != nil {
logger.Errorf("failed to add job histories to status ring: %v", err)
}
addJobHistoryToRing(ctx, agentID.String(), req.JobHistory, ringManager)

histogram.Label(StatusLabel, StatusOK)
req.AddMetrics(ctx.Counter("push_queue_create_handler_records", AgentLabel, agentID.String(), "table", ""))
Expand All @@ -112,27 +108,14 @@ func NewPushHandler(jobCrons *cron.Cron) echo.HandlerFunc {
}
}

func addJobHistoryToRing(ctx context.Context, histories []models.JobHistory, jobCron *cron.Cron) error {
if jobCron == nil {
return nil
}

jobMap := map[string]*job.StatusRing{}
for _, e := range jobCron.Entries() {
if j, ok := e.Job.(*job.Job); ok {
jobMap[j.Name] = &j.StatusRing
}
func addJobHistoryToRing(ctx context.Context, agentID string, histories []models.JobHistory, ringManager StatusRingManager) {
if ringManager == nil {
return
}

for _, history := range histories {
if j, ok := jobMap[history.Name]; ok {
j.Add(&history)
} else {
ctx.Warnf("job history %s didn't have a corresponding job entry", history.Name)
}
ringManager.Add(ctx, agentID, history)
}

return nil
}

// PushHandler returns an echo handler that deletes the push data from the upstream.
Expand Down

0 comments on commit 2750a6e

Please sign in to comment.