Skip to content

Commit

Permalink
Regression: fix results with out of order tasks
Browse files Browse the repository at this point in the history
The pipeline run reconciler builds a pipeline run state on every
run, which resolves task references, expands result and processes
matrix fan outs.

The current process is incremental in a single loop, where each
new PipelineTask resolution depends on the state of PipelineTasks
resolved before. This is problematic because tasks are not
necessarily defined in the pipeline in order of execution (which
is undefined, given that pipelines are DAGs).

Since this PR is a fix to a regression, it aims to be as minimal
as possible. The smallest solution available is to implement some
sorting in the list of tasks, so that the incremental state
can work correctly.

This PR splits the process into two runs, one for tasks that have
been already started (and possibly completed), and a second one
that includes all remaining tasks. The first group of task does
not need matrix fan outs (they have already been processed) or
result resolution, so its state can be safely build incrementally.

The second group is executed starting from the state of the second
group. Any task that is a candidate for execution in this this
reconcile cycle must have its results resolved through the state
of the first group.

Testing with the current code arrangement is a bit challenging,
as we ignore result resolution errors in the code, which is ok
only in some cases:
- result resolution due to task not found or result not defined
  is permanent and should not be ignored
- result resolution due to a result not being available yet is
  ephemeral (possibly) and should not cause a failure

Currently one function checks for all these conditions and
returns one error, so it's not possible to safely distinguish
them. This will require some refactoring to be fixed in a follow
up patch.

For now, a reconcile unit test can test the fix.

Fixes: #7103

Signed-off-by: Andrea Frittoli <[email protected]>
  • Loading branch information
afrittoli committed Oct 3, 2023
1 parent e8b85e2 commit dd2135a
Show file tree
Hide file tree
Showing 2 changed files with 257 additions and 3 deletions.
45 changes: 42 additions & 3 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,10 @@ func (c *Reconciler) resolvePipelineState(
ctx context.Context,
tasks []v1.PipelineTask,
pipelineMeta *metav1.ObjectMeta,
pr *v1.PipelineRun) (resources.PipelineRunState, error) {
pr *v1.PipelineRun,
pst resources.PipelineRunState) (resources.PipelineRunState, error) {
ctx, span := c.tracerProvider.Tracer(TracerName).Start(ctx, "resolvePipelineState")
defer span.End()
pst := resources.PipelineRunState{}
// Resolve each task individually because they each could have a different reference context (remote or local).
for _, task := range tasks {
// We need the TaskRun name to ensure that we don't perform an additional remote resolution request for a PipelineTask
Expand Down Expand Up @@ -536,7 +536,46 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel
if len(pipelineSpec.Finally) > 0 {
tasks = append(tasks, pipelineSpec.Finally...)
}
pipelineRunState, err := c.resolvePipelineState(ctx, tasks, pipelineMeta.ObjectMeta, pr)

// We spit tasks in two lists:
// - those with a completed (Task|Custom)Run reference (i.e. those that finished running)
// - those without a (Task|Custom)Run reference
// We resolve the status for the former first, to collect all results available at this stage
// We know that tasks in progress or completed have had their fan-out alteady calculated so
// they can be safely processed in the first iteration. The underlying assumption is that if
// a PipelineTask has at least one TaskRun associated, then all its TaskRuns have been
// created already.
// The second group takes as input the partial state built in the first iteration and finally
// the two results are collated
ranOrRunningTaskNames := sets.Set[string]{}
ranOrRunningTasks := []v1.PipelineTask{}
notStartedTasks := []v1.PipelineTask{}

for _, child := range pr.Status.ChildReferences {
ranOrRunningTaskNames.Insert(child.PipelineTaskName)
}
for _, task := range tasks {
if ranOrRunningTaskNames.Has(task.Name) {
ranOrRunningTasks = append(ranOrRunningTasks, task)
} else {
notStartedTasks = append(notStartedTasks, task)
}
}
// First iteration
pst := resources.PipelineRunState{}
pipelineRunState, err := c.resolvePipelineState(ctx, ranOrRunningTasks, pipelineMeta.ObjectMeta, pr, pst)
switch {
case errors.Is(err, remote.ErrRequestInProgress):
message := fmt.Sprintf("PipelineRun %s/%s awaiting remote resource", pr.Namespace, pr.Name)
pr.Status.MarkRunning(v1.TaskRunReasonResolvingTaskRef, message)
return nil
case err != nil:
return err
default:
}

// Second iteration
pipelineRunState, err = c.resolvePipelineState(ctx, notStartedTasks, pipelineMeta.ObjectMeta, pr, pipelineRunState)
switch {
case errors.Is(err, remote.ErrRequestInProgress):
message := fmt.Sprintf("PipelineRun %s/%s awaiting remote resource", pr.Namespace, pr.Name)
Expand Down
215 changes: 215 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13756,6 +13756,221 @@ spec:
}
}

func TestReconcileWithResultsAndOutOfOrderTasks7103(t *testing.T) {
// TestReconcileWithResultsAndOutOfOrderTasks7103 runs "Reconcile" on a PipelineRun with three
// Tasks, defined in the following order:
// Task1 (t1): produces a result
// Task3 (t3): consumes results from t1 and t2
// Task2 (t3): produces a result
//
// The execution flows is (t1, t2) and then t3. The status at the beginning of the test is
// That t1 and t2 have been executed successfully, t3 has not been started.
ps := []*v1.Pipeline{parse.MustParseV1Pipeline(t, `
metadata:
name: 7103-reproducer
namespace: foo
spec:
tasks:
- name: task1
taskSpec:
results:
- name: foo
type: string
steps:
- name: step1
env:
- name: RESULT_PATH
value: $(results.foo.path)
image: ubuntu
script: |
#!/usr/bin/env bash
echo "foo-value" > "$RESULT_PATH"
- name: task3
runAfter:
- task1
- task2
params:
- name: foo-value
value: $(tasks.task1.results.foo)
- name: bar-value
value: $(tasks.task2.results.bar)
taskSpec:
steps:
- name: step1
env:
- name: FOO_VALUE
value: $(params.foo-value)
- name: BAR_VALUE
value: $(params.bar-value)
image: ubuntu
script: |
#!/usr/bin/env bash
echo "$FOO_VALUE"
echo "$BAR_VALUE"
- name: task2
taskSpec:
results:
- name: bar
type: string
steps:
- name: step1
env:
- name: RESULT_PATH
value: $(results.bar.path)
image: ubuntu
script: |
#!/usr/bin/env bash
echo "bar-value" > "$RESULT_PATH"
`)}
prs := []*v1.PipelineRun{parse.MustParseV1PipelineRun(t, `
metadata:
name: 7103-reproducer-run-7jp4w
namespace: foo
spec:
pipelineRef:
name: 7103-reproducer
status:
conditions:
- lastTransitionTime: "2023-10-03T10:55:19Z"
message: 'Tasks Completed: 2 (Failed: 0, Cancelled 0), Incomplete: 1, Skipped: 0'
reason: Running
status: Unknown
type: Succeeded
startTime: "2023-10-03T10:55:12Z"
childReferences:
- apiVersion: tekton.dev/v1
kind: TaskRun
name: 7103-reproducer-run-7jp4w-task1
pipelineTaskName: task1
- apiVersion: tekton.dev/v1
kind: TaskRun
name: 7103-reproducer-run-7jp4w-task2
pipelineTaskName: task2
`)}
ts := []*v1.Task{simpleHelloWorldTask}

trs := []*v1.TaskRun{
mustParseTaskRunWithObjectMeta(t,
taskRunObjectMeta(
"7103-reproducer-run-7jp4w-task1",
"foo", "7103-reproducer-run-7jp4w",
"7103-reproducer", "task1", false), `
status:
completionTime: "2023-10-03T10:55:19Z"
conditions:
- lastTransitionTime: "2023-10-03T10:55:19Z"
message: All Steps have completed executing
reason: Succeeded
status: "True"
type: Succeeded
podName: 7103-reproducer-run-7jp4w-task1-pod
results:
- name: foo
type: string
value: foo-value
startTime: "2023-10-03T10:55:12Z"
`),
mustParseTaskRunWithObjectMeta(t,
taskRunObjectMeta(
"7103-reproducer-run-7jp4w-task2",
"foo", "7103-reproducer-run-7jp4w",
"7103-reproducer", "task2", false), `
status:
completionTime: "2023-10-03T10:55:18Z"
conditions:
- lastTransitionTime: "2023-10-03T10:55:18Z"
message: All Steps have completed executing
reason: Succeeded
status: "True"
type: Succeeded
podName: 7103-reproducer-run-7jp4w-task2-pod
results:
- name: bar
type: string
value: bar-value
startTime: "2023-10-03T10:55:12Z"
`)}

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: ts,
TaskRuns: trs,
}
prt := newPipelineRunTest(t, d)
defer prt.Cancel()

wantEvents := []string{}
reconciledRun, _ := prt.reconcileRun("foo", "7103-reproducer-run-7jp4w", wantEvents, false)

expectedPipelineRun := parse.MustParseV1PipelineRun(t, `
metadata:
name: 7103-reproducer-run-7jp4w
namespace: foo
labels:
tekton.dev/pipeline: "7103-reproducer"
annotations: {}
spec:
pipelineRef:
name: 7103-reproducer
status:
conditions:
- lastTransitionTime: "2023-10-03T10:55:19Z"
message: 'Tasks Completed: 2 (Failed: 0, Cancelled 0), Incomplete: 1, Skipped: 0'
reason: Running
status: Unknown
type: Succeeded
startTime: "2023-10-03T10:55:12Z"
childReferences:
- apiVersion: tekton.dev/v1
kind: TaskRun
name: 7103-reproducer-run-7jp4w-task1
pipelineTaskName: task1
- apiVersion: tekton.dev/v1
kind: TaskRun
name: 7103-reproducer-run-7jp4w-task2
pipelineTaskName: task2
- apiVersion: tekton.dev/v1
kind: TaskRun
name: 7103-reproducer-run-7jp4w-task3
pipelineTaskName: task3
`)
expectedPipelineRun.Status.PipelineSpec = &ps[0].Spec

// The PipelineRun should include a task3 child
if d := cmp.Diff(expectedPipelineRun, reconciledRun, ignoreResourceVersion, ignoreLastTransitionTime, ignoreTypeMeta, ignoreProvenance, ignoreStartTime); d != "" {
t.Errorf("Expected to see PipelineRun run with a task3 child reference %s", diff.PrintWantGot(d))
}

// The TaskRun for task3 is not reconciled (no status), but it must created with
// resolved parameters
expectedTaskRun := mustParseTaskRunWithObjectMeta(t,
taskRunObjectMeta(
"7103-reproducer-run-7jp4w-task3",
"foo", "7103-reproducer-run-7jp4w",
"7103-reproducer", "task3", false), `
spec:
serviceAccountName: "default"
params:
- name: foo-value
value: foo-value
- name: bar-value
value: bar-value
`)
// The taskSpec is already set by the reconcile to avoid an extra resolution
expectedTaskRun.Spec.TaskSpec = &ps[0].Spec.Tasks[1].TaskSpec.TaskSpec
taskRuns := getTaskRunsForPipelineRun(prt.TestAssets.Ctx, t, prt.TestAssets.Clients, "foo", "7103-reproducer-run-7jp4w")
// Ensure that there are 2 TaskRuns associated with this PipelineRun
validateTaskRunsCount(t, taskRuns, 3)

// Check that the expected TaskRun was created
actual := getTaskRunByName(t, taskRuns, "7103-reproducer-run-7jp4w-task3")
// The TaskRun for task3 should include resolved results
if d := cmp.Diff(expectedTaskRun, actual, ignoreResourceVersion, ignoreLastTransitionTime, ignoreTypeMeta, ignoreProvenance, ignoreStartTime); d != "" {
t.Errorf("Expected to see PipelineRun run with a task3 child reference %s", diff.PrintWantGot(d))
}
}

func getSignedV1Pipeline(unsigned *pipelinev1.Pipeline, signer signature.Signer, name string) (*pipelinev1.Pipeline, error) {
signed := unsigned.DeepCopy()
signed.Name = name
Expand Down

0 comments on commit dd2135a

Please sign in to comment.