diff --git a/cmd/cdi-importer/BUILD.bazel b/cmd/cdi-importer/BUILD.bazel index 46c1239bcb..d9bbbcf238 100644 --- a/cmd/cdi-importer/BUILD.bazel +++ b/cmd/cdi-importer/BUILD.bazel @@ -13,10 +13,10 @@ go_library( "//pkg/util:go_default_library", "//pkg/util/prometheus:go_default_library", "//staging/src/kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1:go_default_library", - "//vendor/github.com/pkg/errors:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", + "//vendor/k8s.io/utils/ptr:go_default_library", ], ) diff --git a/cmd/cdi-importer/importer.go b/cmd/cdi-importer/importer.go index 426c99f27f..edb4b577f9 100644 --- a/cmd/cdi-importer/importer.go +++ b/cmd/cdi-importer/importer.go @@ -13,6 +13,7 @@ package main // ImporterSecretKey Optional. Secret key is the password to your account. import ( + "errors" "flag" "fmt" "os" @@ -20,11 +21,10 @@ import ( "strings" "time" - "github.com/pkg/errors" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/klog/v2" + "k8s.io/utils/ptr" cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" "kubevirt.io/containerized-data-importer/pkg/common" @@ -149,20 +149,17 @@ func main() { } func handleEmptyImage(contentType string, imageSize string, availableDestSpace int64, preallocation bool, volumeMode v1.PersistentVolumeMode, filesystemOverhead float64) error { - var preallocationApplied bool - if contentType == string(cdiv1.DataVolumeKubeVirt) { if volumeMode == v1.PersistentVolumeBlock && !preallocation { klog.V(1).Infoln("Blank block without preallocation is exactly an empty PVC, done populating") return nil } createBlankImage(imageSize, availableDestSpace, preallocation, volumeMode, filesystemOverhead) - preallocationApplied = preallocation } else { errorEmptyDiskWithContentTypeArchive() } - err := importCompleteTerminationMessage(preallocationApplied) + err := writeTerminationMessage(&common.TerminationMessage{PreallocationApplied: ptr.To(preallocation)}) return err } @@ -181,49 +178,47 @@ func handleImport( processor := newDataProcessor(contentType, volumeMode, ds, imageSize, filesystemOverhead, preallocation) err := processor.ProcessData() - if err != nil { + scratchSpaceRequired := errors.Is(err, importer.ErrRequiresScratchSpace) + if err != nil && !scratchSpaceRequired { klog.Errorf("%+v", err) - if err == importer.ErrRequiresScratchSpace { - if err := util.WriteTerminationMessage(common.ScratchSpaceRequired); err != nil { - klog.Errorf("%+v", err) - } - // Exiting instead of returning 0 as normally to avoid clashing - // with cleanup functions (fsyncDataFile) that assume the imported - // file will be there during regular exit. - os.Exit(0) - } - err = util.WriteTerminationMessage(fmt.Sprintf("Unable to process data: %v", err.Error())) - if err != nil { + if err := util.WriteTerminationMessage(fmt.Sprintf("Unable to process data: %v", err.Error())); err != nil { klog.Errorf("%+v", err) } - return 1 } + + termMsg := ds.GetTerminationMessage() + if termMsg == nil { + termMsg = &common.TerminationMessage{} + } + termMsg.ScratchSpaceRequired = &scratchSpaceRequired + termMsg.PreallocationApplied = ptr.To(processor.PreallocationApplied()) + touchDoneFile() - // due to the way some data sources can add additional information to termination message - // after finished (ds.close() ) termination message has to be written first, before the - // the ds is closed - // TODO: think about making communication explicit, probably DS interface should be extended - err = importCompleteTerminationMessage(processor.PreallocationApplied()) - if err != nil { + if err := writeTerminationMessage(termMsg); err != nil { klog.Errorf("%+v", err) return 1 } + if scratchSpaceRequired { + // Exiting instead of returning 0 as normally to avoid clashing + // with cleanup functions (fsyncDataFile) that assume the imported + // file will be there during regular exit. + os.Exit(0) + } + return 0 } -func importCompleteTerminationMessage(preallocationApplied bool) error { - message := "Import Complete" - if preallocationApplied { - message += ", " + common.PreallocationApplied - } - err := util.WriteTerminationMessage(message) +func writeTerminationMessage(termMsg *common.TerminationMessage) error { + msg, err := termMsg.String() if err != nil { return err } - - klog.V(1).Infoln(message) + if err := util.WriteTerminationMessage(msg); err != nil { + return err + } + klog.V(1).Infoln(msg) return nil } diff --git a/pkg/common/BUILD.bazel b/pkg/common/BUILD.bazel index 197807eae7..e65ebee0ac 100644 --- a/pkg/common/BUILD.bazel +++ b/pkg/common/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", @@ -7,3 +7,17 @@ go_library( visibility = ["//visibility:public"], deps = ["//vendor/k8s.io/api/core/v1:go_default_library"], ) + +go_test( + name = "go_default_test", + srcs = [ + "common_suite_test.go", + "common_test.go", + ], + embed = [":go_default_library"], + deps = [ + "//vendor/github.com/onsi/ginkgo/v2:go_default_library", + "//vendor/github.com/onsi/gomega:go_default_library", + "//vendor/k8s.io/utils/ptr:go_default_library", + ], +) diff --git a/pkg/common/common.go b/pkg/common/common.go index 6fbbcce4c5..3fe5ca607b 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -1,6 +1,8 @@ package common import ( + "encoding/json" + "fmt" "time" v1 "k8s.io/api/core/v1" @@ -328,3 +330,31 @@ var AsyncUploadFormPaths = []string{ UploadFormAsync, "/v1alpha1/upload-form-async", } + +// VddkInfo holds VDDK version and connection information returned by an importer pod +type VddkInfo struct { + Version string + Host string +} + +// TerminationMessage contains data to be serialized and used as the termination message of the importer. +type TerminationMessage struct { + ScratchSpaceRequired *bool `json:"scratchSpaceRequired,omitempty"` + PreallocationApplied *bool `json:"preallocationApplied,omitempty"` + VddkInfo *VddkInfo `json:"vddkInfo,omitempty"` + Labels map[string]string `json:"labels,omitempty"` +} + +func (it *TerminationMessage) String() (string, error) { + msg, err := json.Marshal(it) + if err != nil { + return "", err + } + + // Messages longer than 4096 are truncated by kubelet + if length := len(msg); length > 4096 { + return "", fmt.Errorf("Termination message length %d exceeds maximum length of 4096 bytes", length) + } + + return string(msg), nil +} diff --git a/pkg/common/common_suite_test.go b/pkg/common/common_suite_test.go new file mode 100644 index 0000000000..087a1e5857 --- /dev/null +++ b/pkg/common/common_suite_test.go @@ -0,0 +1,13 @@ +package common_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestCommon(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Common Suite") +} diff --git a/pkg/common/common_test.go b/pkg/common/common_test.go new file mode 100644 index 0000000000..145b66a46b --- /dev/null +++ b/pkg/common/common_test.go @@ -0,0 +1,43 @@ +package common + +import ( + "fmt" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "k8s.io/utils/ptr" +) + +var _ = Describe("TerminationMessage", func() { + It("Should successfully serialize a TerminationMessage", func() { + termMsg := TerminationMessage{ + VddkInfo: &VddkInfo{ + Version: "testversion", + Host: "testhost", + }, + Labels: map[string]string{ + "testlabel": "testvalue", + }, + PreallocationApplied: ptr.To(true), + } + + serialized, err := termMsg.String() + Expect(err).ToNot(HaveOccurred()) + Expect(serialized).To(Equal(`{"preallocationApplied":true,"vddkInfo":{"Version":"testversion","Host":"testhost"},"labels":{"testlabel":"testvalue"}}`)) + }) + + It("Should fail if serialized data is longer than 4096 bytes", func() { + const length = 5000 + const serializationOffset = 19 + + termMsg := TerminationMessage{ + Labels: map[string]string{}, + } + for i := 0; i < length-serializationOffset; i++ { + termMsg.Labels["t"] += "c" + } + + _, err := termMsg.String() + Expect(err).To(MatchError(fmt.Sprintf("Termination message length %d exceeds maximum length of 4096 bytes", length))) + }) +}) diff --git a/pkg/controller/clone-controller.go b/pkg/controller/clone-controller.go index a9ef5e7655..0d5e8c4ea2 100644 --- a/pkg/controller/clone-controller.go +++ b/pkg/controller/clone-controller.go @@ -331,7 +331,7 @@ func (r *CloneReconciler) updatePvcFromPod(sourcePod *corev1.Pod, pvc *corev1.Pe r.recorder.Event(pvc, corev1.EventTypeNormal, CloneSucceededPVC, cc.CloneComplete) } - setAnnotationsFromPodWithPrefix(pvc.Annotations, sourcePod, cc.AnnSourceRunningCondition) + setAnnotationsFromPodWithPrefix(pvc.Annotations, sourcePod, nil, cc.AnnSourceRunningCondition) if !reflect.DeepEqual(currentPvcCopy, pvc) { return r.updatePVC(pvc) diff --git a/pkg/controller/import-controller.go b/pkg/controller/import-controller.go index 5756c5eb35..80e83e342a 100644 --- a/pkg/controller/import-controller.go +++ b/pkg/controller/import-controller.go @@ -366,19 +366,23 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p log.V(1).Info("Updating PVC from pod") anno := pvc.GetAnnotations() - setAnnotationsFromPodWithPrefix(anno, pod, cc.AnnRunningCondition) - scratchExitCode := false + termMsg, err := parseTerminationMessage(pod) + if err != nil { + log.V(3).Info("Ignoring failure to parse termination message", "error", err.Error()) + } + setAnnotationsFromPodWithPrefix(anno, pod, termMsg, cc.AnnRunningCondition) + + scratchSpaceRequired := termMsg != nil && termMsg.ScratchSpaceRequired != nil && *termMsg.ScratchSpaceRequired + if scratchSpaceRequired { + log.V(1).Info("Pod requires scratch space, terminating pod, and restarting with scratch space", "pod.Name", pod.Name) + } + if pod.Status.ContainerStatuses != nil && - pod.Status.ContainerStatuses[0].State.Terminated != nil { - if message := pod.Status.ContainerStatuses[0].State.Terminated.Message; strings.Contains(message, common.ScratchSpaceRequired) { - log.V(1).Info("Pod requires scratch space, terminating pod, and restarting with scratch space", "pod.Name", pod.Name) - scratchExitCode = true - anno[cc.AnnRequiresScratch] = "true" - } else if pod.Status.ContainerStatuses[0].State.Terminated.ExitCode > 0 { - log.Info("Pod termination code", "pod.Name", pod.Name, "ExitCode", pod.Status.ContainerStatuses[0].State.Terminated.ExitCode) - r.recorder.Event(pvc, corev1.EventTypeWarning, ErrImportFailedPVC, pod.Status.ContainerStatuses[0].State.Terminated.Message) - } + pod.Status.ContainerStatuses[0].State.Terminated != nil && + pod.Status.ContainerStatuses[0].State.Terminated.ExitCode > 0 { + log.Info("Pod termination code", "pod.Name", pod.Name, "ExitCode", pod.Status.ContainerStatuses[0].State.Terminated.ExitCode) + r.recorder.Event(pvc, corev1.EventTypeWarning, ErrImportFailedPVC, pod.Status.ContainerStatuses[0].State.Terminated.Message) } if anno[cc.AnnCurrentCheckpoint] != "" { @@ -386,8 +390,8 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p } anno[cc.AnnImportPod] = string(pod.Name) - if !scratchExitCode { - // No scratch exit code, update the phase based on the pod. If we do have scratch exit code we don't want to update the + if !scratchSpaceRequired { + // No scratch space required, update the phase based on the pod. If we require scratch space we don't want to update the // phase, because the pod might terminate cleanly and mistakenly mark the import complete. anno[cc.AnnPodPhase] = string(pod.Status.Phase) } @@ -420,8 +424,8 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p log.V(1).Info("Updated PVC", "pvc.anno.Phase", anno[cc.AnnPodPhase], "pvc.anno.Restarts", anno[cc.AnnPodRestarts]) } - if cc.IsPVCComplete(pvc) || scratchExitCode { - if !scratchExitCode { + if cc.IsPVCComplete(pvc) || scratchSpaceRequired { + if !scratchSpaceRequired { r.recorder.Event(pvc, corev1.EventTypeNormal, ImportSucceededPVC, "Import Successful") log.V(1).Info("Import completed successfully") } diff --git a/pkg/controller/import-controller_test.go b/pkg/controller/import-controller_test.go index 90e52210e6..de9555f9c4 100644 --- a/pkg/controller/import-controller_test.go +++ b/pkg/controller/import-controller_test.go @@ -410,6 +410,37 @@ var _ = Describe("Update PVC from POD", func() { Expect(resPvc.GetAnnotations()[cc.AnnRunningConditionReason]).To(Equal("Reason")) }) + DescribeTable("Should handle termination messages", func(termMsg, conditionMessage string) { + pvc := cc.CreatePvc("testPvc1", "default", map[string]string{}, nil) + pod := cc.CreateImporterTestPod(pvc, "testPvc1", nil) + pod.Status = corev1.PodStatus{ + Phase: corev1.PodSucceeded, + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + Message: termMsg, + }, + }, + }, + }, + } + reconciler = createImportReconciler(pvc, pod) + err := reconciler.updatePvcFromPod(pvc, pod, reconciler.log) + Expect(err).ToNot(HaveOccurred()) + + resPvc := &corev1.PersistentVolumeClaim{} + err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "testPvc1", Namespace: "default"}, resPvc) + Expect(err).ToNot(HaveOccurred()) + + Expect(resPvc.GetAnnotations()).To(HaveKeyWithValue(cc.AnnPodPhase, string(corev1.PodSucceeded))) + Expect(resPvc.GetAnnotations()).To(HaveKeyWithValue(cc.AnnRunningCondition, "false")) + Expect(resPvc.GetAnnotations()).To(HaveKeyWithValue(cc.AnnRunningConditionMessage, conditionMessage)) + }, + Entry("Message which can be unmarshalled", `{"preAllocationApplied": true}`, ImportCompleteMessage), + Entry("Message which cannot be unmarshalled", "somemessage", "somemessage"), + ) + It("Should update the PVC status to running, if pod is running", func() { pvc := cc.CreatePvc("testPvc1", "default", map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodPending)}, nil) pod := cc.CreateImporterTestPod(pvc, "testPvc1", nil) @@ -531,7 +562,7 @@ var _ = Describe("Update PVC from POD", func() { Expect(resPvc.GetAnnotations()[cc.AnnRunningConditionReason]).To(Equal("Explosion")) }) - It("Should NOT update phase on PVC, if pod exited with scratchspace required msg", func() { + It("Should NOT update phase on PVC, if pod exited with termination message stating scratch space is required", func() { pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodRunning)}, nil, corev1.ClaimBound) scratchPvcName := &corev1.PersistentVolumeClaim{} scratchPvcName.Name = "testPvc1-scratch" @@ -543,7 +574,7 @@ var _ = Describe("Update PVC from POD", func() { State: v1.ContainerState{ Terminated: &corev1.ContainerStateTerminated{ ExitCode: 0, - Message: common.ScratchSpaceRequired, + Message: `{"scratchSpaceRequired": true}`, }, }, }, @@ -640,7 +671,7 @@ var _ = Describe("Update PVC from POD", func() { State: v1.ContainerState{ Terminated: &corev1.ContainerStateTerminated{ ExitCode: 0, - Message: `Import Complete; VDDK: {"Version": "1.0.0", "Host": "esx15.test.lan"}`, + Message: `{"vddkInfo": {"Version": "1.0.0", "Host": "esx15.test.lan"}}`, Reason: "Completed", }, }, @@ -672,10 +703,7 @@ var _ = Describe("Update PVC from POD", func() { ContainerStatuses: []corev1.ContainerStatus{ { LastTerminationState: corev1.ContainerState{ - Terminated: &corev1.ContainerStateTerminated{ - ExitCode: 0, - Message: common.ScratchSpaceRequired, - }, + Terminated: &corev1.ContainerStateTerminated{}, }, }, }, diff --git a/pkg/controller/upload-controller.go b/pkg/controller/upload-controller.go index d43f621ffd..f32c05de4f 100644 --- a/pkg/controller/upload-controller.go +++ b/pkg/controller/upload-controller.go @@ -488,7 +488,7 @@ func updateUploadAnnotations(pvc *corev1.PersistentVolumeClaim, anno map[string] anno[cc.AnnPodPhase] = string(podPhase) anno[cc.AnnPodReady] = strconv.FormatBool(isPodReady(pod)) - setAnnotationsFromPodWithPrefix(anno, pod, cc.AnnRunningCondition) + setAnnotationsFromPodWithPrefix(anno, pod, nil, cc.AnnRunningCondition) } func isPodReady(pod *v1.Pod) bool { diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 6828590fc7..58c54db416 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -20,7 +20,6 @@ import ( "context" "crypto/rsa" "encoding/json" - "regexp" "strconv" "strings" @@ -61,6 +60,9 @@ const ( // ScratchSpaceRequiredReason is a const that defines the pod exited due to a lack of scratch space ScratchSpaceRequiredReason = "Scratch space required" + // ImportCompleteMessage is a const that defines the pod completeded the import successfully + ImportCompleteMessage = "Import Complete" + // ProxyCertVolName is the name of the volumecontaining certs ProxyCertVolName = "cdi-proxy-cert-vol" // ClusterWideProxyAPIGroup is the APIGroup for OpenShift Cluster Wide Proxy @@ -79,10 +81,6 @@ const ( ClusterWideProxyConfigMapKey = "ca-bundle.crt" ) -var ( - vddkInfoMatch = regexp.MustCompile(`((.*; )|^)VDDK: (?P{.*})`) -) - func checkPVC(pvc *v1.PersistentVolumeClaim, annotation string, log logr.Logger) bool { // check if we have proper annotation if !metav1.HasAnnotation(pvc.ObjectMeta, annotation) { @@ -277,7 +275,7 @@ func podSucceededFromPVC(pvc *v1.PersistentVolumeClaim) bool { return podPhaseFromPVC(pvc) == v1.PodSucceeded } -func setAnnotationsFromPodWithPrefix(anno map[string]string, pod *v1.Pod, prefix string) { +func setAnnotationsFromPodWithPrefix(anno map[string]string, pod *v1.Pod, termMsg *common.TerminationMessage, prefix string) { if pod == nil || pod.Status.ContainerStatuses == nil { return } @@ -286,40 +284,54 @@ func setAnnotationsFromPodWithPrefix(anno map[string]string, pod *v1.Pod, prefix if podRestarts >= annPodRestarts { anno[cc.AnnPodRestarts] = strconv.Itoa(podRestarts) } - setVddkAnnotations(anno, pod) + containerState := pod.Status.ContainerStatuses[0].State if containerState.Running != nil { anno[prefix] = "true" anno[prefix+".message"] = "" anno[prefix+".reason"] = PodRunningReason - } else { - anno[cc.AnnRunningCondition] = "false" - if containerState.Waiting != nil && containerState.Waiting.Reason != "CrashLoopBackOff" { - anno[prefix+".message"] = simplifyKnownMessage(containerState.Waiting.Message) - anno[prefix+".reason"] = containerState.Waiting.Reason - } else if containerState.Terminated != nil { - anno[prefix+".message"] = simplifyKnownMessage(containerState.Terminated.Message) - reason := containerState.Terminated.Reason - if reason == common.GenericError { - reason = handleGenericErrorReason(containerState.Terminated.Message) + return + } + + anno[cc.AnnRunningCondition] = "false" + if containerState.Waiting != nil && containerState.Waiting.Reason != "CrashLoopBackOff" { + anno[prefix+".message"] = simplifyKnownMessage(containerState.Waiting.Message) + anno[prefix+".reason"] = containerState.Waiting.Reason + return + } + + if containerState.Terminated != nil { + if termMsg != nil { + if termMsg.ScratchSpaceRequired != nil && *termMsg.ScratchSpaceRequired { + anno[cc.AnnRequiresScratch] = "true" + anno[prefix+".message"] = common.ScratchSpaceRequired + anno[prefix+".reason"] = ScratchSpaceRequiredReason + return + } + // Handle extended termination message + anno[prefix+".message"] = ImportCompleteMessage + if termMsg.VddkInfo != nil { + if termMsg.VddkInfo.Host != "" { + anno[cc.AnnVddkHostConnection] = termMsg.VddkInfo.Host + } + if termMsg.VddkInfo.Version != "" { + anno[cc.AnnVddkVersion] = termMsg.VddkInfo.Version + } + } + if termMsg.PreallocationApplied != nil && *termMsg.PreallocationApplied { + anno[cc.AnnPreallocationApplied] = "true" } - anno[prefix+".reason"] = reason + } else { + // Handle plain termination message (legacy) + anno[prefix+".message"] = simplifyKnownMessage(containerState.Terminated.Message) if strings.Contains(containerState.Terminated.Message, common.PreallocationApplied) { anno[cc.AnnPreallocationApplied] = "true" } } + anno[prefix+".reason"] = containerState.Terminated.Reason } } -func handleGenericErrorReason(message string) string { - if strings.Contains(message, common.ScratchSpaceRequired) { - // Sometimes the pod will need scratch space to complete some operations. - // Better to add a custom reason instead of a generic container state. - return ScratchSpaceRequiredReason - } - return common.GenericError -} - func simplifyKnownMessage(msg string) string { if strings.Contains(msg, "is larger than the reported available") || strings.Contains(msg, "no space left on device") || @@ -330,33 +342,22 @@ func simplifyKnownMessage(msg string) string { return msg } -func setVddkAnnotations(anno map[string]string, pod *v1.Pod) { - if pod.Status.ContainerStatuses[0].State.Terminated == nil { - return - } - terminationMessage := pod.Status.ContainerStatuses[0].State.Terminated.Message - klog.V(1).Info("Saving VDDK annotations from pod status message: ", "message", terminationMessage) - - var terminationInfo string - matches := vddkInfoMatch.FindAllStringSubmatch(terminationMessage, -1) - for index, matchName := range vddkInfoMatch.SubexpNames() { - if matchName == "info" && len(matches) > 0 { - terminationInfo = matches[0][index] - break - } +func parseTerminationMessage(pod *v1.Pod) (*common.TerminationMessage, error) { + if pod == nil || pod.Status.ContainerStatuses == nil { + return nil, nil } - var vddkInfo util.VddkInfo - err := json.Unmarshal([]byte(terminationInfo), &vddkInfo) - if err != nil { - return - } - if vddkInfo.Host != "" { - anno[cc.AnnVddkHostConnection] = vddkInfo.Host + state := pod.Status.ContainerStatuses[0].State + if state.Terminated == nil || state.Terminated.ExitCode != 0 { + return nil, nil } - if vddkInfo.Version != "" { - anno[cc.AnnVddkVersion] = vddkInfo.Version + + termMsg := &common.TerminationMessage{} + if err := json.Unmarshal([]byte(state.Terminated.Message), termMsg); err != nil { + return nil, err } + + return termMsg, nil } func setBoundConditionFromPVC(anno map[string]string, prefix string, pvc *v1.PersistentVolumeClaim) { diff --git a/pkg/controller/util_test.go b/pkg/controller/util_test.go index 8423107b88..d58b751d16 100644 --- a/pkg/controller/util_test.go +++ b/pkg/controller/util_test.go @@ -13,6 +13,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" + "k8s.io/utils/ptr" logf "sigs.k8s.io/controller-runtime/pkg/log" cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" @@ -148,7 +149,7 @@ var _ = Describe("setAnnotationsFromPod", func() { }, }, } - setAnnotationsFromPodWithPrefix(result, testPod, AnnRunningCondition) + setAnnotationsFromPodWithPrefix(result, testPod, nil, AnnRunningCondition) Expect(result[AnnRunningCondition]).To(Equal("true")) Expect(result[AnnRunningConditionReason]).To(Equal("Pod is running")) }) @@ -168,7 +169,7 @@ var _ = Describe("setAnnotationsFromPod", func() { }, }, } - setAnnotationsFromPodWithPrefix(result, testPod, AnnRunningCondition) + setAnnotationsFromPodWithPrefix(result, testPod, nil, AnnRunningCondition) Expect(result[AnnRunningCondition]).To(Equal("false")) Expect(result[AnnRunningConditionMessage]).To(Equal("The container completed")) Expect(result[AnnRunningConditionReason]).To(Equal("Completed")) @@ -189,7 +190,7 @@ var _ = Describe("setAnnotationsFromPod", func() { }, }, } - setAnnotationsFromPodWithPrefix(result, testPod, AnnRunningCondition) + setAnnotationsFromPodWithPrefix(result, testPod, nil, AnnRunningCondition) Expect(result[AnnRunningCondition]).To(Equal("false")) Expect(result[AnnRunningConditionMessage]).To(Equal("container is waiting")) Expect(result[AnnRunningConditionReason]).To(Equal("Pending")) @@ -202,37 +203,32 @@ var _ = Describe("setAnnotationsFromPod", func() { ContainerStatuses: []v1.ContainerStatus{ { State: v1.ContainerState{ - Terminated: &v1.ContainerStateTerminated{ - Message: "container completed, " + common.PreallocationApplied, - Reason: "Completed", - }, + Terminated: &v1.ContainerStateTerminated{}, }, }, }, } - setAnnotationsFromPodWithPrefix(result, testPod, AnnRunningCondition) + setAnnotationsFromPodWithPrefix(result, testPod, &common.TerminationMessage{PreallocationApplied: ptr.To(true)}, AnnRunningCondition) Expect(result[AnnPreallocationApplied]).To(Equal("true")) }) - It("Should handle generic error when msg is scratch space required", func() { + It("Should set scratch space required status", func() { result := make(map[string]string) testPod := CreateImporterTestPod(CreatePvc("test", metav1.NamespaceDefault, nil, nil), "test", nil) testPod.Status = v1.PodStatus{ ContainerStatuses: []v1.ContainerStatus{ { State: v1.ContainerState{ - Terminated: &v1.ContainerStateTerminated{ - Message: common.ScratchSpaceRequired, - Reason: common.GenericError, - }, + Terminated: &v1.ContainerStateTerminated{}, }, }, }, } - setAnnotationsFromPodWithPrefix(result, testPod, AnnRunningCondition) + setAnnotationsFromPodWithPrefix(result, testPod, &common.TerminationMessage{ScratchSpaceRequired: ptr.To(true)}, AnnRunningCondition) Expect(result[AnnRunningCondition]).To(Equal("false")) Expect(result[AnnRunningConditionMessage]).To(Equal(common.ScratchSpaceRequired)) Expect(result[AnnRunningConditionReason]).To(Equal(ScratchSpaceRequiredReason)) + Expect(result[AnnRequiresScratch]).To(Equal("true")) }) }) diff --git a/pkg/importer/data-processor.go b/pkg/importer/data-processor.go index ca7b2e8538..3517dd15e1 100644 --- a/pkg/importer/data-processor.go +++ b/pkg/importer/data-processor.go @@ -89,6 +89,8 @@ type DataSourceInterface interface { TransferFile(fileName string) (ProcessingPhase, error) // Geturl returns the url that the data processor can use when converting the data. GetURL() *url.URL + // GetTerminationMessage returns data to be serialized and used as the termination message of the importer. + GetTerminationMessage() *common.TerminationMessage // Close closes any readers or other open resources. Close() error } diff --git a/pkg/importer/data-processor_test.go b/pkg/importer/data-processor_test.go index 1ffe02e9c6..8cc7a181b7 100644 --- a/pkg/importer/data-processor_test.go +++ b/pkg/importer/data-processor_test.go @@ -12,6 +12,7 @@ import ( "github.com/pkg/errors" + "kubevirt.io/containerized-data-importer/pkg/common" "kubevirt.io/containerized-data-importer/pkg/image" ) @@ -89,6 +90,11 @@ func (m *MockDataProvider) GetURL() *url.URL { return m.url } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (m *MockDataProvider) GetTerminationMessage() *common.TerminationMessage { + return nil +} + // Close closes any readers or other open resources. func (m *MockDataProvider) Close() error { return nil diff --git a/pkg/importer/gcs-datasource.go b/pkg/importer/gcs-datasource.go index afcb122710..020b377bb2 100644 --- a/pkg/importer/gcs-datasource.go +++ b/pkg/importer/gcs-datasource.go @@ -16,6 +16,7 @@ import ( "k8s.io/klog/v2" + "kubevirt.io/containerized-data-importer/pkg/common" "kubevirt.io/containerized-data-importer/pkg/util" ) @@ -156,6 +157,11 @@ func (sd *GCSDataSource) GetURL() *url.URL { return sd.url } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (sd *GCSDataSource) GetTerminationMessage() *common.TerminationMessage { + return nil +} + // Close closes any readers or other open resources. func (sd *GCSDataSource) Close() error { var err error diff --git a/pkg/importer/http-datasource.go b/pkg/importer/http-datasource.go index fd98003d88..17ceeecdd6 100644 --- a/pkg/importer/http-datasource.go +++ b/pkg/importer/http-datasource.go @@ -194,6 +194,11 @@ func (hs *HTTPDataSource) GetURL() *url.URL { return hs.url } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (hs *HTTPDataSource) GetTerminationMessage() *common.TerminationMessage { + return nil +} + // Close all readers. func (hs *HTTPDataSource) Close() error { var err error diff --git a/pkg/importer/imageio-datasource.go b/pkg/importer/imageio-datasource.go index bf937e442f..9b8b6a1672 100644 --- a/pkg/importer/imageio-datasource.go +++ b/pkg/importer/imageio-datasource.go @@ -38,6 +38,7 @@ import ( "github.com/pkg/errors" "k8s.io/klog/v2" + "kubevirt.io/containerized-data-importer/pkg/common" "kubevirt.io/containerized-data-importer/pkg/util" ) @@ -193,6 +194,11 @@ func (is *ImageioDataSource) GetURL() *url.URL { return is.url } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (is *ImageioDataSource) GetTerminationMessage() *common.TerminationMessage { + return nil +} + // Close all readers. func (is *ImageioDataSource) Close() error { var err error diff --git a/pkg/importer/registry-datasource.go b/pkg/importer/registry-datasource.go index b6ccdba7ec..b2abe17eaf 100644 --- a/pkg/importer/registry-datasource.go +++ b/pkg/importer/registry-datasource.go @@ -120,6 +120,11 @@ func (rd *RegistryDataSource) GetURL() *url.URL { return rd.url } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (rd *RegistryDataSource) GetTerminationMessage() *common.TerminationMessage { + return nil +} + // Close closes any readers or other open resources. func (rd *RegistryDataSource) Close() error { // No-op, no open readers diff --git a/pkg/importer/s3-datasource.go b/pkg/importer/s3-datasource.go index f44fdbf1e4..c9e691c840 100644 --- a/pkg/importer/s3-datasource.go +++ b/pkg/importer/s3-datasource.go @@ -17,6 +17,7 @@ import ( "k8s.io/klog/v2" + "kubevirt.io/containerized-data-importer/pkg/common" "kubevirt.io/containerized-data-importer/pkg/util" ) @@ -126,6 +127,11 @@ func (sd *S3DataSource) GetURL() *url.URL { return sd.url } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (sd *S3DataSource) GetTerminationMessage() *common.TerminationMessage { + return nil +} + // Close closes any readers or other open resources. func (sd *S3DataSource) Close() error { var err error diff --git a/pkg/importer/upload-datasource.go b/pkg/importer/upload-datasource.go index f4ecc286af..c6964d6613 100644 --- a/pkg/importer/upload-datasource.go +++ b/pkg/importer/upload-datasource.go @@ -9,6 +9,7 @@ import ( "k8s.io/klog/v2" cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" + "kubevirt.io/containerized-data-importer/pkg/common" "kubevirt.io/containerized-data-importer/pkg/util" ) @@ -107,6 +108,11 @@ func (ud *UploadDataSource) GetURL() *url.URL { return ud.url } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (ud *UploadDataSource) GetTerminationMessage() *common.TerminationMessage { + return nil +} + // Close closes any readers or other open resources. func (ud *UploadDataSource) Close() error { if ud.stream != nil { @@ -187,6 +193,11 @@ func (aud *AsyncUploadDataSource) GetURL() *url.URL { return aud.uploadDataSource.GetURL() } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (aud *AsyncUploadDataSource) GetTerminationMessage() *common.TerminationMessage { + return nil +} + // GetResumePhase returns the next phase to process when resuming func (aud *AsyncUploadDataSource) GetResumePhase() ProcessingPhase { return aud.ResumePhase diff --git a/pkg/importer/vddk-datasource_amd64.go b/pkg/importer/vddk-datasource_amd64.go index c3feddf6b6..ec5234685d 100644 --- a/pkg/importer/vddk-datasource_amd64.go +++ b/pkg/importer/vddk-datasource_amd64.go @@ -24,7 +24,6 @@ import ( "bytes" "container/ring" "context" - "encoding/json" "errors" "fmt" "net/url" @@ -941,22 +940,6 @@ func (vs *VDDKDataSource) Info() (ProcessingPhase, error) { // Close closes any readers or other open resources. func (vs *VDDKDataSource) Close() error { - if vddkVersion != "" || vddkHost != "" { - existingbytes, _ := os.ReadFile(common.PodTerminationMessageFile) - existing := string(existingbytes) - if existing != "" { - existing += "; " - } - stopinfo := util.VddkInfo{ - Version: vddkVersion, - Host: vddkHost, - } - stopmsg, _ := json.Marshal(stopinfo) - err := util.WriteTerminationMessage(existing + "VDDK: " + string(stopmsg)) - if err != nil { - klog.Errorf("Unable to write termination message: %v", err) - } - } vs.NbdKit.Handle.Close() return vs.NbdKit.n.KillNbdkit() } @@ -966,6 +949,16 @@ func (vs *VDDKDataSource) GetURL() *url.URL { return vs.NbdKit.Socket } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (vs *VDDKDataSource) GetTerminationMessage() *common.TerminationMessage { + return &common.TerminationMessage{ + VddkInfo: &common.VddkInfo{ + Version: vddkVersion, + Host: vddkHost, + }, + } +} + // Transfer is called to transfer the data from the source to the path passed in. func (vs *VDDKDataSource) Transfer(path string) (ProcessingPhase, error) { return ProcessingPhaseTransferDataFile, nil diff --git a/pkg/importer/vddk-datasource_arm64.go b/pkg/importer/vddk-datasource_arm64.go index ae7639b65e..d5ea7a8bf2 100644 --- a/pkg/importer/vddk-datasource_arm64.go +++ b/pkg/importer/vddk-datasource_arm64.go @@ -30,6 +30,10 @@ func (V VDDKDataSource) GetURL() *url.URL { panic("not support") } +func (V VDDKDataSource) GetTerminationMessage() *common.TerminationMessage { + panic("not support") +} + func (V VDDKDataSource) Close() error { panic("not support") } diff --git a/pkg/importer/vddk-datasource_test.go b/pkg/importer/vddk-datasource_test.go index 2e11cd55eb..9b07b0ac3e 100644 --- a/pkg/importer/vddk-datasource_test.go +++ b/pkg/importer/vddk-datasource_test.go @@ -21,6 +21,7 @@ import ( "github.com/vmware/govmomi/vim25/types" v1 "k8s.io/api/core/v1" + "kubevirt.io/containerized-data-importer/pkg/common" "kubevirt.io/containerized-data-importer/pkg/image" libnbd "libguestfs.org/libnbd" ) @@ -432,6 +433,18 @@ var _ = Describe("VDDK data source", func() { Expect(err).ToNot(HaveOccurred()) Expect(MaxPreadLength).To(Equal(uint32(MaxPreadLengthVC))) }) + + It("GetTerminationMessage should contain VDDK connection information", func() { + const testVersion = "testVersion" + const testHost = "testHost" + + source, err := NewVDDKDataSource("http://esx.test", "user", "pass", "aa:bb:cc:dd", "1-2-3-4", "testdisk.vmdk", "", "", "", v1.PersistentVolumeFilesystem) + Expect(err).ToNot(HaveOccurred()) + + vddkVersion = testVersion + vddkHost = testHost + Expect(*source.GetTerminationMessage()).To(Equal(common.TerminationMessage{VddkInfo: &common.VddkInfo{Version: testVersion, Host: testHost}})) + }) }) var _ = Describe("VDDK log watcher", func() { diff --git a/pkg/uploadserver/uploadserver_test.go b/pkg/uploadserver/uploadserver_test.go index 49ad0f3b5f..01d21733e3 100644 --- a/pkg/uploadserver/uploadserver_test.go +++ b/pkg/uploadserver/uploadserver_test.go @@ -147,6 +147,11 @@ func (amd *AsyncMockDataSource) GetURL() *url.URL { return nil } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (amd *AsyncMockDataSource) GetTerminationMessage() *common.TerminationMessage { + return nil +} + // GetResumePhase returns the next phase to process when resuming func (amd *AsyncMockDataSource) GetResumePhase() importer.ProcessingPhase { return importer.ProcessingPhaseComplete diff --git a/pkg/util/util.go b/pkg/util/util.go index 904c7719af..be7a7d63c9 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -42,12 +42,6 @@ type CountingReader struct { Done bool } -// VddkInfo holds VDDK version and connection information returned by an importer pod -type VddkInfo struct { - Version string - Host string -} - // RandAlphaNum provides an implementation to generate a random alpha numeric string of the specified length func RandAlphaNum(n int) string { r := rand.New(rand.NewSource(time.Now().UnixNano())) diff --git a/tests/datavolume_test.go b/tests/datavolume_test.go index 22e988d6b3..befc325db0 100644 --- a/tests/datavolume_test.go +++ b/tests/datavolume_test.go @@ -1013,7 +1013,7 @@ var _ = Describe("[vendor:cnv-qe@redhat.com][level:component]DataVolume tests", runningCondition: &cdiv1.DataVolumeCondition{ Type: cdiv1.DataVolumeRunning, Status: v1.ConditionFalse, - Message: "Import Complete; VDDK: {\"Version\":\"1.2.3\",\"Host\":\"esx.test\"}", + Message: "Import Complete", Reason: "Completed", }}), PEntry("[quarantine][test_id:5078]succeed creating warm import dv from VDDK source", dataVolumeTestArguments{ @@ -1037,7 +1037,7 @@ var _ = Describe("[vendor:cnv-qe@redhat.com][level:component]DataVolume tests", runningCondition: &cdiv1.DataVolumeCondition{ Type: cdiv1.DataVolumeRunning, Status: v1.ConditionFalse, - Message: "Import Complete; VDDK: {\"Version\":\"1.2.3\",\"Host\":\"esx.test\"}", + Message: "Import Complete", Reason: "Completed", }}), Entry("[rfe_id:XXXX][crit:high][test_id:XXXX]succeed creating import dv from GCS URL using RAW image", dataVolumeTestArguments{ @@ -1169,7 +1169,7 @@ var _ = Describe("[vendor:cnv-qe@redhat.com][level:component]DataVolume tests", runningCondition: &cdiv1.DataVolumeCondition{ Type: cdiv1.DataVolumeRunning, Status: v1.ConditionFalse, - Message: "Import Complete; VDDK: {\"Version\":\"1.2.3\",\"Host\":\"esx.test\"}", + Message: "Import Complete", Reason: "Completed", }}), )