Skip to content

Commit

Permalink
fix: Set initial progress from pod metadata if exists. Fixes #13057 (#…
Browse files Browse the repository at this point in the history
…13260)

Signed-off-by: jswxstw <[email protected]>
Signed-off-by: oninowang <[email protected]>
  • Loading branch information
jswxstw authored Aug 12, 2024
1 parent a572e71 commit 2d7e2b5
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 20 deletions.
31 changes: 16 additions & 15 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
new.PodIP = pod.Status.PodIP
}

// If `AnnotationKeyReportOutputsCompleted` is set, it means RBAC prevented WorkflowTaskResult from being written.
if x, ok := pod.Annotations[common.AnnotationKeyReportOutputsCompleted]; ok {
woc.log.Warn("workflow uses legacy/insecure pod patch, see https://argo-workflows.readthedocs.io/en/latest/workflow-rbac/")
resultName := woc.nodeID(pod)
Expand All @@ -1429,16 +1430,23 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
} else {
woc.wf.Status.MarkTaskResultIncomplete(resultName)
}
}

if x, ok := pod.Annotations[common.AnnotationKeyOutputs]; ok {
woc.log.Warn("workflow uses legacy/insecure pod patch, see https://argo-workflows.readthedocs.io/en/latest/workflow-rbac/")
if new.Outputs == nil {
new.Outputs = &wfv1.Outputs{}
// Get node outputs from pod annotations instead if RBAC prevented WorkflowTaskResult from being written.
if x, ok = pod.Annotations[common.AnnotationKeyOutputs]; ok {
if new.Outputs == nil {
new.Outputs = &wfv1.Outputs{}
}
if err := json.Unmarshal([]byte(x), new.Outputs); err != nil {
new.Phase = wfv1.NodeError
new.Message = err.Error()
}
}
if err := json.Unmarshal([]byte(x), new.Outputs); err != nil {
new.Phase = wfv1.NodeError
new.Message = err.Error()

// Get node progress from pod annotations instead if RBAC prevented WorkflowTaskResult from being written.
if x, ok = pod.Annotations[common.AnnotationKeyProgress]; ok {
if p, ok := wfv1.ParseProgress(x); ok {
new.Progress = p
}
}
}

Expand All @@ -1448,13 +1456,6 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
new.Progress = wfv1.ProgressDefault
}

if x, ok := pod.Annotations[common.AnnotationKeyProgress]; ok {
woc.log.Warn("workflow uses legacy/insecure pod patch, see https://argo-workflows.readthedocs.io/en/latest/workflow-rbac/")
if p, ok := wfv1.ParseProgress(x); ok {
new.Progress = p
}
}

// We capture the exit-code after we look for the task-result.
// All other outputs are set by the executor, only the exit-code is set by the controller.
// By waiting, we avoid breaking the race-condition check.
Expand Down
21 changes: 16 additions & 5 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ spec:
- name: pod
template: pod
- name: pod
metadata:
annotations:
workflows.argoproj.io/progress: 0/100
container:
image: my-image
`)
Expand All @@ -291,17 +294,25 @@ spec:
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)

makePodsPhase(ctx, woc, apiv1.PodRunning, withProgress("50/100"))
assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
assert.Equal(t, wfv1.Progress("0/100"), woc.wf.Status.Progress)
assert.Equal(t, wfv1.Progress("0/100"), woc.wf.Status.Nodes[woc.wf.Name].Progress)
pod := woc.wf.Status.Nodes.FindByDisplayName("pod")
assert.Equal(t, wfv1.Progress("0/100"), pod.Progress)

// mock workflow uses legacy/insecure pod patch
makePodsPhase(ctx, woc, apiv1.PodRunning, withAnnotation(common.AnnotationKeyReportOutputsCompleted, "false"), withProgress("50/100"))
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)

assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
assert.Equal(t, wfv1.Progress("50/100"), woc.wf.Status.Progress)
assert.Equal(t, wfv1.Progress("50/100"), woc.wf.Status.Nodes[woc.wf.Name].Progress)
pod := woc.wf.Status.Nodes.FindByDisplayName("pod")
pod = woc.wf.Status.Nodes.FindByDisplayName("pod")
assert.Equal(t, wfv1.Progress("50/100"), pod.Progress)

makePodsPhase(ctx, woc, apiv1.PodSucceeded, withProgress("100/100"))
// mock workflow uses legacy/insecure pod patch
makePodsPhase(ctx, woc, apiv1.PodSucceeded, withAnnotation(common.AnnotationKeyReportOutputsCompleted, "true"), withProgress("100/100"))
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)

Expand Down Expand Up @@ -6271,7 +6282,7 @@ func TestConfigMapCacheSaveOperate(t *testing.T) {

ctx := context.Background()
woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded, withExitCode(0), withOutputs(wfv1.MustMarshallJSON(sampleOutputs)))
makePodsPhase(ctx, woc, apiv1.PodSucceeded, withExitCode(0), withAnnotation(common.AnnotationKeyReportOutputsCompleted, "true"), withOutputs(wfv1.MustMarshallJSON(sampleOutputs)))
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)

Expand Down Expand Up @@ -6555,7 +6566,7 @@ spec:
assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)

// make all created pods as successful
makePodsPhase(ctx, woc, apiv1.PodSucceeded, withOutputs(`{"parameters": [{"name": "my-param"}]}`))
makePodsPhase(ctx, woc, apiv1.PodSucceeded, withAnnotation(common.AnnotationKeyReportOutputsCompleted, "true"), withOutputs(`{"parameters": [{"name": "my-param"}]}`))

// reconcile
woc = newWorkflowOperationCtx(woc.wf, controller)
Expand Down
12 changes: 12 additions & 0 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,18 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
woc.addSchedulingConstraints(pod, wfSpec, tmpl, nodeName)
woc.addMetadata(pod, tmpl)

// Set initial progress from pod metadata if exists.
if x, ok := pod.ObjectMeta.Annotations[common.AnnotationKeyProgress]; ok {
if p, ok := wfv1.ParseProgress(x); ok {
node, err := woc.wf.Status.Nodes.Get(nodeID)
if err != nil {
woc.log.Panicf("was unable to obtain node for %s", nodeID)
}
node.Progress = p
woc.wf.Status.Nodes.Set(nodeID, *node)
}
}

err = addVolumeReferences(pod, woc.volumes, tmpl, woc.wf.Status.PersistentVolumeClaims)
if err != nil {
return nil, err
Expand Down

0 comments on commit 2d7e2b5

Please sign in to comment.