diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index eef81a2e16c..cea31b17fc8 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -325,10 +325,10 @@ func (c *Reconciler) resolvePipelineState( ctx context.Context, tasks []v1.PipelineTask, pipelineMeta *metav1.ObjectMeta, - pr *v1.PipelineRun) (resources.PipelineRunState, error) { + pr *v1.PipelineRun, + pst resources.PipelineRunState) (resources.PipelineRunState, error) { ctx, span := c.tracerProvider.Tracer(TracerName).Start(ctx, "resolvePipelineState") defer span.End() - pst := resources.PipelineRunState{} // Resolve each task individually because they each could have a different reference context (remote or local). for _, task := range tasks { // We need the TaskRun name to ensure that we don't perform an additional remote resolution request for a PipelineTask @@ -536,7 +536,46 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel if len(pipelineSpec.Finally) > 0 { tasks = append(tasks, pipelineSpec.Finally...) } - pipelineRunState, err := c.resolvePipelineState(ctx, tasks, pipelineMeta.ObjectMeta, pr) + + // We spit tasks in two lists: + // - those with a completed (Task|Custom)Run reference (i.e. those that finished running) + // - those without a (Task|Custom)Run reference + // We resolve the status for the former first, to collect all results available at this stage + // We know that tasks in progress or completed have had their fan-out alteady calculated so + // they can be safely processed in the first iteration. The underlying assumption is that if + // a PipelineTask has at least one TaskRun associated, then all its TaskRuns have been + // created already. + // The second group takes as input the partial state built in the first iteration and finally + // the two results are collated + ranOrRunningTaskNames := sets.Set[string]{} + ranOrRunningTasks := []v1.PipelineTask{} + notStartedTasks := []v1.PipelineTask{} + + for _, child := range pr.Status.ChildReferences { + ranOrRunningTaskNames.Insert(child.PipelineTaskName) + } + for _, task := range tasks { + if ranOrRunningTaskNames.Has(task.Name) { + ranOrRunningTasks = append(ranOrRunningTasks, task) + } else { + notStartedTasks = append(notStartedTasks, task) + } + } + // First iteration + pst := resources.PipelineRunState{} + pipelineRunState, err := c.resolvePipelineState(ctx, ranOrRunningTasks, pipelineMeta.ObjectMeta, pr, pst) + switch { + case errors.Is(err, remote.ErrRequestInProgress): + message := fmt.Sprintf("PipelineRun %s/%s awaiting remote resource", pr.Namespace, pr.Name) + pr.Status.MarkRunning(v1.TaskRunReasonResolvingTaskRef, message) + return nil + case err != nil: + return err + default: + } + + // Second iteration + pipelineRunState, err = c.resolvePipelineState(ctx, notStartedTasks, pipelineMeta.ObjectMeta, pr, pipelineRunState) switch { case errors.Is(err, remote.ErrRequestInProgress): message := fmt.Sprintf("PipelineRun %s/%s awaiting remote resource", pr.Namespace, pr.Name) diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index ab35dc88702..f61d0cd0e5d 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -13756,6 +13756,221 @@ spec: } } +func TestReconcileWithResultsAndOutOfOrderTasks7103(t *testing.T) { + // TestReconcileWithResultsAndOutOfOrderTasks7103 runs "Reconcile" on a PipelineRun with three + // Tasks, defined in the following order: + // Task1 (t1): produces a result + // Task3 (t3): consumes results from t1 and t2 + // Task2 (t3): produces a result + // + // The execution flows is (t1, t2) and then t3. The status at the beginning of the test is + // That t1 and t2 have been executed successfully, t3 has not been started. + ps := []*v1.Pipeline{parse.MustParseV1Pipeline(t, ` +metadata: + name: 7103-reproducer + namespace: foo +spec: + tasks: + - name: task1 + taskSpec: + results: + - name: foo + type: string + steps: + - name: step1 + env: + - name: RESULT_PATH + value: $(results.foo.path) + image: ubuntu + script: | + #!/usr/bin/env bash + echo "foo-value" > "$RESULT_PATH" + - name: task3 + runAfter: + - task1 + - task2 + params: + - name: foo-value + value: $(tasks.task1.results.foo) + - name: bar-value + value: $(tasks.task2.results.bar) + taskSpec: + steps: + - name: step1 + env: + - name: FOO_VALUE + value: $(params.foo-value) + - name: BAR_VALUE + value: $(params.bar-value) + image: ubuntu + script: | + #!/usr/bin/env bash + echo "$FOO_VALUE" + echo "$BAR_VALUE" + - name: task2 + taskSpec: + results: + - name: bar + type: string + steps: + - name: step1 + env: + - name: RESULT_PATH + value: $(results.bar.path) + image: ubuntu + script: | + #!/usr/bin/env bash + echo "bar-value" > "$RESULT_PATH" +`)} + prs := []*v1.PipelineRun{parse.MustParseV1PipelineRun(t, ` +metadata: + name: 7103-reproducer-run-7jp4w + namespace: foo +spec: + pipelineRef: + name: 7103-reproducer +status: + conditions: + - lastTransitionTime: "2023-10-03T10:55:19Z" + message: 'Tasks Completed: 2 (Failed: 0, Cancelled 0), Incomplete: 1, Skipped: 0' + reason: Running + status: Unknown + type: Succeeded + startTime: "2023-10-03T10:55:12Z" + childReferences: + - apiVersion: tekton.dev/v1 + kind: TaskRun + name: 7103-reproducer-run-7jp4w-task1 + pipelineTaskName: task1 + - apiVersion: tekton.dev/v1 + kind: TaskRun + name: 7103-reproducer-run-7jp4w-task2 + pipelineTaskName: task2 +`)} + ts := []*v1.Task{simpleHelloWorldTask} + + trs := []*v1.TaskRun{ + mustParseTaskRunWithObjectMeta(t, + taskRunObjectMeta( + "7103-reproducer-run-7jp4w-task1", + "foo", "7103-reproducer-run-7jp4w", + "7103-reproducer", "task1", false), ` +status: + completionTime: "2023-10-03T10:55:19Z" + conditions: + - lastTransitionTime: "2023-10-03T10:55:19Z" + message: All Steps have completed executing + reason: Succeeded + status: "True" + type: Succeeded + podName: 7103-reproducer-run-7jp4w-task1-pod + results: + - name: foo + type: string + value: foo-value + startTime: "2023-10-03T10:55:12Z" +`), + mustParseTaskRunWithObjectMeta(t, + taskRunObjectMeta( + "7103-reproducer-run-7jp4w-task2", + "foo", "7103-reproducer-run-7jp4w", + "7103-reproducer", "task2", false), ` +status: + completionTime: "2023-10-03T10:55:18Z" + conditions: + - lastTransitionTime: "2023-10-03T10:55:18Z" + message: All Steps have completed executing + reason: Succeeded + status: "True" + type: Succeeded + podName: 7103-reproducer-run-7jp4w-task2-pod + results: + - name: bar + type: string + value: bar-value + startTime: "2023-10-03T10:55:12Z" +`)} + + d := test.Data{ + PipelineRuns: prs, + Pipelines: ps, + Tasks: ts, + TaskRuns: trs, + } + prt := newPipelineRunTest(t, d) + defer prt.Cancel() + + wantEvents := []string{} + reconciledRun, _ := prt.reconcileRun("foo", "7103-reproducer-run-7jp4w", wantEvents, false) + + expectedPipelineRun := parse.MustParseV1PipelineRun(t, ` +metadata: + name: 7103-reproducer-run-7jp4w + namespace: foo + labels: + tekton.dev/pipeline: "7103-reproducer" + annotations: {} +spec: + pipelineRef: + name: 7103-reproducer +status: + conditions: + - lastTransitionTime: "2023-10-03T10:55:19Z" + message: 'Tasks Completed: 2 (Failed: 0, Cancelled 0), Incomplete: 1, Skipped: 0' + reason: Running + status: Unknown + type: Succeeded + startTime: "2023-10-03T10:55:12Z" + childReferences: + - apiVersion: tekton.dev/v1 + kind: TaskRun + name: 7103-reproducer-run-7jp4w-task1 + pipelineTaskName: task1 + - apiVersion: tekton.dev/v1 + kind: TaskRun + name: 7103-reproducer-run-7jp4w-task2 + pipelineTaskName: task2 + - apiVersion: tekton.dev/v1 + kind: TaskRun + name: 7103-reproducer-run-7jp4w-task3 + pipelineTaskName: task3 +`) + expectedPipelineRun.Status.PipelineSpec = &ps[0].Spec + + // The PipelineRun should include a task3 child + if d := cmp.Diff(expectedPipelineRun, reconciledRun, ignoreResourceVersion, ignoreLastTransitionTime, ignoreTypeMeta, ignoreProvenance, ignoreStartTime); d != "" { + t.Errorf("Expected to see PipelineRun run with a task3 child reference %s", diff.PrintWantGot(d)) + } + + // The TaskRun for task3 is not reconciled (no status), but it must created with + // resolved parameters + expectedTaskRun := mustParseTaskRunWithObjectMeta(t, + taskRunObjectMeta( + "7103-reproducer-run-7jp4w-task3", + "foo", "7103-reproducer-run-7jp4w", + "7103-reproducer", "task3", false), ` +spec: + serviceAccountName: "default" + params: + - name: foo-value + value: foo-value + - name: bar-value + value: bar-value +`) + // The taskSpec is already set by the reconcile to avoid an extra resolution + expectedTaskRun.Spec.TaskSpec = &ps[0].Spec.Tasks[1].TaskSpec.TaskSpec + taskRuns := getTaskRunsForPipelineRun(prt.TestAssets.Ctx, t, prt.TestAssets.Clients, "foo", "7103-reproducer-run-7jp4w") + // Ensure that there are 2 TaskRuns associated with this PipelineRun + validateTaskRunsCount(t, taskRuns, 3) + + // Check that the expected TaskRun was created + actual := getTaskRunByName(t, taskRuns, "7103-reproducer-run-7jp4w-task3") + // The TaskRun for task3 should include resolved results + if d := cmp.Diff(expectedTaskRun, actual, ignoreResourceVersion, ignoreLastTransitionTime, ignoreTypeMeta, ignoreProvenance, ignoreStartTime); d != "" { + t.Errorf("Expected to see PipelineRun run with a task3 child reference %s", diff.PrintWantGot(d)) + } +} + func getSignedV1Pipeline(unsigned *pipelinev1.Pipeline, signer signature.Signer, name string) (*pipelinev1.Pipeline, error) { signed := unsigned.DeepCopy() signed.Name = name