Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Regression: fix results with out of order tasks #7169

Merged
merged 1 commit into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,10 @@ func (c *Reconciler) resolvePipelineState(
ctx context.Context,
tasks []v1.PipelineTask,
pipelineMeta *metav1.ObjectMeta,
pr *v1.PipelineRun) (resources.PipelineRunState, error) {
pr *v1.PipelineRun,
pst resources.PipelineRunState) (resources.PipelineRunState, error) {
ctx, span := c.tracerProvider.Tracer(TracerName).Start(ctx, "resolvePipelineState")
defer span.End()
pst := resources.PipelineRunState{}
// Resolve each task individually because they each could have a different reference context (remote or local).
for _, task := range tasks {
// We need the TaskRun name to ensure that we don't perform an additional remote resolution request for a PipelineTask
Expand Down Expand Up @@ -536,7 +536,46 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel
if len(pipelineSpec.Finally) > 0 {
tasks = append(tasks, pipelineSpec.Finally...)
}
pipelineRunState, err := c.resolvePipelineState(ctx, tasks, pipelineMeta.ObjectMeta, pr)

// We spit tasks in two lists:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: split

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, nice, thanks, I will fix in a follow up

// - those with a completed (Task|Custom)Run reference (i.e. those that finished running)
// - those without a (Task|Custom)Run reference
// We resolve the status for the former first, to collect all results available at this stage
// We know that tasks in progress or completed have had their fan-out alteady calculated so
// they can be safely processed in the first iteration. The underlying assumption is that if
// a PipelineTask has at least one TaskRun associated, then all its TaskRuns have been
// created already.
// The second group takes as input the partial state built in the first iteration and finally
// the two results are collated
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: collected

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually meant collated

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sorry I misunderstood here. 😄

ranOrRunningTaskNames := sets.Set[string]{}
ranOrRunningTasks := []v1.PipelineTask{}
notStartedTasks := []v1.PipelineTask{}

for _, child := range pr.Status.ChildReferences {
ranOrRunningTaskNames.Insert(child.PipelineTaskName)
}
for _, task := range tasks {
if ranOrRunningTaskNames.Has(task.Name) {
ranOrRunningTasks = append(ranOrRunningTasks, task)
} else {
notStartedTasks = append(notStartedTasks, task)
}
}
// First iteration
pst := resources.PipelineRunState{}
pipelineRunState, err := c.resolvePipelineState(ctx, ranOrRunningTasks, pipelineMeta.ObjectMeta, pr, pst)
switch {
case errors.Is(err, remote.ErrRequestInProgress):
message := fmt.Sprintf("PipelineRun %s/%s awaiting remote resource", pr.Namespace, pr.Name)
pr.Status.MarkRunning(v1.TaskRunReasonResolvingTaskRef, message)
return nil
case err != nil:
return err
default:
}

// Second iteration
pipelineRunState, err = c.resolvePipelineState(ctx, notStartedTasks, pipelineMeta.ObjectMeta, pr, pipelineRunState)
switch {
case errors.Is(err, remote.ErrRequestInProgress):
message := fmt.Sprintf("PipelineRun %s/%s awaiting remote resource", pr.Namespace, pr.Name)
Expand Down
215 changes: 215 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13756,6 +13756,221 @@ spec:
}
}

func TestReconcileWithResultsAndOutOfOrderTasks7103(t *testing.T) {
// TestReconcileWithResultsAndOutOfOrderTasks7103 runs "Reconcile" on a PipelineRun with three
// Tasks, defined in the following order:
// Task1 (t1): produces a result
// Task3 (t3): consumes results from t1 and t2
// Task2 (t3): produces a result
//
// The execution flows is (t1, t2) and then t3. The status at the beginning of the test is
// That t1 and t2 have been executed successfully, t3 has not been started.
ps := []*v1.Pipeline{parse.MustParseV1Pipeline(t, `
metadata:
name: 7103-reproducer
namespace: foo
spec:
tasks:
- name: task1
taskSpec:
results:
- name: foo
type: string
steps:
- name: step1
env:
- name: RESULT_PATH
value: $(results.foo.path)
image: ubuntu
script: |
#!/usr/bin/env bash
echo "foo-value" > "$RESULT_PATH"
- name: task3
runAfter:
- task1
- task2
params:
- name: foo-value
value: $(tasks.task1.results.foo)
- name: bar-value
value: $(tasks.task2.results.bar)
taskSpec:
steps:
- name: step1
env:
- name: FOO_VALUE
value: $(params.foo-value)
- name: BAR_VALUE
value: $(params.bar-value)
image: ubuntu
script: |
#!/usr/bin/env bash
echo "$FOO_VALUE"
echo "$BAR_VALUE"
- name: task2
taskSpec:
results:
- name: bar
type: string
steps:
- name: step1
env:
- name: RESULT_PATH
value: $(results.bar.path)
image: ubuntu
script: |
#!/usr/bin/env bash
echo "bar-value" > "$RESULT_PATH"
`)}
prs := []*v1.PipelineRun{parse.MustParseV1PipelineRun(t, `
metadata:
name: 7103-reproducer-run-7jp4w
namespace: foo
spec:
pipelineRef:
name: 7103-reproducer
status:
conditions:
- lastTransitionTime: "2023-10-03T10:55:19Z"
message: 'Tasks Completed: 2 (Failed: 0, Cancelled 0), Incomplete: 1, Skipped: 0'
reason: Running
status: Unknown
type: Succeeded
startTime: "2023-10-03T10:55:12Z"
childReferences:
- apiVersion: tekton.dev/v1
kind: TaskRun
name: 7103-reproducer-run-7jp4w-task1
pipelineTaskName: task1
- apiVersion: tekton.dev/v1
kind: TaskRun
name: 7103-reproducer-run-7jp4w-task2
pipelineTaskName: task2
`)}
ts := []*v1.Task{simpleHelloWorldTask}

trs := []*v1.TaskRun{
mustParseTaskRunWithObjectMeta(t,
taskRunObjectMeta(
"7103-reproducer-run-7jp4w-task1",
"foo", "7103-reproducer-run-7jp4w",
"7103-reproducer", "task1", false), `
status:
completionTime: "2023-10-03T10:55:19Z"
conditions:
- lastTransitionTime: "2023-10-03T10:55:19Z"
message: All Steps have completed executing
reason: Succeeded
status: "True"
type: Succeeded
podName: 7103-reproducer-run-7jp4w-task1-pod
results:
- name: foo
type: string
value: foo-value
startTime: "2023-10-03T10:55:12Z"
`),
mustParseTaskRunWithObjectMeta(t,
taskRunObjectMeta(
"7103-reproducer-run-7jp4w-task2",
"foo", "7103-reproducer-run-7jp4w",
"7103-reproducer", "task2", false), `
status:
completionTime: "2023-10-03T10:55:18Z"
conditions:
- lastTransitionTime: "2023-10-03T10:55:18Z"
message: All Steps have completed executing
reason: Succeeded
status: "True"
type: Succeeded
podName: 7103-reproducer-run-7jp4w-task2-pod
results:
- name: bar
type: string
value: bar-value
startTime: "2023-10-03T10:55:12Z"
`)}

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: ts,
TaskRuns: trs,
}
prt := newPipelineRunTest(t, d)
defer prt.Cancel()

wantEvents := []string{}
reconciledRun, _ := prt.reconcileRun("foo", "7103-reproducer-run-7jp4w", wantEvents, false)

expectedPipelineRun := parse.MustParseV1PipelineRun(t, `
metadata:
name: 7103-reproducer-run-7jp4w
namespace: foo
labels:
tekton.dev/pipeline: "7103-reproducer"
annotations: {}
spec:
pipelineRef:
name: 7103-reproducer
status:
conditions:
- lastTransitionTime: "2023-10-03T10:55:19Z"
message: 'Tasks Completed: 2 (Failed: 0, Cancelled 0), Incomplete: 1, Skipped: 0'
reason: Running
status: Unknown
type: Succeeded
startTime: "2023-10-03T10:55:12Z"
childReferences:
- apiVersion: tekton.dev/v1
kind: TaskRun
name: 7103-reproducer-run-7jp4w-task1
pipelineTaskName: task1
- apiVersion: tekton.dev/v1
kind: TaskRun
name: 7103-reproducer-run-7jp4w-task2
pipelineTaskName: task2
- apiVersion: tekton.dev/v1
kind: TaskRun
name: 7103-reproducer-run-7jp4w-task3
pipelineTaskName: task3
`)
expectedPipelineRun.Status.PipelineSpec = &ps[0].Spec

// The PipelineRun should include a task3 child
if d := cmp.Diff(expectedPipelineRun, reconciledRun, ignoreResourceVersion, ignoreLastTransitionTime, ignoreTypeMeta, ignoreProvenance, ignoreStartTime); d != "" {
t.Errorf("Expected to see PipelineRun run with a task3 child reference %s", diff.PrintWantGot(d))
}

// The TaskRun for task3 is not reconciled (no status), but it must created with
// resolved parameters
expectedTaskRun := mustParseTaskRunWithObjectMeta(t,
taskRunObjectMeta(
"7103-reproducer-run-7jp4w-task3",
"foo", "7103-reproducer-run-7jp4w",
"7103-reproducer", "task3", false), `
spec:
serviceAccountName: "default"
params:
- name: foo-value
value: foo-value
- name: bar-value
value: bar-value
`)
// The taskSpec is already set by the reconcile to avoid an extra resolution
expectedTaskRun.Spec.TaskSpec = &ps[0].Spec.Tasks[1].TaskSpec.TaskSpec
taskRuns := getTaskRunsForPipelineRun(prt.TestAssets.Ctx, t, prt.TestAssets.Clients, "foo", "7103-reproducer-run-7jp4w")
// Ensure that there are 2 TaskRuns associated with this PipelineRun
validateTaskRunsCount(t, taskRuns, 3)

// Check that the expected TaskRun was created
actual := getTaskRunByName(t, taskRuns, "7103-reproducer-run-7jp4w-task3")
// The TaskRun for task3 should include resolved results
if d := cmp.Diff(expectedTaskRun, actual, ignoreResourceVersion, ignoreLastTransitionTime, ignoreTypeMeta, ignoreProvenance, ignoreStartTime); d != "" {
t.Errorf("Expected to see PipelineRun run with a task3 child reference %s", diff.PrintWantGot(d))
}
}

func getSignedV1Pipeline(unsigned *pipelinev1.Pipeline, signer signature.Signer, name string) (*pipelinev1.Pipeline, error) {
signed := unsigned.DeepCopy()
signed.Name = name
Expand Down
Loading