diff --git a/pkg/eventhandler/eventhandlers.go b/pkg/eventhandler/eventhandlers.go
index 062f2d1e..91577dc2 100644
--- a/pkg/eventhandler/eventhandlers.go
+++ b/pkg/eventhandler/eventhandlers.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"log"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/keptn/go-utils/pkg/lib/keptn"
@@ -147,7 +148,7 @@ func (eh *EventHandler) startK8sJob(action *config.Action, jsonEventData interfa
 	if err != nil {
 		log.Printf("Error while connecting to cluster: %s\n", err.Error())
 		if !action.Silent {
-			sendTaskFailedEvent(eh.Keptn, "", eh.ServiceName, err, "")
+			sendJobFailedEvent(eh.Keptn, "", eh.ServiceName, err)
 		}
 		return
 	}
@@ -165,7 +166,7 @@ func (eh *EventHandler) startK8sJob(action *config.Action, jsonEventData interfa
 			log.Printf(errorText)
 			if !action.Silent {
-				sendTaskFailedEvent(eh.Keptn, "", eh.ServiceName, errors.New(errorText), "")
+				sendTaskFailedEvent(eh.Keptn, task.Name, eh.ServiceName, errors.New(errorText), "")
 			}
 
 			return
 
@@ -192,7 +193,7 @@ func (eh *EventHandler) startK8sJob(action *config.Action, jsonEventData interfa
 		if err != nil {
 			log.Printf("Error while creating job: %s\n", err)
 			if !action.Silent {
-				sendTaskFailedEvent(eh.Keptn, jobName, eh.ServiceName, err, "")
+				sendTaskFailedEvent(eh.Keptn, task.Name, eh.ServiceName, err, "")
 			}
 			return
 		}
@@ -211,14 +212,14 @@ func (eh *EventHandler) startK8sJob(action *config.Action, jsonEventData interfa
 		if jobErr != nil {
 			log.Printf("Error while creating job: %s\n", jobErr.Error())
 			if !action.Silent {
-				sendTaskFailedEvent(eh.Keptn, jobName, eh.ServiceName, jobErr, logs)
+				sendTaskFailedEvent(eh.Keptn, task.Name, eh.ServiceName, jobErr, logs)
 			}
 			return
 		}
 
 		allJobLogs = append(
 			allJobLogs, jobLogs{
-				name: jobName,
+				name: task.Name,
 				logs: logs,
 			},
 		)
@@ -233,13 +234,13 @@ func (eh *EventHandler) startK8sJob(action *config.Action, jsonEventData interfa
 	}
 }
 
-func sendTaskFailedEvent(myKeptn *keptnv2.Keptn, jobName string, serviceName string, err error, logs string) {
+func sendTaskFailedEvent(myKeptn *keptnv2.Keptn, taskName string, serviceName string, err error, logs string) {
 	var message string
 
 	if logs != "" {
-		message = fmt.Sprintf("Job %s failed: %s\n\nLogs: \n%s", jobName, err, logs)
+		message = fmt.Sprintf("Task '%s' failed: %s\n\nLogs: \n%s", taskName, err.Error(), logs)
 	} else {
-		message = fmt.Sprintf("Job %s failed: %s", jobName, err)
+		message = fmt.Sprintf("Task '%s' failed: %s", taskName, err.Error())
 	}
 
 	_, err = myKeptn.SendTaskFinishedEvent(
@@ -255,18 +256,33 @@ func sendTaskFailedEvent(myKeptn *keptnv2.Keptn, jobName string, serviceName str
 	}
 }
 
+func sendJobFailedEvent(myKeptn *keptnv2.Keptn, jobName string, serviceName string, err error) {
+	_, err = myKeptn.SendTaskFinishedEvent(
+		&keptnv2.EventData{
+			Status:  keptnv2.StatusErrored,
+			Result:  keptnv2.ResultFailed,
+			Message: fmt.Sprintf("Job %s failed: %s", jobName, err.Error()),
+		}, serviceName,
+	)
+
+	if err != nil {
+		log.Printf("Error while sending finished event: %s\n", err)
+	}
+}
+
 func sendTaskFinishedEvent(myKeptn *keptnv2.Keptn, serviceName string, jobLogs []jobLogs, data dataForFinishedEvent) {
-	var message string
+	var logMessage strings.Builder
 
 	for _, jobLogs := range jobLogs {
-		message += fmt.Sprintf("Job %s finished successfully!\n\nLogs:\n%s\n\n", jobLogs.name, jobLogs.logs)
+		logMessage.WriteString(
+			fmt.Sprintf("Task '%s' finished successfully!\n\nLogs:\n%s\n\n", jobLogs.name, jobLogs.logs),
+		)
 	}
 
 	eventData := &keptnv2.EventData{
-		Status:  keptnv2.StatusSucceeded,
 		Result:  keptnv2.ResultPass,
-		Message: message,
+		Message: logMessage.String(),
 	}
 
 	var err error
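The switch from `message +=` to `strings.Builder` in `sendTaskFinishedEvent` grows a single buffer instead of copying the whole message on every concatenation. A standalone sketch of the pattern follows; the task names and log contents are made up for illustration and are not part of the PR:

```go
package main

import (
	"fmt"
	"strings"
)

// jobLogs mirrors the shape used in eventhandlers.go: a task name plus the
// logs captured for that task. The sample values below are illustrative only.
type jobLogs struct {
	name string
	logs string
}

func main() {
	allJobLogs := []jobLogs{
		{name: "run-tests", logs: "all tests passed"},
		{name: "upload-report", logs: "report uploaded"},
	}

	// strings.Builder appends into one reusable buffer rather than
	// re-allocating the aggregate string on each += concatenation.
	var logMessage strings.Builder
	for _, jl := range allJobLogs {
		logMessage.WriteString(
			fmt.Sprintf("Task '%s' finished successfully!\n\nLogs:\n%s\n\n", jl.name, jl.logs),
		)
	}

	fmt.Print(logMessage.String())
}
```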
diff --git a/pkg/eventhandler/fake/eventhandlers_mock.go b/pkg/eventhandler/fake/eventhandlers_mock.go
index 24877937..f94e86f6 100644
--- a/pkg/eventhandler/fake/eventhandlers_mock.go
+++ b/pkg/eventhandler/fake/eventhandlers_mock.go
@@ -193,6 +193,20 @@ func (mr *MockK8sMockRecorder) CreateK8sJob(arg0, arg1, arg2, arg3, arg4, arg5,
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateK8sJob", reflect.TypeOf((*MockK8s)(nil).CreateK8sJob), arg0, arg1, arg2, arg3, arg4, arg5, arg6)
 }
 
+// ExistsServiceAccount mocks base method.
+func (m *MockK8s) ExistsServiceAccount(arg0, arg1 string) bool {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "ExistsServiceAccount", arg0, arg1)
+	ret0, _ := ret[0].(bool)
+	return ret0
+}
+
+// ExistsServiceAccount indicates an expected call of ExistsServiceAccount.
+func (mr *MockK8sMockRecorder) ExistsServiceAccount(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExistsServiceAccount", reflect.TypeOf((*MockK8s)(nil).ExistsServiceAccount), arg0, arg1)
+}
+
 // GetLogsOfPod mocks base method.
 func (m *MockK8s) GetLogsOfPod(arg0, arg1 string) (string, error) {
 	m.ctrl.T.Helper()
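The regenerated mock gains `ExistsServiceAccount`. For context, a test built on it could set an expectation like the hypothetical one below; the service-account name, namespace, and import path of the fake package are assumptions, and `NewMockK8s` is the usual gomock-generated constructor:

```go
package fake_test

import (
	"testing"

	"github.com/golang/mock/gomock"

	// Import path of the generated fake package is assumed here.
	"example.com/job-executor-service/pkg/eventhandler/fake"
)

func TestExistsServiceAccountMock(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	mockK8s := fake.NewMockK8s(ctrl)

	// Expect exactly one lookup of a (made-up) service account in the
	// keptn namespace and report it as existing.
	mockK8s.EXPECT().
		ExistsServiceAccount("job-executor", "keptn").
		Return(true)

	if !mockK8s.ExistsServiceAccount("job-executor", "keptn") {
		t.Error("expected mocked service account to exist")
	}
}
```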
container: %s", err.Error()) + } + + // If the container did not put out any logs, we skip it entirely to prevent polluting the + // log output too much by appending a lot of empty lines for each container + if logsOfContainer != "" { + logs.WriteString(buildLogOutputForContainer(pod, container, logsOfContainer)) + logs.WriteString("\n") + } } - logs += buf.String() } - return logs, nil + return logs.String(), nil +} + +const ( + // Indicates that the container is an Init container + initContainerType = iota + // Indicates that the container is a container defined in the job workload + jobContainerType +) + +type containerStatus struct { + name string + containerType int + status *v1.ContainerStateTerminated +} + +// getLogsOfContainer returns the logs of a specific container inside the given pod +func getLogsOfContainer(k8s *K8sImpl, pod v1.Pod, namespace string, container string) (string, error) { + + // Request logs of a specific container + req := k8s.clientset.CoreV1().Pods(namespace).GetLogs(pod.Name, &v1.PodLogOptions{ + Container: container, + }) + + // Stream logs into a buffer + podLogs, err := req.Stream(context.TODO()) + if err != nil { + return "", err + } + + defer podLogs.Close() + + // Convert the buffer into a string + buf := new(bytes.Buffer) + _, err = io.Copy(buf, podLogs) + if err != nil { + return "", err + } + + return buf.String(), nil +} + +// getTerminatedContainersWithStatusOfPod collects the terminated states of all containers inside a given Pod +func getTerminatedContainersWithStatusOfPod(pod v1.Pod) []containerStatus { + var containerStatusList []containerStatus + + // Loop over all initContainers in the Pod spec and look at the appropriate + // InitContainerStatus index to determine the status of the init container + for index, initContainer := range pod.Spec.InitContainers { + if pod.Status.InitContainerStatuses[index].State.Terminated != nil { + containerStatusList = append(containerStatusList, containerStatus{ + name: initContainer.Name, + containerType: initContainerType, + status: pod.Status.InitContainerStatuses[index].State.Terminated, + }) + } + } + + // Loop over all regular containers in the Pod spec and look at the appropriate + // ContainerStatus index to determine the status of the container + for index, container := range pod.Spec.Containers { + if pod.Status.ContainerStatuses[index].State.Terminated != nil { + containerStatusList = append(containerStatusList, containerStatus{ + name: container.Name, + containerType: jobContainerType, + status: pod.Status.ContainerStatuses[index].State.Terminated, + }) + } + } + + return containerStatusList +} + +// buildLogOutputForContainer generates a pretty output of the given logs and the container status in the following +// format. 
diff --git a/pkg/k8sutils/pod_test.go b/pkg/k8sutils/pod_test.go
index b0749e1a..9dfc2e31 100644
--- a/pkg/k8sutils/pod_test.go
+++ b/pkg/k8sutils/pod_test.go
@@ -124,10 +124,10 @@ func TestGetLogsOfPodHappyPath(t *testing.T) {
 	logsOfPod, err := k8s.GetLogsOfPod(jobName, namespace)
 	assert.NoError(t, err)
 
-	assert.Equal(t, logsOfPod, "fake logs")
+	assert.Contains(t, logsOfPod, "fake logs")
 
 	// Assert that the fake received the call
-	getLogAction := k8stesting.GenericActionImpl{
+	getLogActionInitContainer := k8stesting.GenericActionImpl{
 		ActionImpl: k8stesting.ActionImpl{
 			Namespace: namespace,
 			Verb:      "get",
@@ -138,8 +138,27 @@ func TestGetLogsOfPodHappyPath(t *testing.T) {
 			},
 			Subresource: "log",
 		},
-		Value: &v1.PodLogOptions{},
+		Value: &v1.PodLogOptions{
+			Container: initContainerName,
+		},
+	}
+
+	getLogActionContainer := k8stesting.GenericActionImpl{
+		ActionImpl: k8stesting.ActionImpl{
+			Namespace: namespace,
+			Verb:      "get",
+			Resource: schema.GroupVersionResource{
+				Group:    "",
+				Version:  "v1",
+				Resource: "pods",
+			},
+			Subresource: "log",
+		},
+		Value: &v1.PodLogOptions{
+			Container: jobName,
+		},
 	}
 
-	assert.Contains(t, k8sClientSet.Actions(), getLogAction)
+	assert.Contains(t, k8sClientSet.Actions(), getLogActionInitContainer)
+	assert.Contains(t, k8sClientSet.Actions(), getLogActionContainer)
 }
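The updated test now expects one log request per container, so the fixture pod must carry both a terminated init container and a terminated job container. A sketch of such a fixture with the fake clientset follows; the pod, job, and container names are illustrative, not the constants used in pod_test.go:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	k8sfake "k8s.io/client-go/kubernetes/fake"
)

func main() {
	completed := v1.ContainerState{
		Terminated: &v1.ContainerStateTerminated{Reason: "Completed"},
	}

	// Pod labeled the way GetLogsOfPod selects pods (job-name=<job name>),
	// with one terminated init container and one terminated job container.
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "some-job-pod",
			Namespace: "keptn",
			Labels:    map[string]string{"job-name": "some-job"},
		},
		Spec: v1.PodSpec{
			InitContainers: []v1.Container{{Name: "init-some-job"}},
			Containers:     []v1.Container{{Name: "some-job"}},
		},
		Status: v1.PodStatus{
			InitContainerStatuses: []v1.ContainerStatus{{State: completed}},
			ContainerStatuses:     []v1.ContainerStatus{{State: completed}},
		},
	}

	clientset := k8sfake.NewSimpleClientset(pod)
	fmt.Println(len(clientset.Actions())) // 0 until something queries the fake
}
```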