Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pipelinerun): resolve issue with PipelineRun not timing out successfully #8376

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion pkg/apis/pipeline/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ limitations under the License.

package errors

import "errors"
import (
"errors"
"strings"

apierrors "k8s.io/apimachinery/pkg/api/errors"
)

const UserErrorLabel = "[User error] "

Expand Down Expand Up @@ -71,3 +76,10 @@ func GetErrorMessage(err error) string {
}
return err.Error()
}

// IsImmutableTaskRunSpecError returns true if the error is the taskrun spec is immutable
func IsImmutableTaskRunSpecError(err error) bool {
// The TaskRun may have completed and the spec field is immutable.
// validation code: https://github.com/tektoncd/pipeline/blob/v0.62.0/pkg/apis/pipeline/v1/taskrun_validation.go#L136-L138
return apierrors.IsBadRequest(err) && strings.Contains(err.Error(), "no updates are allowed")
}
5 changes: 5 additions & 0 deletions pkg/reconciler/pipelinerun/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"
"time"

pipelineErrors "github.com/tektoncd/pipeline/pkg/apis/pipeline/errors"
v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
clientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -88,6 +89,10 @@ func cancelTaskRun(ctx context.Context, taskRunName string, namespace string, cl
// still be able to cancel the PipelineRun
return nil
}
if pipelineErrors.IsImmutableTaskRunSpecError(err) {
// The TaskRun may have completed and the spec field is immutable, we should ignore this error.
return nil
}
return err
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/reconciler/pipelinerun/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"time"

pipelineErrors "github.com/tektoncd/pipeline/pkg/apis/pipeline/errors"
v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
clientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -125,6 +126,10 @@ func timeoutPipelineTasksForTaskNames(ctx context.Context, logger *zap.SugaredLo
logger.Infof("patching TaskRun %s for timeout", taskRunName)

if err := timeoutTaskRun(ctx, taskRunName, pr.Namespace, clientSet); err != nil {
if pipelineErrors.IsImmutableTaskRunSpecError(err) {
// The TaskRun may have completed and the spec field is immutable, we should ignore this error.
continue
}
errs = append(errs, fmt.Errorf("failed to patch TaskRun `%s` with timeout: %w", taskRunName, err).Error())
continue
}
Expand Down
99 changes: 99 additions & 0 deletions test/timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,3 +560,102 @@ spec:
}
wg.Wait()
}

// TestPipelineRunTimeoutWithCompletedTaskRuns tests the case where a PipelineRun is timeout and has completed TaskRuns.
func TestPipelineRunTimeoutWithCompletedTaskRuns(t *testing.T) {
t.Parallel()
// cancel the context after we have waited a suitable buffer beyond the given deadline.
ctx, cancel := context.WithTimeout(context.Background(), timeout+2*time.Minute)
defer cancel()
c, namespace := setup(ctx, t)

knativetest.CleanupOnInterrupt(func() { tearDown(context.Background(), t, c, namespace) }, t.Logf)
defer tearDown(context.Background(), t, c, namespace)

t.Logf("Creating Task in namespace %s", namespace)
task := parse.MustParseV1Task(t, fmt.Sprintf(`
metadata:
name: %s
namespace: %s
spec:
params:
- name: sleep
default: "1"
steps:
- image: mirror.gcr.io/busybox
command: ['/bin/sh']
args: ['-c', 'sleep $(params.sleep)']
`, helpers.ObjectNameForTest(t), namespace))
if _, err := c.V1TaskClient.Create(ctx, task, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Task `%s`: %s", task.Name, err)
}

pipeline := parse.MustParseV1Pipeline(t, fmt.Sprintf(`
metadata:
name: %s
namespace: %s
spec:
tasks:
- name: fast-task
params:
- name: sleep
value: "1"
taskRef:
name: %s
- name: slow-task
params:
- name: sleep
value: "120"
taskRef:
name: %s
`, helpers.ObjectNameForTest(t), namespace, task.Name, task.Name))
pipelineRun := parse.MustParseV1PipelineRun(t, fmt.Sprintf(`
metadata:
name: %s
namespace: %s
spec:
pipelineRef:
name: %s
timeouts:
pipeline: 30s
tasks: 30s
`, helpers.ObjectNameForTest(t), namespace, pipeline.Name))
if _, err := c.V1PipelineClient.Create(ctx, pipeline, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pipeline `%s`: %s", pipeline.Name, err)
}
if _, err := c.V1PipelineRunClient.Create(ctx, pipelineRun, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create PipelineRun `%s`: %s", pipelineRun.Name, err)
}

t.Logf("Waiting for PipelineRun %s in namespace %s to be timed out", pipelineRun.Name, namespace)
if err := WaitForPipelineRunState(ctx, c, pipelineRun.Name, timeout, FailedWithReason(v1.PipelineRunReasonTimedOut.String(), pipelineRun.Name), "PipelineRunTimedOut", v1Version); err != nil {
t.Errorf("Error waiting for PipelineRun %s to finish: %s", pipelineRun.Name, err)
}

taskrunList, err := c.V1TaskRunClient.List(ctx, metav1.ListOptions{LabelSelector: "tekton.dev/pipelineRun=" + pipelineRun.Name})
if err != nil {
t.Fatalf("Error listing TaskRuns for PipelineRun %s: %s", pipelineRun.Name, err)
}

t.Logf("Waiting for TaskRuns from PipelineRun %s in namespace %s to time out and be cancelled", pipelineRun.Name, namespace)
var wg sync.WaitGroup
for _, taskrunItem := range taskrunList.Items {
wg.Add(1)
go func(name string) {
defer wg.Done()
if strings.Contains(name, "fast-task") {
// fast-task should have completed, not timed out
return
}
err := WaitForTaskRunState(ctx, c, name, FailedWithReason(v1.TaskRunReasonCancelled.String(), name), v1.TaskRunReasonCancelled.String(), v1Version)
if err != nil {
t.Errorf("Error waiting for TaskRun %s to timeout: %s", name, err)
}
}(taskrunItem.Name)
}
wg.Wait()

if _, err := c.V1PipelineRunClient.Get(ctx, pipelineRun.Name, metav1.GetOptions{}); err != nil {
t.Fatalf("Failed to get PipelineRun `%s`: %s", pipelineRun.Name, err)
}
}