Skip to content

Commit

Permalink
[TEP-0145] Add CEL evaluation
Browse files Browse the repository at this point in the history
This commit adds CEL evaluation. Users are able to use CEL in
WhenExpression if the feature flag enable-cel-in-whenexpression is
enabled.If the evluation is false, the PipelineTask will be skipped.

Signed-off-by: Yongxuan Zhang [email protected]
  • Loading branch information
Yongxuanzhang committed Oct 21, 2023
1 parent 0111021 commit fbe6de3
Show file tree
Hide file tree
Showing 6 changed files with 378 additions and 2 deletions.
4 changes: 3 additions & 1 deletion pkg/apis/pipeline/v1/when_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (we *WhenExpression) isTrue() bool {

func (we *WhenExpression) applyReplacements(replacements map[string]string, arrayReplacements map[string][]string) WhenExpression {
replacedInput := substitution.ApplyReplacements(we.Input, replacements)
replacedCEL := substitution.ApplyReplacements(we.CEL, replacements)

var replacedValues []string
for _, val := range we.Values {
Expand All @@ -79,13 +80,14 @@ func (we *WhenExpression) applyReplacements(replacements map[string]string, arra
}
}

return WhenExpression{Input: replacedInput, Operator: we.Operator, Values: replacedValues}
return WhenExpression{Input: replacedInput, Operator: we.Operator, Values: replacedValues, CEL: replacedCEL}
}

// GetVarSubstitutionExpressions extracts all the values between "$(" and ")" in a When Expression
func (we *WhenExpression) GetVarSubstitutionExpressions() ([]string, bool) {
var allExpressions []string
allExpressions = append(allExpressions, validateString(we.Input)...)
allExpressions = append(allExpressions, validateString(we.CEL)...)
for _, value := range we.Values {
allExpressions = append(allExpressions, validateString(value)...)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/apis/pipeline/v1beta1/when_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (we *WhenExpression) isTrue() bool {

func (we *WhenExpression) applyReplacements(replacements map[string]string, arrayReplacements map[string][]string) WhenExpression {
replacedInput := substitution.ApplyReplacements(we.Input, replacements)
replacedCEL := substitution.ApplyReplacements(we.CEL, replacements)

var replacedValues []string
for _, val := range we.Values {
Expand All @@ -79,13 +80,14 @@ func (we *WhenExpression) applyReplacements(replacements map[string]string, arra
}
}

return WhenExpression{Input: replacedInput, Operator: we.Operator, Values: replacedValues}
return WhenExpression{Input: replacedInput, Operator: we.Operator, Values: replacedValues, CEL: replacedCEL}
}

// GetVarSubstitutionExpressions extracts all the values between "$(" and ")" in a When Expression
func (we *WhenExpression) GetVarSubstitutionExpressions() ([]string, bool) {
var allExpressions []string
allExpressions = append(allExpressions, validateString(we.Input)...)
allExpressions = append(allExpressions, validateString(we.CEL)...)
for _, value := range we.Values {
allExpressions = append(allExpressions, validateString(value)...)
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,16 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel
}
}

// Evaluate the CEL of PipelineTask after the variable substitutions and validations.
for _, rpt := range pipelineRunFacts.State {
err := rpt.EvaluateCEL()
if err != nil {
logger.Errorf("Error evaluating CEL %s: %v", pr.Name, err)
pr.Status.MarkFailed("invalid cel", err.Error())
return controller.NewPermanentError(err)
}
}

// check if pipeline run is not gracefully cancelled and there are active task runs, which require cancelling
if pr.IsGracefullyCancelled() && pipelineRunFacts.IsRunning() {
// If the pipelinerun is cancelled, cancel tasks, but run finally
Expand Down
153 changes: 153 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4023,6 +4023,159 @@ status:
}
}

func TestReconcileWithCELWhenExpressionsWithTaskResultsAndParams(t *testing.T) {
names.TestingSeed()
ps := []*v1.Pipeline{parse.MustParseV1Pipeline(t, `
metadata:
name: test-pipeline
namespace: foo
spec:
params:
- name: run
type: string
tasks:
- name: a-task
taskRef:
name: a-task
- name: b-task
taskRef:
name: b-task
when:
- cel: "'$(tasks.a-task.results.aResult)' == 'aResultValue'"
- name: c-task
taskRef:
name: c-task
when:
- cel: "'$(tasks.a-task.results.aResult)' == 'missing'"
- cel: "'$(params.run)'!='yes'"
- name: d-task
runAfter:
- c-task
taskRef:
name: d-task
`)}
prs := []*v1.PipelineRun{parse.MustParseV1PipelineRun(t, `
metadata:
name: test-pipeline-run-different-service-accs
namespace: foo
spec:
params:
- name: run
value: "yes"
pipelineRef:
name: test-pipeline
taskRunTemplate:
serviceAccountName: test-sa-0
`)}
ts := []*v1.Task{
{ObjectMeta: baseObjectMeta("a-task", "foo")},
{ObjectMeta: baseObjectMeta("b-task", "foo")},
{ObjectMeta: baseObjectMeta("c-task", "foo")},
{ObjectMeta: baseObjectMeta("d-task", "foo")},
}
trs := []*v1.TaskRun{mustParseTaskRunWithObjectMeta(t,
taskRunObjectMeta("test-pipeline-run-different-service-accs-a-task-xxyyy", "foo", "test-pipeline-run-different-service-accs",
"test-pipeline", "a-task", true),
`
spec:
serviceAccountName: test-sa
taskRef:
name: hello-world
timeout: 1h0m0s
status:
conditions:
- status: "True"
type: Succeeded
results:
- name: aResult
value: aResultValue
`)}
cms := []*corev1.ConfigMap{
{
ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()},
Data: map[string]string{
"enable-cel-in-whenexpression": "true",
},
},
}
d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: ts,
TaskRuns: trs,
ConfigMaps: cms,
}
prt := newPipelineRunTest(t, d)
defer prt.Cancel()

wantEvents := []string{
"Normal Started",
"Normal Running Tasks Completed: 1 \\(Failed: 0, Cancelled 0\\), Incomplete: 2, Skipped: 1",
}
pipelineRun, clients := prt.reconcileRun("foo", "test-pipeline-run-different-service-accs", wantEvents, false)

expectedTaskRunName := "test-pipeline-run-different-service-accs-b-task"
expectedTaskRun := mustParseTaskRunWithObjectMeta(t,
taskRunObjectMeta(expectedTaskRunName, "foo", "test-pipeline-run-different-service-accs", "test-pipeline", "b-task", false),
`
spec:
serviceAccountName: test-sa-0
taskRef:
name: b-task
kind: Task
`)
// Check that the expected TaskRun was created
actual, err := clients.Pipeline.TektonV1().TaskRuns("foo").List(prt.TestAssets.Ctx, metav1.ListOptions{
LabelSelector: "tekton.dev/pipelineTask=b-task,tekton.dev/pipelineRun=test-pipeline-run-different-service-accs",
Limit: 1,
})

if err != nil {
t.Fatalf("Failure to list TaskRun's %s", err)
}
if len(actual.Items) != 1 {
t.Fatalf("Expected 1 TaskRuns got %d", len(actual.Items))
}
actualTaskRun := actual.Items[0]
if d := cmp.Diff(expectedTaskRun, &actualTaskRun, ignoreResourceVersion, ignoreTypeMeta); d != "" {
t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRunName, diff.PrintWantGot(d))
}

expectedWhenExpressionsInTaskRun := []v1.WhenExpression{{
CEL: "'aResultValue' == 'aResultValue'",
}}
verifyTaskRunStatusesWhenExpressions(t, pipelineRun.Status, expectedTaskRunName, expectedWhenExpressionsInTaskRun)

actualSkippedTasks := pipelineRun.Status.SkippedTasks
expectedSkippedTasks := []v1.SkippedTask{{
Name: "c-task",
Reason: v1.WhenExpressionsSkip,
WhenExpressions: v1.WhenExpressions{{
CEL: "'aResultValue' == 'missing'",
}, {
CEL: "'yes'!='yes'",
}},
}}
if d := cmp.Diff(expectedSkippedTasks, actualSkippedTasks); d != "" {
t.Errorf("expected to find Skipped Tasks %v. Diff %s", expectedSkippedTasks, diff.PrintWantGot(d))
}

skippedTasks := []string{"c-task"}
for _, skippedTask := range skippedTasks {
labelSelector := fmt.Sprintf("tekton.dev/pipelineTask=%s,tekton.dev/pipelineRun=test-pipeline-run-different-service-accs", skippedTask)
actualSkippedTask, err := clients.Pipeline.TektonV1().TaskRuns("foo").List(prt.TestAssets.Ctx, metav1.ListOptions{
LabelSelector: labelSelector,
Limit: 1,
})
if err != nil {
t.Fatalf("Failure to list TaskRun's %s", err)
}
if len(actualSkippedTask.Items) != 0 {
t.Fatalf("Expected 0 TaskRuns got %d", len(actualSkippedTask.Items))
}
}
}

// TestReconcileWithAffinityAssistantStatefulSet tests that given a pipelineRun with workspaces,
// an Affinity Assistant StatefulSet is created for each PVC workspace and
// that the Affinity Assistant names is propagated to TaskRuns.
Expand Down
65 changes: 65 additions & 0 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"fmt"
"sort"

"github.com/google/cel-go/cel"
"github.com/google/cel-go/common/types"
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
Expand Down Expand Up @@ -66,6 +68,51 @@ type ResolvedPipelineTask struct {
PipelineTask *v1.PipelineTask
ResolvedTask *resources.ResolvedTask
ResultsCache map[string][]string
// EvaluatedCEL is used to store the results of evaluated CEL expression
EvaluatedCEL map[string]bool
}

// isDone returns true only if the task is skipped, succeeded or failed
func (t *ResolvedPipelineTask) EvaluateCEL() error {
if t.PipelineTask != nil {
if len(t.EvaluatedCEL) == 0 {
t.EvaluatedCEL = make(map[string]bool)
}
for _, we := range t.PipelineTask.When {
if we.CEL == "" {
continue
}
_, ok := t.EvaluatedCEL[we.CEL]
if !ok {
// Create a program environment configured with the standard library of CEL functions and macros
env, err := cel.NewEnv(cel.Declarations())
if err != nil {
return err
}
// Parse and Check the CEL to get the Abstract Syntax Tree
ast, iss := env.Compile(we.CEL)
if iss.Err() != nil {
return iss.Err()
}
// Generate an evaluable instance of the Ast within the environment
prg, err := env.Program(ast)
if err != nil {
return err
}
// Evaluate the CEL expression
out, _, err := prg.Eval(map[string]interface{}{})
if err != nil {
return err
}
if out.ConvertToType(types.BoolType).Value() == true {
t.EvaluatedCEL[we.CEL] = true
} else {
t.EvaluatedCEL[we.CEL] = false
}
}
}
}
return nil
}

// isDone returns true only if the task is skipped, succeeded or failed
Expand Down Expand Up @@ -271,6 +318,8 @@ func (t *ResolvedPipelineTask) skip(facts *PipelineRunFacts) TaskSkipStatus {
skippingReason = v1.ParentTasksSkip
case t.skipBecauseResultReferencesAreMissing(facts):
skippingReason = v1.MissingResultsSkip
case t.skipBecauseCELExpressionsEvaluatedToFalse(facts):
skippingReason = v1.WhenExpressionsSkip
case t.skipBecauseWhenExpressionsEvaluatedToFalse(facts):
skippingReason = v1.WhenExpressionsSkip
case t.skipBecausePipelineRunPipelineTimeoutReached(facts):
Expand Down Expand Up @@ -305,6 +354,20 @@ func (t *ResolvedPipelineTask) Skip(facts *PipelineRunFacts) TaskSkipStatus {
return facts.SkipCache[t.PipelineTask.Name]
}

// skipBecauseCELExpressionsEvaluatedToFalse confirms that the CEL when expressions have completed evaluating, and
// it returns true if any of the CEL when expressions evaluate to false
func (t *ResolvedPipelineTask) skipBecauseCELExpressionsEvaluatedToFalse(facts *PipelineRunFacts) bool {
if t.checkParentsDone(facts) {
for _, we := range t.PipelineTask.When {
if we.CEL != "" && !t.EvaluatedCEL[we.CEL] {
return true
}
}
return false
}
return false
}

// skipBecauseWhenExpressionsEvaluatedToFalse confirms that the when expressions have completed evaluating, and
// it returns true if any of the when expressions evaluate to false
func (t *ResolvedPipelineTask) skipBecauseWhenExpressionsEvaluatedToFalse(facts *PipelineRunFacts) bool {
Expand Down Expand Up @@ -424,6 +487,8 @@ func (t *ResolvedPipelineTask) IsFinallySkipped(facts *PipelineRunFacts) TaskSki
switch {
case t.skipBecauseResultReferencesAreMissing(facts):
skippingReason = v1.MissingResultsSkip
case t.skipBecauseCELExpressionsEvaluatedToFalse(facts):
skippingReason = v1.WhenExpressionsSkip
case t.skipBecauseWhenExpressionsEvaluatedToFalse(facts):
skippingReason = v1.WhenExpressionsSkip
case t.skipBecausePipelineRunPipelineTimeoutReached(facts):
Expand Down
Loading

0 comments on commit fbe6de3

Please sign in to comment.