Skip to content
This repository has been archived by the owner on Jan 19, 2024. It is now read-only.

feat: Replace polling for jobs with watch #294

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chart/templates/serviceaccount.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ rules:
verbs:
- "list"
- "get"
- "watch"
---
# Bind role for accessing secrets onto the job-executor-service service account
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
6 changes: 2 additions & 4 deletions pkg/eventhandler/eventhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ type K8s interface {
jobName string, jobDetails k8sutils.JobDetails, eventData keptn.EventProperties,
jobSettings k8sutils.JobSettings, jsonEventData interface{}, namespace string,
) error
AwaitK8sJobDone(
jobName string, maxPollDuration time.Duration, pollIntervalInSeconds time.Duration, namespace string,
) error
AwaitK8sJobDone(jobName string, maxPollDuration time.Duration, namespace string) error
GetFailedEventsForJob(jobName string, namespace string) (string, error)
GetLogsOfPod(jobName string, namespace string) (string, error)
}
Expand Down Expand Up @@ -217,7 +215,7 @@ func (eh *EventHandler) startK8sJob(action *config.Action, actionIndex int, conf
if task.MaxPollDuration != nil {
maxPollDuration = time.Duration(*task.MaxPollDuration) * time.Second
}
jobErr := eh.K8s.AwaitK8sJobDone(jobName, maxPollDuration, pollInterval, namespace)
jobErr := eh.K8s.AwaitK8sJobDone(jobName, maxPollDuration, namespace)

logs, err := eh.K8s.GetLogsOfPod(jobName, namespace)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/eventhandler/eventhandlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func TestEventMatching(t *testing.T) {
// CreateK8sJob expectation above, to avoid too much complexity we just expect anything for the correct
// number of times
mockK8s.EXPECT().AwaitK8sJobDone(
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any(),
).Times(totalNoOfExpectedTasks)

mockK8s.EXPECT().GetLogsOfPod(
Expand Down Expand Up @@ -505,8 +505,8 @@ func TestStartK8s(t *testing.T) {
}), gomock.Any(), gomock.Any(),
gomock.Any(), jobNamespace2,
).Times(1)
k8sMock.EXPECT().AwaitK8sJobDone(gomock.Eq(jobName1), 1006*time.Second, pollInterval, jobNamespace1).Times(1)
k8sMock.EXPECT().AwaitK8sJobDone(gomock.Eq(jobName2), defaultMaxPollDuration, pollInterval, jobNamespace2).Times(1)
k8sMock.EXPECT().AwaitK8sJobDone(gomock.Eq(jobName1), 1006*time.Second, jobNamespace1).Times(1)
k8sMock.EXPECT().AwaitK8sJobDone(gomock.Eq(jobName2), defaultMaxPollDuration, jobNamespace2).Times(1)
k8sMock.EXPECT().GetLogsOfPod(gomock.Eq(jobName1), jobNamespace1).Times(1)
k8sMock.EXPECT().GetLogsOfPod(gomock.Eq(jobName2), jobNamespace2).Times(1)

Expand Down Expand Up @@ -557,7 +557,7 @@ func TestStartK8sJobSilent(t *testing.T) {
k8sMock.EXPECT().CreateK8sJob(
gomock.Eq(jobName2), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
).Times(1)
k8sMock.EXPECT().AwaitK8sJobDone(gomock.Any(), defaultMaxPollDuration, pollInterval, "").Times(2)
k8sMock.EXPECT().AwaitK8sJobDone(gomock.Any(), defaultMaxPollDuration, "").Times(2)
k8sMock.EXPECT().GetLogsOfPod(gomock.Eq(jobName1), gomock.Any()).Times(1)
k8sMock.EXPECT().GetLogsOfPod(gomock.Eq(jobName2), gomock.Any()).Times(1)

Expand Down Expand Up @@ -597,7 +597,7 @@ func TestStartK8s_TestFinishedEvent(t *testing.T) {
k8sMock.EXPECT().CreateK8sJob(
gomock.Eq(jobName1), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
).Times(1)
k8sMock.EXPECT().AwaitK8sJobDone(gomock.Eq(jobName1), gomock.Any(), gomock.Any(), gomock.Any()).Times(1)
k8sMock.EXPECT().AwaitK8sJobDone(gomock.Eq(jobName1), gomock.Any(), gomock.Any()).Times(1)
k8sMock.EXPECT().GetLogsOfPod(gomock.Eq(jobName1), gomock.Any()).Times(1)

// set the global timezone for testing
Expand Down Expand Up @@ -669,7 +669,7 @@ func TestExpectImageNotAllowedError(t *testing.T) {
k8sMock.EXPECT().CreateK8sJob(
gomock.Eq(jobName1), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
).Times(1)
k8sMock.EXPECT().AwaitK8sJobDone(gomock.Eq(jobName1), gomock.Any(), gomock.Any(), gomock.Any()).Times(1)
k8sMock.EXPECT().AwaitK8sJobDone(gomock.Eq(jobName1), gomock.Any(), gomock.Any()).Times(1)
k8sMock.EXPECT().GetLogsOfPod(gomock.Eq(jobName1), gomock.Any()).Times(1)

// set the global timezone for testing
Expand Down
8 changes: 4 additions & 4 deletions pkg/eventhandler/fake/eventhandlers_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

124 changes: 90 additions & 34 deletions pkg/k8sutils/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
watch2 "k8s.io/apimachinery/pkg/watch"
"log"
"reflect"
"regexp"
Expand Down Expand Up @@ -378,51 +379,106 @@ func (k8s *K8sImpl) CreateK8sJob(
// AwaitK8sJobDone waits for the job to finish, for at most maxPollDuration.
// If the job completes successfully before we reach maxPollDuration, no error is returned.
// If the job fails, is suspended or does not complete within maxPollDuration, an appropriate error will be returned
func (k8s *K8sImpl) AwaitK8sJobDone(
jobName string, maxPollDuration time.Duration, pollInterval time.Duration, namespace string,
) error {
jobs := k8s.clientset.BatchV1().Jobs(namespace)
func (k8s *K8sImpl) AwaitK8sJobDone(jobName string, maxPollDuration time.Duration, namespace string) error {
ctx, cancelCtx := context.WithTimeout(context.Background(), maxPollDuration)
defer cancelCtx()

// Create a new field selector that restricts the watch to the job named jobName
requirement, err := labels.NewRequirement("metadata.name", selection.Equals, []string{jobName})
if err != nil {
return fmt.Errorf("unable to build requirement for field selector: %w", err)
}

fieldSelector := labels.NewSelector()
fieldSelector = fieldSelector.Add(*requirement)

pollingStart := time.Now()
// Start watching job events and only watch as long as the given timeout specifies to avoid stalling the
// await operation for too long
watchJobEvents, err := k8s.clientset.BatchV1().Jobs(namespace).Watch(ctx, metav1.ListOptions{
FieldSelector: fieldSelector.String(),
})
if err != nil {
return fmt.Errorf("unable to watch Job events for job completion: %w", err)
}

// Start watching kubernetes events and watch only as long as the given timeout specifies to avoid stalling the
// await operation for too long
kindSelector, _ := labels.NewRequirement("involvedObject.kind", selection.Equals, []string{"Pod"})
selector := labels.NewSelector()
selector = selector.Add(*kindSelector)
watchEvents, err := k8s.clientset.CoreV1().Events(namespace).Watch(ctx, metav1.ListOptions{
FieldSelector: selector.String(),
})
if err != nil {
return fmt.Errorf("unable to watch events for job completion: %w", err)
}

// As long as there are events in the job events or the pod events, check if the job has already finished or
// if an error is encountered for some reason ...
for {
select {
case event := <-watchJobEvents.ResultChan():
switch event.Type {
case watch2.Added:
/* Ignore added, job is most likely already there */

now := time.Now()
case watch2.Error:
return fmt.Errorf("job %s encountered error while awaitng completion", jobName)

if now.After(pollingStart.Add(maxPollDuration)) {
return fmt.Errorf(
"polling for job %s timing out after %s: %w", jobName, now.Sub(pollingStart),
ErrMaxPollTimeExceeded,
)
}
case watch2.Deleted:
return fmt.Errorf("job %s was deleted while awaiting completion", jobName)

job, err := jobs.Get(context.TODO(), jobName, metav1.GetOptions{})
if err != nil {
return err
}
case watch2.Bookmark:
/* Ignore Bookmark */

for _, condition := range job.Status.Conditions {

switch condition.Type {
case batchv1.JobComplete:
// hooray, it worked
return nil
case batchv1.JobSuspended:
return fmt.Errorf(
"job %s was suspended. Reason: %s, Message: %s", jobName, condition.Reason, condition.Message,
)
case batchv1.JobFailed:
if condition.Reason == reasonJobDeadlineExceeded {
return fmt.Errorf("job %s failed: %w", jobName, ErrTaskDeadlineExceeded)
case watch2.Modified:
job, ok := event.Object.(*batchv1.Job)
if !ok {
return fmt.Errorf("unable to cast object to *batchv1.job")
}

return fmt.Errorf(
"job %s failed. Reason: %s, Message: %s", jobName, condition.Reason, condition.Message,
)
// TODO: why do we need a for loop here if we return on the first matching condition?
// Shouldn't we search for JobComplete first and, if it is absent, return the newest Failed or Suspended error?
for _, condition := range job.Status.Conditions {
switch condition.Type {
case batchv1.JobComplete:
// hooray, it worked
return nil

case batchv1.JobSuspended:
return fmt.Errorf(
"job %s was suspended. Reason: %s, Message: %s", jobName, condition.Reason, condition.Message,
)

case batchv1.JobFailed:
if condition.Reason == reasonJobDeadlineExceeded {
return fmt.Errorf("job %s failed: %w", jobName, ErrTaskDeadlineExceeded)
}

return fmt.Errorf(
"job %s failed. Reason: %s, Message: %s", jobName, condition.Reason, condition.Message,
)
}
}
}
}

time.Sleep(pollInterval)
case event := <-watchEvents.ResultChan():
evt, ok := event.Object.(*v1.Event)

// TODO: Is it safe to assume that after 3 "Failed" events the job can't be finished?
if ok && strings.Contains(evt.InvolvedObject.FieldPath, jobName) && evt.Count == 3 && evt.Reason == "Failed" {
return fmt.Errorf("encounterd %d times an error while waiting for job completion: %s", evt.Count, evt.Message)
} else if !ok {
return fmt.Errorf("unable to cast object to *v1.job")
}

// If we reach the max poll timeout we stop watching and return an error:
case <-time.After(maxPollDuration):
return fmt.Errorf(
"polling for job %s timing out after %s: %w", jobName, maxPollDuration,
ErrMaxPollTimeExceeded,
)
}
}
}

Expand Down
Loading