Skip to content

Commit

Permalink
feat: status ring for push handler on upstream
Browse files Browse the repository at this point in the history
[skip ci]
  • Loading branch information
adityathebe committed Aug 27, 2024
1 parent 7a37699 commit ec82d72
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 30 deletions.
8 changes: 4 additions & 4 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type Job struct {
LastJob *models.JobHistory
initialized bool
unschedule func()
statusRing StatusRing
StatusRing StatusRing

// Semaphores control concurrent execution of related jobs.
// They are acquired sequentially and released in reverse order.
Expand Down Expand Up @@ -250,7 +250,7 @@ func (j *JobRuntime) end() {
j.Warnf("failed to persist history: %v", err)
}
}
j.Job.statusRing.Add(j.History)
j.Job.StatusRing.Add(j.History)

j.Context.Counter("job", "name", j.Job.Name, "id", j.Job.ResourceID, "resource", j.Job.ResourceType, "status", j.History.Status).Add(1)
j.Context.Histogram("job_duration", context.LongLatencyBuckets, "name", j.Job.Name, "id", j.Job.ResourceID, "resource", j.Job.ResourceType, "status", j.History.Status).Since(j.History.TimeStart)
Expand Down Expand Up @@ -501,8 +501,8 @@ func (j *Job) init() error {

j.Context.Tracef("initalized %v", j.String())

j.statusRing = newStatusRing(j.Retention, j.Singleton, evictedJobs)
if err := j.statusRing.populateFromDB(j.Context, j.Name, j.ResourceID); err != nil {
j.StatusRing = newStatusRing(j.Retention, j.Singleton, evictedJobs)
if err := j.StatusRing.populateFromDB(j.Context, j.Name, j.ResourceID); err != nil {
return fmt.Errorf("error populating status ring: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion tests/upstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, ginkgo.Label("slow"),
})

e.Use(upstream.AgentAuthMiddleware(cache.New(time.Hour, time.Hour)))
e.POST("/upstream/push", upstream.PushHandler)
e.POST("/upstream/push", upstream.NewPushHandler(nil))

port, echoCloser = setup.RunEcho(e)

Expand Down
82 changes: 57 additions & 25 deletions upstream/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/api"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/job"
"github.com/flanksource/duty/models"
"github.com/labstack/echo/v4"
"github.com/patrickmn/go-cache"
"github.com/robfig/cron/v3"
"go.opentelemetry.io/otel/attribute"
)

Expand Down Expand Up @@ -65,39 +67,69 @@ func AgentAuthMiddleware(agentCache *cache.Cache) func(echo.HandlerFunc) echo.Ha
}

// NewPushHandler returns an echo handler that saves the push data from agents.
func PushHandler(c echo.Context) error {
ctx := c.Request().Context().(context.Context)
func NewPushHandler(jobCrons *cron.Cron) echo.HandlerFunc {
return func(c echo.Context) error {
ctx := c.Request().Context().(context.Context)

start := time.Now()
histogram := ctx.Histogram("push_queue_create_handler", context.LatencyBuckets, StatusLabel, "", AgentLabel, "")
defer func() {
histogram.Since(start)
}()

var req PushData
err := json.NewDecoder(c.Request().Body).Decode(&req)
if err != nil {
histogram.Label(StatusLabel, StatusAgentError)
return c.JSON(http.StatusBadRequest, api.HTTPError{Err: err.Error(), Message: "invalid json request"})
}

start := time.Now()
histogram := ctx.Histogram("push_queue_create_handler", context.LatencyBuckets, StatusLabel, "", AgentLabel, "")
defer func() {
histogram.Since(start)
}()
ctx.GetSpan().SetAttributes(attribute.Int("count", req.Count()))

var req PushData
err := json.NewDecoder(c.Request().Body).Decode(&req)
if err != nil {
histogram.Label(StatusLabel, StatusAgentError)
return c.JSON(http.StatusBadRequest, api.HTTPError{Err: err.Error(), Message: "invalid json request"})
}
agentID := ctx.Agent().ID
histogram = histogram.Label(AgentLabel, agentID.String())
req.PopulateAgentID(agentID)

ctx.GetSpan().SetAttributes(attribute.Int("count", req.Count()))
ctx.Tracef("Inserting push data %s", req.String())

agentID := ctx.Agent().ID
histogram = histogram.Label(AgentLabel, agentID.String())
req.PopulateAgentID(agentID)
if err := InsertUpstreamMsg(ctx, &req); err != nil {
histogram.Label(StatusLabel, StatusError)
return api.WriteError(c, err)
}

ctx.Tracef("Inserting push data %s", req.String())
if err := addJobHistoryToRing(ctx, req.JobHistory, jobCrons); err != nil {
logger.Errorf("failed to add job histories to status ring: %v", err)
}

if err := InsertUpstreamMsg(ctx, &req); err != nil {
histogram.Label(StatusLabel, StatusError)
return api.WriteError(c, err)
histogram.Label(StatusLabel, StatusOK)
req.AddMetrics(ctx.Counter("push_queue_create_handler_records", AgentLabel, agentID.String(), "table", ""))

if err := UpdateAgentLastReceived(ctx, agentID); err != nil {
logger.Errorf("failed to update agent last_received: %v", err)
}

return nil
}
histogram.Label(StatusLabel, StatusOK)
req.AddMetrics(ctx.Counter("push_queue_create_handler_records", AgentLabel, agentID.String(), "table", ""))
}

if err := UpdateAgentLastReceived(ctx, agentID); err != nil {
logger.Errorf("failed to update agent last_received: %v", err)
func addJobHistoryToRing(ctx context.Context, histories []models.JobHistory, jobCron *cron.Cron) error {
if jobCron == nil {
return nil
}

jobMap := map[string]*job.StatusRing{}
for _, e := range jobCron.Entries() {
if j, ok := e.Job.(*job.Job); ok {
jobMap[j.Name] = &j.StatusRing
}
}

for _, history := range histories {
if j, ok := jobMap[history.Name]; ok {
j.Add(&history)
} else {
ctx.Warnf("job history %s didn't have a corresponding job entry", history.Name)
}
}

return nil
Expand Down

0 comments on commit ec82d72

Please sign in to comment.