Skip to content

Commit

Permalink
feat: report status for AgentTask
Browse files Browse the repository at this point in the history
  • Loading branch information
masontikhonov committed Dec 1, 2024
1 parent d9dec10 commit e5be91e
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 21 deletions.
29 changes: 22 additions & 7 deletions venona/pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (a *Agent) startTaskPullerRoutine(ctx context.Context) {

// perform all agentTasks (in goroutine)
for i := range agentTasks {
a.handleAgentTask(&agentTasks[i])
a.handleAgentTask(ctx, &agentTasks[i])
}

// send all wfTasks to tasksQueue
Expand Down Expand Up @@ -242,10 +242,22 @@ func (a *Agent) reportStatus(ctx context.Context, status codefresh.AgentStatus)
}
}

func (a *Agent) reportTaskStatus(ctx context.Context, id task.Id, status task.TaskStatus) {
err := a.cf.ReportTaskStatus(ctx, id, status)
func (a *Agent) reportTaskStatus(ctx context.Context, taskDef task.Task, err error) {
status := task.TaskStatus{
OccurredAt: time.Now(),
StatusRevision: taskDef.Metadata.CurrentStatusRevision + 1,
}
if err != nil {
a.log.Error("Failed reporting task status", "error", err)
status.Status = task.StatusError
status.Reason = err.Error()
status.IsRetriable = true // TODO: make this configurable depending on the error
} else {
status.Status = task.StatusSuccess
}

statusErr := a.cf.ReportTaskStatus(ctx, taskDef.Id, status)
if statusErr != nil {
a.log.Error("failed reporting task status", "error", statusErr, "task", taskDef.Id, "workflow", taskDef.Metadata.WorkflowId)
}
}

Expand Down Expand Up @@ -321,14 +333,14 @@ func (a *Agent) splitTasks(tasks task.Tasks) (task.Tasks, []*workflow.Workflow)
return agentTasks, workflows
}

func (a *Agent) handleAgentTask(t *task.Task) {
func (a *Agent) handleAgentTask(ctx context.Context, t *task.Task) {
a.log.Info("executing agent task", "tid", t.Metadata.WorkflowId)
a.wg.Add(1)
go func() {
defer a.wg.Done()
txn := task.NewTaskTransaction(a.monitor, t.Metadata)
defer txn.End()
err := a.executeAgentTask(t)
err := a.executeAgentTask(ctx, t)

if err != nil {
a.log.Error(err.Error())
Expand All @@ -338,7 +350,7 @@ func (a *Agent) handleAgentTask(t *task.Task) {
}()
}

func (a *Agent) executeAgentTask(t *task.Task) error {
func (a *Agent) executeAgentTask(ctx context.Context, t *task.Task) error {
t.Timeline.Started = time.Now()
specJSON, err := json.Marshal(t.Spec)
if err != nil {
Expand All @@ -356,6 +368,9 @@ func (a *Agent) executeAgentTask(t *task.Task) error {
}

err = e(&spec, a.log)
if t.Metadata.ShouldReportStatus {
a.reportTaskStatus(ctx, *t, err)
}
sinceCreation, inRunner, processed := t.GetLatency()
a.log.Info("Done handling agent task",
"tid", t.Metadata.WorkflowId,
Expand Down
34 changes: 21 additions & 13 deletions venona/pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,23 +165,12 @@ func (wfq *wfQueueImpl) handleWorkflow(ctx context.Context, wf *workflow.Workflo
for i := range wf.Tasks {
taskDef := wf.Tasks[i]
err := runtime.HandleTask(ctx, taskDef)
status := task.TaskStatus{
OccurredAt: time.Now(),
StatusRevision: taskDef.Metadata.CurrentStatusRevision + 1,
}
if err != nil {
wfq.log.Error("failed handling task", "error", err, "workflow", workflow)
txn.NoticeError(errRuntimeNotFound)
status.Status = task.StatusError
status.Reason = err.Error()
status.IsRetriable = true // TODO: make this configurable depending on the error
} else {
status.Status = task.StatusSuccess
}
statusErr := wfq.cf.ReportTaskStatus(ctx, taskDef.Id, status)
if statusErr != nil {
wfq.log.Error("failed reporting task status", "error", statusErr, "task", taskDef.Id, "workflow", workflow)
txn.NoticeError(statusErr)
if taskDef.Metadata.ShouldReportStatus {
wfq.reportTaskStatus(ctx, *taskDef, err)
}
}

Expand All @@ -195,3 +184,22 @@ func (wfq *wfQueueImpl) handleWorkflow(ctx context.Context, wf *workflow.Workflo
)
metrics.ObserveWorkflowMetrics(wf.Type, sinceCreation, inRunner, processed)
}

func (wfq *wfQueueImpl) reportTaskStatus(ctx context.Context, taskDef task.Task, err error) {
status := task.TaskStatus{
OccurredAt: time.Now(),
StatusRevision: taskDef.Metadata.CurrentStatusRevision + 1,
}
if err != nil {
status.Status = task.StatusError
status.Reason = err.Error()
status.IsRetriable = true // TODO: make this configurable depending on the error
} else {
status.Status = task.StatusSuccess
}

statusErr := wfq.cf.ReportTaskStatus(ctx, taskDef.Id, status)
if statusErr != nil {
wfq.log.Error("failed reporting task status", "error", statusErr, "task", taskDef.Id, "workflow", taskDef.Metadata.WorkflowId)
}
}
3 changes: 2 additions & 1 deletion venona/pkg/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type (
ReName string `json:"reName"`
WorkflowId string `json:"workflowId"`
CurrentStatusRevision int `json:"currentStatusRevision"`
ShouldReportStatus bool `json:"shouldReportStatus"`
}

// Timeline values
Expand All @@ -85,7 +86,7 @@ type (
OccurredAt time.Time `json:"occurredAt"`
StatusRevision int `json:"statusRevision"`
IsRetriable bool `json:"isRetriable"`
Reason string `json:"reason"`
Reason string `json:"reason,omitempty"`
}
)

Expand Down

0 comments on commit e5be91e

Please sign in to comment.