diff --git a/job/job.go b/job/job.go index f2c169a6..782150a6 100644 --- a/job/job.go +++ b/job/job.go @@ -97,7 +97,7 @@ type Job struct { LastJob *models.JobHistory initialized bool unschedule func() - statusRing StatusRing + StatusRing StatusRing // Semaphores control concurrent execution of related jobs. // They are acquired sequentially and released in reverse order. @@ -250,7 +250,7 @@ func (j *JobRuntime) end() { j.Warnf("failed to persist history: %v", err) } } - j.Job.statusRing.Add(j.History) + j.Job.StatusRing.Add(j.History) j.Context.Counter("job", "name", j.Job.Name, "id", j.Job.ResourceID, "resource", j.Job.ResourceType, "status", j.History.Status).Add(1) j.Context.Histogram("job_duration", context.LongLatencyBuckets, "name", j.Job.Name, "id", j.Job.ResourceID, "resource", j.Job.ResourceType, "status", j.History.Status).Since(j.History.TimeStart) @@ -501,8 +501,8 @@ func (j *Job) init() error { j.Context.Tracef("initalized %v", j.String()) - j.statusRing = newStatusRing(j.Retention, j.Singleton, evictedJobs) - if err := j.statusRing.populateFromDB(j.Context, j.Name, j.ResourceID); err != nil { + j.StatusRing = newStatusRing(j.Retention, j.Singleton, evictedJobs) + if err := j.StatusRing.populateFromDB(j.Context, j.Name, j.ResourceID); err != nil { return fmt.Errorf("error populating status ring: %w", err) } diff --git a/tests/upstream_test.go b/tests/upstream_test.go index aea42880..d2addd6c 100644 --- a/tests/upstream_test.go +++ b/tests/upstream_test.go @@ -56,7 +56,7 @@ var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, ginkgo.Label("slow"), }) e.Use(upstream.AgentAuthMiddleware(cache.New(time.Hour, time.Hour))) - e.POST("/upstream/push", upstream.PushHandler) + e.POST("/upstream/push", upstream.NewPushHandler(nil)) port, echoCloser = setup.RunEcho(e) diff --git a/upstream/controllers.go b/upstream/controllers.go index 667eccc5..26ea5d38 100644 --- a/upstream/controllers.go +++ b/upstream/controllers.go @@ -9,9 +9,11 @@ import ( "github.com/flanksource/commons/logger" "github.com/flanksource/duty/api" "github.com/flanksource/duty/context" + "github.com/flanksource/duty/job" "github.com/flanksource/duty/models" "github.com/labstack/echo/v4" "github.com/patrickmn/go-cache" + "github.com/robfig/cron/v3" "go.opentelemetry.io/otel/attribute" ) @@ -65,39 +67,69 @@ func AgentAuthMiddleware(agentCache *cache.Cache) func(echo.HandlerFunc) echo.Ha } // PushHandler returns an echo handler that saves the push data from agents. -func PushHandler(c echo.Context) error { - ctx := c.Request().Context().(context.Context) +func NewPushHandler(jobCrons *cron.Cron) echo.HandlerFunc { + return func(c echo.Context) error { + ctx := c.Request().Context().(context.Context) + + start := time.Now() + histogram := ctx.Histogram("push_queue_create_handler", context.LatencyBuckets, StatusLabel, "", AgentLabel, "") + defer func() { + histogram.Since(start) + }() + + var req PushData + err := json.NewDecoder(c.Request().Body).Decode(&req) + if err != nil { + histogram.Label(StatusLabel, StatusAgentError) + return c.JSON(http.StatusBadRequest, api.HTTPError{Err: err.Error(), Message: "invalid json request"}) + } - start := time.Now() - histogram := ctx.Histogram("push_queue_create_handler", context.LatencyBuckets, StatusLabel, "", AgentLabel, "") - defer func() { - histogram.Since(start) - }() + ctx.GetSpan().SetAttributes(attribute.Int("count", req.Count())) - var req PushData - err := json.NewDecoder(c.Request().Body).Decode(&req) - if err != nil { - histogram.Label(StatusLabel, StatusAgentError) - return c.JSON(http.StatusBadRequest, api.HTTPError{Err: err.Error(), Message: "invalid json request"}) - } + agentID := ctx.Agent().ID + histogram = histogram.Label(AgentLabel, agentID.String()) + req.PopulateAgentID(agentID) - ctx.GetSpan().SetAttributes(attribute.Int("count", req.Count())) + ctx.Tracef("Inserting push data %s", req.String()) - agentID := ctx.Agent().ID - histogram = histogram.Label(AgentLabel, agentID.String()) - req.PopulateAgentID(agentID) + if err := InsertUpstreamMsg(ctx, &req); err != nil { + histogram.Label(StatusLabel, StatusError) + return api.WriteError(c, err) + } - ctx.Tracef("Inserting push data %s", req.String()) + if err := addJobHistoryToRing(ctx, req.JobHistory, jobCrons); err != nil { + logger.Errorf("failed to add job histories to status ring: %v", err) + } - if err := InsertUpstreamMsg(ctx, &req); err != nil { - histogram.Label(StatusLabel, StatusError) - return api.WriteError(c, err) + histogram.Label(StatusLabel, StatusOK) + req.AddMetrics(ctx.Counter("push_queue_create_handler_records", AgentLabel, agentID.String(), "table", "")) + + if err := UpdateAgentLastReceived(ctx, agentID); err != nil { + logger.Errorf("failed to update agent last_received: %v", err) + } + + return nil } - histogram.Label(StatusLabel, StatusOK) - req.AddMetrics(ctx.Counter("push_queue_create_handler_records", AgentLabel, agentID.String(), "table", "")) +} - if err := UpdateAgentLastReceived(ctx, agentID); err != nil { - logger.Errorf("failed to update agent last_received: %v", err) +func addJobHistoryToRing(ctx context.Context, histories []models.JobHistory, jobCron *cron.Cron) error { + if jobCron == nil { + return nil + } + + jobMap := map[string]*job.StatusRing{} + for _, e := range jobCron.Entries() { + if j, ok := e.Job.(*job.Job); ok { + jobMap[j.Name] = &j.StatusRing + } + } + + for _, history := range histories { + if j, ok := jobMap[history.Name]; ok { + j.Add(&history) + } else { + ctx.Warnf("job history %s didn't have a corresponding job entry", history.Name) + } } return nil