From e5be91ed75c57419797af6c3c7d1d1985ddf5baa Mon Sep 17 00:00:00 2001 From: Zhenya Tikhonov Date: Sun, 1 Dec 2024 17:36:57 +0400 Subject: [PATCH] feat: report status for `AgentTask` --- venona/pkg/agent/agent.go | 29 ++++++++++++++++++++++------- venona/pkg/queue/queue.go | 34 +++++++++++++++++++++------------- venona/pkg/task/task.go | 3 ++- 3 files changed, 45 insertions(+), 21 deletions(-) diff --git a/venona/pkg/agent/agent.go b/venona/pkg/agent/agent.go index cbf88c57..6956b877 100644 --- a/venona/pkg/agent/agent.go +++ b/venona/pkg/agent/agent.go @@ -193,7 +193,7 @@ func (a *Agent) startTaskPullerRoutine(ctx context.Context) { // perform all agentTasks (in goroutine) for i := range agentTasks { - a.handleAgentTask(&agentTasks[i]) + a.handleAgentTask(ctx, &agentTasks[i]) } // send all wfTasks to tasksQueue @@ -242,10 +242,22 @@ func (a *Agent) reportStatus(ctx context.Context, status codefresh.AgentStatus) } } -func (a *Agent) reportTaskStatus(ctx context.Context, id task.Id, status task.TaskStatus) { - err := a.cf.ReportTaskStatus(ctx, id, status) +func (a *Agent) reportTaskStatus(ctx context.Context, taskDef task.Task, err error) { + status := task.TaskStatus{ + OccurredAt: time.Now(), + StatusRevision: taskDef.Metadata.CurrentStatusRevision + 1, + } if err != nil { - a.log.Error("Failed reporting task status", "error", err) + status.Status = task.StatusError + status.Reason = err.Error() + status.IsRetriable = true // TODO: make this configurable depending on the error + } else { + status.Status = task.StatusSuccess + } + + statusErr := a.cf.ReportTaskStatus(ctx, taskDef.Id, status) + if statusErr != nil { + a.log.Error("failed reporting task status", "error", statusErr, "task", taskDef.Id, "workflow", taskDef.Metadata.WorkflowId) } } @@ -321,14 +333,14 @@ func (a *Agent) splitTasks(tasks task.Tasks) (task.Tasks, []*workflow.Workflow) return agentTasks, workflows } -func (a *Agent) handleAgentTask(t *task.Task) { +func (a *Agent) handleAgentTask(ctx context.Context, t *task.Task) { a.log.Info("executing agent task", "tid", t.Metadata.WorkflowId) a.wg.Add(1) go func() { defer a.wg.Done() txn := task.NewTaskTransaction(a.monitor, t.Metadata) defer txn.End() - err := a.executeAgentTask(t) + err := a.executeAgentTask(ctx, t) if err != nil { a.log.Error(err.Error()) @@ -338,7 +350,7 @@ func (a *Agent) handleAgentTask(t *task.Task) { }() } -func (a *Agent) executeAgentTask(t *task.Task) error { +func (a *Agent) executeAgentTask(ctx context.Context, t *task.Task) error { t.Timeline.Started = time.Now() specJSON, err := json.Marshal(t.Spec) if err != nil { @@ -356,6 +368,9 @@ func (a *Agent) executeAgentTask(t *task.Task) error { } err = e(&spec, a.log) + if t.Metadata.ShouldReportStatus { + a.reportTaskStatus(ctx, *t, err) + } sinceCreation, inRunner, processed := t.GetLatency() a.log.Info("Done handling agent task", "tid", t.Metadata.WorkflowId, diff --git a/venona/pkg/queue/queue.go b/venona/pkg/queue/queue.go index 046cba6d..595f5827 100644 --- a/venona/pkg/queue/queue.go +++ b/venona/pkg/queue/queue.go @@ -165,23 +165,12 @@ func (wfq *wfQueueImpl) handleWorkflow(ctx context.Context, wf *workflow.Workflo for i := range wf.Tasks { taskDef := wf.Tasks[i] err := runtime.HandleTask(ctx, taskDef) - status := task.TaskStatus{ - OccurredAt: time.Now(), - StatusRevision: taskDef.Metadata.CurrentStatusRevision + 1, - } if err != nil { wfq.log.Error("failed handling task", "error", err, "workflow", workflow) txn.NoticeError(errRuntimeNotFound) - status.Status = task.StatusError - status.Reason = err.Error() - status.IsRetriable = true // TODO: make this configurable depending on the error - } else { - status.Status = task.StatusSuccess } - statusErr := wfq.cf.ReportTaskStatus(ctx, taskDef.Id, status) - if statusErr != nil { - wfq.log.Error("failed reporting task status", "error", statusErr, "task", taskDef.Id, "workflow", workflow) - txn.NoticeError(statusErr) + if taskDef.Metadata.ShouldReportStatus { + wfq.reportTaskStatus(ctx, *taskDef, err) } } @@ -195,3 +184,22 @@ func (wfq *wfQueueImpl) handleWorkflow(ctx context.Context, wf *workflow.Workflo ) metrics.ObserveWorkflowMetrics(wf.Type, sinceCreation, inRunner, processed) } + +func (wfq *wfQueueImpl) reportTaskStatus(ctx context.Context, taskDef task.Task, err error) { + status := task.TaskStatus{ + OccurredAt: time.Now(), + StatusRevision: taskDef.Metadata.CurrentStatusRevision + 1, + } + if err != nil { + status.Status = task.StatusError + status.Reason = err.Error() + status.IsRetriable = true // TODO: make this configurable depending on the error + } else { + status.Status = task.StatusSuccess + } + + statusErr := wfq.cf.ReportTaskStatus(ctx, taskDef.Id, status) + if statusErr != nil { + wfq.log.Error("failed reporting task status", "error", statusErr, "task", taskDef.Id, "workflow", taskDef.Metadata.WorkflowId) + } +} diff --git a/venona/pkg/task/task.go b/venona/pkg/task/task.go index 10ebf67b..4c2ad6a9 100644 --- a/venona/pkg/task/task.go +++ b/venona/pkg/task/task.go @@ -66,6 +66,7 @@ type ( ReName string `json:"reName"` WorkflowId string `json:"workflowId"` CurrentStatusRevision int `json:"currentStatusRevision"` + ShouldReportStatus bool `json:"shouldReportStatus"` } // Timeline values @@ -85,7 +86,7 @@ type ( OccurredAt time.Time `json:"occurredAt"` StatusRevision int `json:"statusRevision"` IsRetriable bool `json:"isRetriable"` - Reason string `json:"reason"` + Reason string `json:"reason,omitempty"` } )