Skip to content

Commit

Permalink
feat: auto-cancel PipelineRuns on PR close
Browse files Browse the repository at this point in the history
The pipelinesascode.tekton.dev/cancel-in-progress: "true" feature
annotation has now been enhanced to include automatic cancellation of
PipelineRuns when the associated pull request is closed or merged.

Jira: https://issues.redhat.com/browse/SRVKP-6908

Signed-off-by: Chmouel Boudjnah <[email protected]>
  • Loading branch information
chmouel committed Dec 18, 2024
1 parent 5305656 commit 5de31a6
Show file tree
Hide file tree
Showing 28 changed files with 422 additions and 86 deletions.
4 changes: 4 additions & 0 deletions pkg/formatting/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ package formatting
import (
tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"
)

// PipelineRunStatus return status of PR success failed or skipped.
func PipelineRunStatus(pr *tektonv1.PipelineRun) string {
if len(pr.Status.Conditions) == 0 {
return "neutral"
}
if pr.Status.GetCondition(apis.ConditionSucceeded).GetReason() == tektonv1.PipelineRunSpecStatusCancelled {
return "cancelled"
}
if pr.Status.Conditions[0].Status == corev1.ConditionFalse {
return "failure"
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/formatting/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
"gotest.tools/v3/assert"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"
knativeduckv1 "knative.dev/pkg/apis/duck/v1"
)

Expand All @@ -30,6 +31,23 @@ func TestPipelineRunStatus(t *testing.T) {
},
},
},
{
name: "cancelled",
pr: &tektonv1.PipelineRun{
Status: tektonv1.PipelineRunStatus{
Status: knativeduckv1.Status{
Conditions: knativeduckv1.Conditions{
{
Status: corev1.ConditionTrue,
Reason: tektonv1.PipelineRunSpecStatusCancelled,
Message: "Cancelled",
Type: apis.ConditionSucceeded,
},
},
},
},
},
},
{
name: "failure",
pr: &tektonv1.PipelineRun{
Expand Down
3 changes: 2 additions & 1 deletion pkg/params/triggertype/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ const (
OkToTest Trigger = "ok-to-test"
Retest Trigger = "retest"
Push Trigger = "push"
PullRequest Trigger = "pull_request"
PullRequest Trigger = "pull_request" // it's should be "pull_request_opened_updated" but let's keep it simple.
PullRequestClosed Trigger = "pull_request_closed"
Cancel Trigger = "cancel"
CheckSuiteRerequested Trigger = "check-suite-rerequested"
CheckRunRerequested Trigger = "check-run-rerequested"
Expand Down
105 changes: 63 additions & 42 deletions pkg/pipelineascode/cancel_pipelineruns.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,47 @@ import (
"github.com/openshift-pipelines/pipelines-as-code/pkg/params/triggertype"
)

type matchingCond func(pr tektonv1.PipelineRun) bool

var cancelMergePatch = map[string]interface{}{
"spec": map[string]interface{}{
"status": tektonv1.PipelineRunSpecStatusCancelledRunFinally,
},
}

// cancelInProgress cancels all PipelineRuns associated with a given repository and pull request,
func (p *PacRun) cancelAllInProgressBelongingToPullRequest(ctx context.Context, repo *v1alpha1.Repository) error {
labelSelector := getLabelSelector(map[string]string{
keys.URLRepository: formatting.CleanValueKubernetes(p.event.Repository),
keys.PullRequest: strconv.Itoa(int(p.event.PullRequestNumber)),
})
prs, err := p.run.Clients.Tekton.TektonV1().PipelineRuns(repo.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return fmt.Errorf("failed to list pipelineRuns : %w", err)
}

if len(prs.Items) == 0 {
msg := fmt.Sprintf("no pipelinerun found for repository: %v and pullRequest %v",
p.event.Repository, p.event.PullRequestNumber)
p.eventEmitter.EmitMessage(repo, zap.InfoLevel, "RepositoryPipelineRun", msg)
return nil
}

p.cancelPipelineRuns(ctx, prs, repo, func(_ tektonv1.PipelineRun) bool {
return true
})

return nil
}

// cancelInProgressMatchingPR cancels all PipelineRuns associated with a given repository and pull request,
// except for the one that triggered the cancellation. It first checks if the cancellation is in progress
// and if the repository has a concurrency limit. If a concurrency limit is set, it returns an error as
// cancellation is not supported with concurrency limits. It then retrieves the original pull request name
// from the annotations and lists all PipelineRuns with matching labels. For each PipelineRun that is not
// already done, cancelled, or gracefully stopped, it patches the PipelineRun to cancel it.
func (p *PacRun) cancelInProgress(ctx context.Context, matchPR *tektonv1.PipelineRun, repo *v1alpha1.Repository) error {
func (p *PacRun) cancelInProgressMatchingPR(ctx context.Context, matchPR *tektonv1.PipelineRun, repo *v1alpha1.Repository) error {
if matchPR == nil {
return nil
}
Expand Down Expand Up @@ -67,51 +95,28 @@ func (p *PacRun) cancelInProgress(ctx context.Context, matchPR *tektonv1.Pipelin
if err != nil {
return fmt.Errorf("failed to list pipelineRuns : %w", err)
}
var wg sync.WaitGroup
for _, pr := range prs.Items {
if pr.GetName() == matchPR.GetName() {
continue
}

p.cancelPipelineRuns(ctx, prs, repo, func(pr tektonv1.PipelineRun) bool {
// skip our own for cancellation
if sourceBranch, ok := pr.GetAnnotations()[keys.SourceBranch]; ok {
// NOTE(chmouel): Every PR has their own branch and so is every push to different branch
// it means we only cancel pipelinerun of the same name that runs to
// the unique branch. Note: HeadBranch is the branch from where the PR
// comes from in git jargon.
if sourceBranch != p.event.HeadBranch {
p.logger.Infof("cancel-in-progress: skipping pipelinerun %v/%v as it is not from the same branch, annotation source-branch: %s event headbranch: %s", pr.GetNamespace(), pr.GetName(), sourceBranch, p.event.HeadBranch)
continue
return false
}
}

if pr.IsPending() {
p.logger.Infof("cancel-in-progress: skipping pipelinerun %v/%v as it is pending", pr.GetNamespace(), pr.GetName())
}

if pr.IsDone() {
p.logger.Infof("cancel-in-progress: skipping pipelinerun %v/%v as it is done", pr.GetNamespace(), pr.GetName())
continue
}
if pr.IsCancelled() || pr.IsGracefullyCancelled() || pr.IsGracefullyStopped() {
p.logger.Infof("cancel-in-progress: skipping pipelinerun %v/%v as it is already in %v state", pr.GetNamespace(), pr.GetName(), pr.Spec.Status)
continue
}

p.logger.Infof("cancel-in-progress: cancelling pipelinerun %v/%v", pr.GetNamespace(), pr.GetName())
wg.Add(1)
go func(ctx context.Context, pr tektonv1.PipelineRun) {
defer wg.Done()
if _, err := action.PatchPipelineRun(ctx, p.logger, "cancel patch", p.run.Clients.Tekton, &pr, cancelMergePatch); err != nil {
errMsg := fmt.Sprintf("failed to cancel pipelineRun %s/%s: %s", pr.GetNamespace(), pr.GetName(), err.Error())
p.eventEmitter.EmitMessage(repo, zap.ErrorLevel, "RepositoryPipelineRun", errMsg)
}
}(ctx, pr)
}
wg.Wait()

return pr.GetName() != matchPR.GetName()
})
return nil
}

func (p *PacRun) cancelPipelineRuns(ctx context.Context, repo *v1alpha1.Repository) error {
// cancelPipelineRunsOpsComment cancels all PipelineRuns associated with a given repository and pull request.
// when the user issue a cancel comment.
func (p *PacRun) cancelPipelineRunsOpsComment(ctx context.Context, repo *v1alpha1.Repository) error {
labelSelector := getLabelSelector(map[string]string{
keys.URLRepository: formatting.CleanValueKubernetes(p.event.Repository),
keys.SHA: formatting.CleanValueKubernetes(p.event.SHA),
Expand All @@ -137,22 +142,40 @@ func (p *PacRun) cancelPipelineRuns(ctx context.Context, repo *v1alpha1.Reposito
return nil
}

var wg sync.WaitGroup
for _, pr := range prs.Items {
p.cancelPipelineRuns(ctx, prs, repo, func(pr tektonv1.PipelineRun) bool {
if p.event.TargetCancelPipelineRun != "" {
if prName, ok := pr.GetAnnotations()[keys.OriginalPRName]; !ok || prName != p.event.TargetCancelPipelineRun {
continue
return false
}
}
if pr.IsDone() {
p.logger.Infof("pipelinerun %v/%v is done, skipping cancellation", pr.GetNamespace(), pr.GetName())
return true
})

return nil
}

func (p *PacRun) cancelPipelineRuns(ctx context.Context, prs *tektonv1.PipelineRunList, repo *v1alpha1.Repository, condition matchingCond) {
var wg sync.WaitGroup
for _, pr := range prs.Items {
if !condition(pr) {
continue
}

if pr.IsCancelled() || pr.IsGracefullyCancelled() || pr.IsGracefullyStopped() {
p.logger.Infof("pipelinerun %v/%v is already in %v state", pr.GetNamespace(), pr.GetName(), pr.Spec.Status)
p.logger.Infof("cancel-in-progress: skipping cancelling pipelinerun %v/%v, already in %v state", pr.GetNamespace(), pr.GetName(), pr.Spec.Status)
continue
}

if pr.IsDone() {
p.logger.Infof("cancel-in-progress: skipping cancelling pipelinerun %v/%v, already done", pr.GetNamespace(), pr.GetName())
continue
}

if pr.IsPending() {
p.logger.Infof("cancel-in-progress: skipping cancelling pipelinerun %v/%v in pending state", pr.GetNamespace(), pr.GetName())
}

p.logger.Infof("cancel-in-progress: cancelling pipelinerun %v/%v", pr.GetNamespace(), pr.GetName())
wg.Add(1)
go func(ctx context.Context, pr tektonv1.PipelineRun) {
defer wg.Done()
Expand All @@ -163,8 +186,6 @@ func (p *PacRun) cancelPipelineRuns(ctx context.Context, repo *v1alpha1.Reposito
}(ctx, pr)
}
wg.Wait()

return nil
}

func getLabelSelector(labelsMap map[string]string) string {
Expand Down
96 changes: 92 additions & 4 deletions pkg/pipelineascode/cancel_pipelineruns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var (
}
)

func TestCancelPipelinerun(t *testing.T) {
func TestCancelPipelinerunOpsComment(t *testing.T) {
observer, _ := zapobserver.New(zap.InfoLevel)
logger := zap.New(observer).Sugar()
tests := []struct {
Expand Down Expand Up @@ -300,7 +300,7 @@ func TestCancelPipelinerun(t *testing.T) {
},
}
pac := NewPacs(tt.event, nil, cs, &info.PacOpts{}, nil, logger, nil)
err := pac.cancelPipelineRuns(ctx, tt.repo)
err := pac.cancelPipelineRunsOpsComment(ctx, tt.repo)
assert.NilError(t, err)

got, err := cs.Clients.Tekton.TektonV1().PipelineRuns("foo").List(ctx, metav1.ListOptions{})
Expand All @@ -318,7 +318,7 @@ func TestCancelPipelinerun(t *testing.T) {
}
}

func TestCancelInProgress(t *testing.T) {
func TestCancelInProgressMatchingPR(t *testing.T) {
observer, catcher := zapobserver.New(zap.InfoLevel)
logger := zap.New(observer).Sugar()
tests := []struct {
Expand Down Expand Up @@ -789,7 +789,7 @@ func TestCancelInProgress(t *testing.T) {
if len(tt.pipelineRuns) > 0 {
firstPr = tt.pipelineRuns[0]
}
err := pac.cancelInProgress(ctx, firstPr, tt.repo)
err := pac.cancelInProgressMatchingPR(ctx, firstPr, tt.repo)
if tt.wantErrString != "" {
assert.ErrorContains(t, err, tt.wantErrString)
return
Expand Down Expand Up @@ -818,6 +818,94 @@ func TestCancelInProgress(t *testing.T) {
}
}

func TestCancelAllInProgressBelongingToPullRequest(t *testing.T) {
observer, _ := zapobserver.New(zap.InfoLevel)
logger := zap.New(observer).Sugar()

tests := []struct {
name string
event *info.Event
repo *v1alpha1.Repository
pipelineRuns []*pipelinev1.PipelineRun
cancelledPipelineRuns map[string]bool
}{
{
name: "cancel all in progress PipelineRuns",
event: &info.Event{
Repository: "foo",
TriggerTarget: "pull_request",
PullRequestNumber: pullReqNumber,
},
repo: fooRepo,
pipelineRuns: []*pipelinev1.PipelineRun{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pr-foo-1",
Namespace: "foo",
Labels: fooRepoLabels,
},
Spec: pipelinev1.PipelineRunSpec{},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pr-foo-2",
Namespace: "foo",
Labels: fooRepoLabels,
},
Spec: pipelinev1.PipelineRunSpec{},
},
},
cancelledPipelineRuns: map[string]bool{
"pr-foo-1": true,
"pr-foo-2": true,
},
},
{
name: "no PipelineRuns to cancel",
event: &info.Event{
Repository: "foo",
TriggerTarget: "pull_request",
PullRequestNumber: pullReqNumber,
},
repo: fooRepo,
pipelineRuns: []*pipelinev1.PipelineRun{},
cancelledPipelineRuns: map[string]bool{},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, _ := rtesting.SetupFakeContext(t)

tdata := testclient.Data{
PipelineRuns: tt.pipelineRuns,
}
stdata, _ := testclient.SeedTestData(t, ctx, tdata)
cs := &params.Run{
Clients: clients.Clients{
Log: logger,
Tekton: stdata.Pipeline,
Kube: stdata.Kube,
},
}
pac := NewPacs(tt.event, nil, cs, &info.PacOpts{}, nil, logger, nil)
err := pac.cancelAllInProgressBelongingToPullRequest(ctx, tt.repo)
assert.NilError(t, err)

got, err := cs.Clients.Tekton.TektonV1().PipelineRuns("foo").List(ctx, metav1.ListOptions{})
assert.NilError(t, err)

for _, pr := range got.Items {
if _, ok := tt.cancelledPipelineRuns[pr.Name]; ok {
assert.Equal(t, string(pr.Spec.Status), pipelinev1.PipelineRunSpecStatusCancelledRunFinally)
} else {
assert.Assert(t, string(pr.Spec.Status) != pipelinev1.PipelineRunSpecStatusCancelledRunFinally)
}
}
})
}
}

func TestGetLabelSelector(t *testing.T) {
tests := []struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipelineascode/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (p *PacRun) matchRepoPR(ctx context.Context) ([]matcher.Match, *v1alpha1.Re
}

if p.event.CancelPipelineRuns {
return nil, repo, p.cancelPipelineRuns(ctx, repo)
return nil, repo, p.cancelPipelineRunsOpsComment(ctx, repo)
}

matchedPRs, err := p.getPipelineRunsFromRepo(ctx, repo)
Expand Down
9 changes: 8 additions & 1 deletion pkg/pipelineascode/pipelineascode.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/openshift-pipelines/pipelines-as-code/pkg/params/clients"
"github.com/openshift-pipelines/pipelines-as-code/pkg/params/info"
"github.com/openshift-pipelines/pipelines-as-code/pkg/params/settings"
"github.com/openshift-pipelines/pipelines-as-code/pkg/params/triggertype"
"github.com/openshift-pipelines/pipelines-as-code/pkg/provider"
"github.com/openshift-pipelines/pipelines-as-code/pkg/secrets"
tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
Expand Down Expand Up @@ -55,6 +56,12 @@ func NewPacs(event *info.Event, vcx provider.Interface, run *params.Run, pacInfo

func (p *PacRun) Run(ctx context.Context) error {
matchedPRs, repo, err := p.matchRepoPR(ctx)
if repo != nil && p.event.TriggerTarget == triggertype.PullRequestClosed {
if err := p.cancelAllInProgressBelongingToPullRequest(ctx, repo); err != nil {
return fmt.Errorf("error cancelling in progress pipelineRuns belonging to pull request %d: %w", p.event.PullRequestNumber, err)
}
return nil
}
if err != nil {
createStatusErr := p.vcx.CreateStatus(ctx, p.event, provider.StatusOpts{
Status: CompletedStatus,
Expand Down Expand Up @@ -116,7 +123,7 @@ func (p *PacRun) Run(ctx context.Context) error {
}
}
p.manager.AddPipelineRun(pr)
if err := p.cancelInProgress(ctx, pr, repo); err != nil {
if err := p.cancelInProgressMatchingPR(ctx, pr, repo); err != nil {
p.eventEmitter.EmitMessage(repo, zap.ErrorLevel, "RepositoryPipelineRun", fmt.Sprintf("error cancelling in progress pipelineRuns: %s", err))
}
}(match, i)
Expand Down
Loading

0 comments on commit 5de31a6

Please sign in to comment.