From dd2135add9dee07f6a2cac56c8992004e3a6a88a Mon Sep 17 00:00:00 2001 From: Andrea Frittoli Date: Mon, 2 Oct 2023 21:41:21 +0100 Subject: [PATCH] Regression: fix results with out of order tasks The pipeline run reconciler builds a pipeline run state on every run, which resolves task references, expands result and processes matrix fan outs. The current process is incremental in a single loop, where each new PipelineTask resolution depends on the state of PipelineTasks resolved before. This is problematic because tasks are not necessarily defined in the pipeline in order of execution (which is undefined, given that pipelines are DAGs). Since this PR is a fix to a regression, it aims to be as minimal as possible. The smallest solution available is to implement some sorting in the list of tasks, so that the incremental state can work correctly. This PR splits the process into two runs, one for tasks that have been already started (and possibly completed), and a second one that includes all remaining tasks. The first group of task does not need matrix fan outs (they have already been processed) or result resolution, so its state can be safely build incrementally. The second group is executed starting from the state of the second group. Any task that is a candidate for execution in this this reconcile cycle must have its results resolved through the state of the first group. Testing with the current code arrangement is a bit challenging, as we ignore result resolution errors in the code, which is ok only in some cases: - result resolution due to task not found or result not defined is permanent and should not be ignored - result resolution due to a result not being available yet is ephemeral (possibly) and should not cause a failure Currently one function checks for all these conditions and returns one error, so it's not possible to safely distinguish them. This will require some refactoring to be fixed in a follow up patch. For now, a reconcile unit test can test the fix. Fixes: #7103 Signed-off-by: Andrea Frittoli --- pkg/reconciler/pipelinerun/pipelinerun.go | 45 +++- .../pipelinerun/pipelinerun_test.go | 215 ++++++++++++++++++ 2 files changed, 257 insertions(+), 3 deletions(-) diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index eef81a2e16c..cea31b17fc8 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -325,10 +325,10 @@ func (c *Reconciler) resolvePipelineState( ctx context.Context, tasks []v1.PipelineTask, pipelineMeta *metav1.ObjectMeta, - pr *v1.PipelineRun) (resources.PipelineRunState, error) { + pr *v1.PipelineRun, + pst resources.PipelineRunState) (resources.PipelineRunState, error) { ctx, span := c.tracerProvider.Tracer(TracerName).Start(ctx, "resolvePipelineState") defer span.End() - pst := resources.PipelineRunState{} // Resolve each task individually because they each could have a different reference context (remote or local). for _, task := range tasks { // We need the TaskRun name to ensure that we don't perform an additional remote resolution request for a PipelineTask @@ -536,7 +536,46 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel if len(pipelineSpec.Finally) > 0 { tasks = append(tasks, pipelineSpec.Finally...) } - pipelineRunState, err := c.resolvePipelineState(ctx, tasks, pipelineMeta.ObjectMeta, pr) + + // We spit tasks in two lists: + // - those with a completed (Task|Custom)Run reference (i.e. those that finished running) + // - those without a (Task|Custom)Run reference + // We resolve the status for the former first, to collect all results available at this stage + // We know that tasks in progress or completed have had their fan-out alteady calculated so + // they can be safely processed in the first iteration. The underlying assumption is that if + // a PipelineTask has at least one TaskRun associated, then all its TaskRuns have been + // created already. + // The second group takes as input the partial state built in the first iteration and finally + // the two results are collated + ranOrRunningTaskNames := sets.Set[string]{} + ranOrRunningTasks := []v1.PipelineTask{} + notStartedTasks := []v1.PipelineTask{} + + for _, child := range pr.Status.ChildReferences { + ranOrRunningTaskNames.Insert(child.PipelineTaskName) + } + for _, task := range tasks { + if ranOrRunningTaskNames.Has(task.Name) { + ranOrRunningTasks = append(ranOrRunningTasks, task) + } else { + notStartedTasks = append(notStartedTasks, task) + } + } + // First iteration + pst := resources.PipelineRunState{} + pipelineRunState, err := c.resolvePipelineState(ctx, ranOrRunningTasks, pipelineMeta.ObjectMeta, pr, pst) + switch { + case errors.Is(err, remote.ErrRequestInProgress): + message := fmt.Sprintf("PipelineRun %s/%s awaiting remote resource", pr.Namespace, pr.Name) + pr.Status.MarkRunning(v1.TaskRunReasonResolvingTaskRef, message) + return nil + case err != nil: + return err + default: + } + + // Second iteration + pipelineRunState, err = c.resolvePipelineState(ctx, notStartedTasks, pipelineMeta.ObjectMeta, pr, pipelineRunState) switch { case errors.Is(err, remote.ErrRequestInProgress): message := fmt.Sprintf("PipelineRun %s/%s awaiting remote resource", pr.Namespace, pr.Name) diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index ab35dc88702..f61d0cd0e5d 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -13756,6 +13756,221 @@ spec: } } +func TestReconcileWithResultsAndOutOfOrderTasks7103(t *testing.T) { + // TestReconcileWithResultsAndOutOfOrderTasks7103 runs "Reconcile" on a PipelineRun with three + // Tasks, defined in the following order: + // Task1 (t1): produces a result + // Task3 (t3): consumes results from t1 and t2 + // Task2 (t3): produces a result + // + // The execution flows is (t1, t2) and then t3. The status at the beginning of the test is + // That t1 and t2 have been executed successfully, t3 has not been started. + ps := []*v1.Pipeline{parse.MustParseV1Pipeline(t, ` +metadata: + name: 7103-reproducer + namespace: foo +spec: + tasks: + - name: task1 + taskSpec: + results: + - name: foo + type: string + steps: + - name: step1 + env: + - name: RESULT_PATH + value: $(results.foo.path) + image: ubuntu + script: | + #!/usr/bin/env bash + echo "foo-value" > "$RESULT_PATH" + - name: task3 + runAfter: + - task1 + - task2 + params: + - name: foo-value + value: $(tasks.task1.results.foo) + - name: bar-value + value: $(tasks.task2.results.bar) + taskSpec: + steps: + - name: step1 + env: + - name: FOO_VALUE + value: $(params.foo-value) + - name: BAR_VALUE + value: $(params.bar-value) + image: ubuntu + script: | + #!/usr/bin/env bash + echo "$FOO_VALUE" + echo "$BAR_VALUE" + - name: task2 + taskSpec: + results: + - name: bar + type: string + steps: + - name: step1 + env: + - name: RESULT_PATH + value: $(results.bar.path) + image: ubuntu + script: | + #!/usr/bin/env bash + echo "bar-value" > "$RESULT_PATH" +`)} + prs := []*v1.PipelineRun{parse.MustParseV1PipelineRun(t, ` +metadata: + name: 7103-reproducer-run-7jp4w + namespace: foo +spec: + pipelineRef: + name: 7103-reproducer +status: + conditions: + - lastTransitionTime: "2023-10-03T10:55:19Z" + message: 'Tasks Completed: 2 (Failed: 0, Cancelled 0), Incomplete: 1, Skipped: 0' + reason: Running + status: Unknown + type: Succeeded + startTime: "2023-10-03T10:55:12Z" + childReferences: + - apiVersion: tekton.dev/v1 + kind: TaskRun + name: 7103-reproducer-run-7jp4w-task1 + pipelineTaskName: task1 + - apiVersion: tekton.dev/v1 + kind: TaskRun + name: 7103-reproducer-run-7jp4w-task2 + pipelineTaskName: task2 +`)} + ts := []*v1.Task{simpleHelloWorldTask} + + trs := []*v1.TaskRun{ + mustParseTaskRunWithObjectMeta(t, + taskRunObjectMeta( + "7103-reproducer-run-7jp4w-task1", + "foo", "7103-reproducer-run-7jp4w", + "7103-reproducer", "task1", false), ` +status: + completionTime: "2023-10-03T10:55:19Z" + conditions: + - lastTransitionTime: "2023-10-03T10:55:19Z" + message: All Steps have completed executing + reason: Succeeded + status: "True" + type: Succeeded + podName: 7103-reproducer-run-7jp4w-task1-pod + results: + - name: foo + type: string + value: foo-value + startTime: "2023-10-03T10:55:12Z" +`), + mustParseTaskRunWithObjectMeta(t, + taskRunObjectMeta( + "7103-reproducer-run-7jp4w-task2", + "foo", "7103-reproducer-run-7jp4w", + "7103-reproducer", "task2", false), ` +status: + completionTime: "2023-10-03T10:55:18Z" + conditions: + - lastTransitionTime: "2023-10-03T10:55:18Z" + message: All Steps have completed executing + reason: Succeeded + status: "True" + type: Succeeded + podName: 7103-reproducer-run-7jp4w-task2-pod + results: + - name: bar + type: string + value: bar-value + startTime: "2023-10-03T10:55:12Z" +`)} + + d := test.Data{ + PipelineRuns: prs, + Pipelines: ps, + Tasks: ts, + TaskRuns: trs, + } + prt := newPipelineRunTest(t, d) + defer prt.Cancel() + + wantEvents := []string{} + reconciledRun, _ := prt.reconcileRun("foo", "7103-reproducer-run-7jp4w", wantEvents, false) + + expectedPipelineRun := parse.MustParseV1PipelineRun(t, ` +metadata: + name: 7103-reproducer-run-7jp4w + namespace: foo + labels: + tekton.dev/pipeline: "7103-reproducer" + annotations: {} +spec: + pipelineRef: + name: 7103-reproducer +status: + conditions: + - lastTransitionTime: "2023-10-03T10:55:19Z" + message: 'Tasks Completed: 2 (Failed: 0, Cancelled 0), Incomplete: 1, Skipped: 0' + reason: Running + status: Unknown + type: Succeeded + startTime: "2023-10-03T10:55:12Z" + childReferences: + - apiVersion: tekton.dev/v1 + kind: TaskRun + name: 7103-reproducer-run-7jp4w-task1 + pipelineTaskName: task1 + - apiVersion: tekton.dev/v1 + kind: TaskRun + name: 7103-reproducer-run-7jp4w-task2 + pipelineTaskName: task2 + - apiVersion: tekton.dev/v1 + kind: TaskRun + name: 7103-reproducer-run-7jp4w-task3 + pipelineTaskName: task3 +`) + expectedPipelineRun.Status.PipelineSpec = &ps[0].Spec + + // The PipelineRun should include a task3 child + if d := cmp.Diff(expectedPipelineRun, reconciledRun, ignoreResourceVersion, ignoreLastTransitionTime, ignoreTypeMeta, ignoreProvenance, ignoreStartTime); d != "" { + t.Errorf("Expected to see PipelineRun run with a task3 child reference %s", diff.PrintWantGot(d)) + } + + // The TaskRun for task3 is not reconciled (no status), but it must created with + // resolved parameters + expectedTaskRun := mustParseTaskRunWithObjectMeta(t, + taskRunObjectMeta( + "7103-reproducer-run-7jp4w-task3", + "foo", "7103-reproducer-run-7jp4w", + "7103-reproducer", "task3", false), ` +spec: + serviceAccountName: "default" + params: + - name: foo-value + value: foo-value + - name: bar-value + value: bar-value +`) + // The taskSpec is already set by the reconcile to avoid an extra resolution + expectedTaskRun.Spec.TaskSpec = &ps[0].Spec.Tasks[1].TaskSpec.TaskSpec + taskRuns := getTaskRunsForPipelineRun(prt.TestAssets.Ctx, t, prt.TestAssets.Clients, "foo", "7103-reproducer-run-7jp4w") + // Ensure that there are 2 TaskRuns associated with this PipelineRun + validateTaskRunsCount(t, taskRuns, 3) + + // Check that the expected TaskRun was created + actual := getTaskRunByName(t, taskRuns, "7103-reproducer-run-7jp4w-task3") + // The TaskRun for task3 should include resolved results + if d := cmp.Diff(expectedTaskRun, actual, ignoreResourceVersion, ignoreLastTransitionTime, ignoreTypeMeta, ignoreProvenance, ignoreStartTime); d != "" { + t.Errorf("Expected to see PipelineRun run with a task3 child reference %s", diff.PrintWantGot(d)) + } +} + func getSignedV1Pipeline(unsigned *pipelinev1.Pipeline, signer signature.Signer, name string) (*pipelinev1.Pipeline, error) { signed := unsigned.DeepCopy() signed.Name = name