Skip to content

Commit

Permalink
feat: Make importer datasource communication explicit (kubevirt#3101)
Browse files Browse the repository at this point in the history
Make the communication of datasources in the importer explicit by adding
a GetTerminationMessage method to the DataSourceInterface.
Then use this method to communicate additional information to the import
controller once the importer pod has terminated, instead of writing
additional data to the termination message in the Close method of
datasources.

Signed-off-by: Felix Matouschek <[email protected]>
  • Loading branch information
0xFelix authored Mar 13, 2024
1 parent 7fbe1c3 commit cd7ee9e
Show file tree
Hide file tree
Showing 26 changed files with 329 additions and 149 deletions.
2 changes: 1 addition & 1 deletion cmd/cdi-importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ go_library(
"//pkg/util:go_default_library",
"//pkg/util/prometheus:go_default_library",
"//staging/src/kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/ptr:go_default_library",
],
)

Expand Down
61 changes: 28 additions & 33 deletions cmd/cdi-importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@ package main
// ImporterSecretKey Optional. Secret key is the password to your account.

import (
"errors"
"flag"
"fmt"
"os"
"strconv"
"strings"
"time"

"github.com/pkg/errors"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/common"
Expand Down Expand Up @@ -149,20 +149,17 @@ func main() {
}

func handleEmptyImage(contentType string, imageSize string, availableDestSpace int64, preallocation bool, volumeMode v1.PersistentVolumeMode, filesystemOverhead float64) error {
var preallocationApplied bool

if contentType == string(cdiv1.DataVolumeKubeVirt) {
if volumeMode == v1.PersistentVolumeBlock && !preallocation {
klog.V(1).Infoln("Blank block without preallocation is exactly an empty PVC, done populating")
return nil
}
createBlankImage(imageSize, availableDestSpace, preallocation, volumeMode, filesystemOverhead)
preallocationApplied = preallocation
} else {
errorEmptyDiskWithContentTypeArchive()
}

err := importCompleteTerminationMessage(preallocationApplied)
err := writeTerminationMessage(&common.TerminationMessage{PreallocationApplied: ptr.To(preallocation)})
return err
}

Expand All @@ -181,49 +178,47 @@ func handleImport(
processor := newDataProcessor(contentType, volumeMode, ds, imageSize, filesystemOverhead, preallocation)
err := processor.ProcessData()

if err != nil {
scratchSpaceRequired := errors.Is(err, importer.ErrRequiresScratchSpace)
if err != nil && !scratchSpaceRequired {
klog.Errorf("%+v", err)
if err == importer.ErrRequiresScratchSpace {
if err := util.WriteTerminationMessage(common.ScratchSpaceRequired); err != nil {
klog.Errorf("%+v", err)
}
// Exiting instead of returning 0 as normally to avoid clashing
// with cleanup functions (fsyncDataFile) that assume the imported
// file will be there during regular exit.
os.Exit(0)
}
err = util.WriteTerminationMessage(fmt.Sprintf("Unable to process data: %v", err.Error()))
if err != nil {
if err := util.WriteTerminationMessage(fmt.Sprintf("Unable to process data: %v", err.Error())); err != nil {
klog.Errorf("%+v", err)
}

return 1
}

termMsg := ds.GetTerminationMessage()
if termMsg == nil {
termMsg = &common.TerminationMessage{}
}
termMsg.ScratchSpaceRequired = &scratchSpaceRequired
termMsg.PreallocationApplied = ptr.To(processor.PreallocationApplied())

touchDoneFile()
// due to the way some data sources can add additional information to termination message
// after finished (ds.close() ) termination message has to be written first, before the
// the ds is closed
// TODO: think about making communication explicit, probably DS interface should be extended
err = importCompleteTerminationMessage(processor.PreallocationApplied())
if err != nil {
if err := writeTerminationMessage(termMsg); err != nil {
klog.Errorf("%+v", err)
return 1
}

if scratchSpaceRequired {
// Exiting instead of returning 0 as normally to avoid clashing
// with cleanup functions (fsyncDataFile) that assume the imported
// file will be there during regular exit.
os.Exit(0)
}

return 0
}

func importCompleteTerminationMessage(preallocationApplied bool) error {
message := "Import Complete"
if preallocationApplied {
message += ", " + common.PreallocationApplied
}
err := util.WriteTerminationMessage(message)
func writeTerminationMessage(termMsg *common.TerminationMessage) error {
msg, err := termMsg.String()
if err != nil {
return err
}

klog.V(1).Infoln(message)
if err := util.WriteTerminationMessage(msg); err != nil {
return err
}
klog.V(1).Infoln(msg)
return nil
}

Expand Down
16 changes: 15 additions & 1 deletion pkg/common/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
Expand All @@ -7,3 +7,17 @@ go_library(
visibility = ["//visibility:public"],
deps = ["//vendor/k8s.io/api/core/v1:go_default_library"],
)

go_test(
name = "go_default_test",
srcs = [
"common_suite_test.go",
"common_test.go",
],
embed = [":go_default_library"],
deps = [
"//vendor/github.com/onsi/ginkgo/v2:go_default_library",
"//vendor/github.com/onsi/gomega:go_default_library",
"//vendor/k8s.io/utils/ptr:go_default_library",
],
)
30 changes: 30 additions & 0 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package common

import (
"encoding/json"
"fmt"
"time"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -328,3 +330,31 @@ var AsyncUploadFormPaths = []string{
UploadFormAsync,
"/v1alpha1/upload-form-async",
}

// VddkInfo holds VDDK version and connection information returned by an importer pod
type VddkInfo struct {
Version string
Host string
}

// TerminationMessage contains data to be serialized and used as the termination message of the importer.
type TerminationMessage struct {
ScratchSpaceRequired *bool `json:"scratchSpaceRequired,omitempty"`
PreallocationApplied *bool `json:"preallocationApplied,omitempty"`
VddkInfo *VddkInfo `json:"vddkInfo,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
}

func (it *TerminationMessage) String() (string, error) {
msg, err := json.Marshal(it)
if err != nil {
return "", err
}

// Messages longer than 4096 are truncated by kubelet
if length := len(msg); length > 4096 {
return "", fmt.Errorf("Termination message length %d exceeds maximum length of 4096 bytes", length)
}

return string(msg), nil
}
13 changes: 13 additions & 0 deletions pkg/common/common_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package common_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestCommon(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Common Suite")
}
43 changes: 43 additions & 0 deletions pkg/common/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package common

import (
"fmt"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/utils/ptr"
)

var _ = Describe("TerminationMessage", func() {
It("Should successfully serialize a TerminationMessage", func() {
termMsg := TerminationMessage{
VddkInfo: &VddkInfo{
Version: "testversion",
Host: "testhost",
},
Labels: map[string]string{
"testlabel": "testvalue",
},
PreallocationApplied: ptr.To(true),
}

serialized, err := termMsg.String()
Expect(err).ToNot(HaveOccurred())
Expect(serialized).To(Equal(`{"preallocationApplied":true,"vddkInfo":{"Version":"testversion","Host":"testhost"},"labels":{"testlabel":"testvalue"}}`))
})

It("Should fail if serialized data is longer than 4096 bytes", func() {
const length = 5000
const serializationOffset = 19

termMsg := TerminationMessage{
Labels: map[string]string{},
}
for i := 0; i < length-serializationOffset; i++ {
termMsg.Labels["t"] += "c"
}

_, err := termMsg.String()
Expect(err).To(MatchError(fmt.Sprintf("Termination message length %d exceeds maximum length of 4096 bytes", length)))
})
})
2 changes: 1 addition & 1 deletion pkg/controller/clone-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (r *CloneReconciler) updatePvcFromPod(sourcePod *corev1.Pod, pvc *corev1.Pe
r.recorder.Event(pvc, corev1.EventTypeNormal, CloneSucceededPVC, cc.CloneComplete)
}

setAnnotationsFromPodWithPrefix(pvc.Annotations, sourcePod, cc.AnnSourceRunningCondition)
setAnnotationsFromPodWithPrefix(pvc.Annotations, sourcePod, nil, cc.AnnSourceRunningCondition)

if !reflect.DeepEqual(currentPvcCopy, pvc) {
return r.updatePVC(pvc)
Expand Down
34 changes: 19 additions & 15 deletions pkg/controller/import-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,28 +366,32 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p

log.V(1).Info("Updating PVC from pod")
anno := pvc.GetAnnotations()
setAnnotationsFromPodWithPrefix(anno, pod, cc.AnnRunningCondition)

scratchExitCode := false
termMsg, err := parseTerminationMessage(pod)
if err != nil {
log.V(3).Info("Ignoring failure to parse termination message", "error", err.Error())
}
setAnnotationsFromPodWithPrefix(anno, pod, termMsg, cc.AnnRunningCondition)

scratchSpaceRequired := termMsg != nil && termMsg.ScratchSpaceRequired != nil && *termMsg.ScratchSpaceRequired
if scratchSpaceRequired {
log.V(1).Info("Pod requires scratch space, terminating pod, and restarting with scratch space", "pod.Name", pod.Name)
}

if pod.Status.ContainerStatuses != nil &&
pod.Status.ContainerStatuses[0].State.Terminated != nil {
if message := pod.Status.ContainerStatuses[0].State.Terminated.Message; strings.Contains(message, common.ScratchSpaceRequired) {
log.V(1).Info("Pod requires scratch space, terminating pod, and restarting with scratch space", "pod.Name", pod.Name)
scratchExitCode = true
anno[cc.AnnRequiresScratch] = "true"
} else if pod.Status.ContainerStatuses[0].State.Terminated.ExitCode > 0 {
log.Info("Pod termination code", "pod.Name", pod.Name, "ExitCode", pod.Status.ContainerStatuses[0].State.Terminated.ExitCode)
r.recorder.Event(pvc, corev1.EventTypeWarning, ErrImportFailedPVC, pod.Status.ContainerStatuses[0].State.Terminated.Message)
}
pod.Status.ContainerStatuses[0].State.Terminated != nil &&
pod.Status.ContainerStatuses[0].State.Terminated.ExitCode > 0 {
log.Info("Pod termination code", "pod.Name", pod.Name, "ExitCode", pod.Status.ContainerStatuses[0].State.Terminated.ExitCode)
r.recorder.Event(pvc, corev1.EventTypeWarning, ErrImportFailedPVC, pod.Status.ContainerStatuses[0].State.Terminated.Message)
}

if anno[cc.AnnCurrentCheckpoint] != "" {
anno[cc.AnnCurrentPodID] = string(pod.ObjectMeta.UID)
}

anno[cc.AnnImportPod] = string(pod.Name)
if !scratchExitCode {
// No scratch exit code, update the phase based on the pod. If we do have scratch exit code we don't want to update the
if !scratchSpaceRequired {
// No scratch space required, update the phase based on the pod. If we require scratch space we don't want to update the
// phase, because the pod might terminate cleanly and mistakenly mark the import complete.
anno[cc.AnnPodPhase] = string(pod.Status.Phase)
}
Expand Down Expand Up @@ -420,8 +424,8 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p
log.V(1).Info("Updated PVC", "pvc.anno.Phase", anno[cc.AnnPodPhase], "pvc.anno.Restarts", anno[cc.AnnPodRestarts])
}

if cc.IsPVCComplete(pvc) || scratchExitCode {
if !scratchExitCode {
if cc.IsPVCComplete(pvc) || scratchSpaceRequired {
if !scratchSpaceRequired {
r.recorder.Event(pvc, corev1.EventTypeNormal, ImportSucceededPVC, "Import Successful")
log.V(1).Info("Import completed successfully")
}
Expand Down
42 changes: 35 additions & 7 deletions pkg/controller/import-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,37 @@ var _ = Describe("Update PVC from POD", func() {
Expect(resPvc.GetAnnotations()[cc.AnnRunningConditionReason]).To(Equal("Reason"))
})

DescribeTable("Should handle termination messages", func(termMsg, conditionMessage string) {
pvc := cc.CreatePvc("testPvc1", "default", map[string]string{}, nil)
pod := cc.CreateImporterTestPod(pvc, "testPvc1", nil)
pod.Status = corev1.PodStatus{
Phase: corev1.PodSucceeded,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
Message: termMsg,
},
},
},
},
}
reconciler = createImportReconciler(pvc, pod)
err := reconciler.updatePvcFromPod(pvc, pod, reconciler.log)
Expect(err).ToNot(HaveOccurred())

resPvc := &corev1.PersistentVolumeClaim{}
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "testPvc1", Namespace: "default"}, resPvc)
Expect(err).ToNot(HaveOccurred())

Expect(resPvc.GetAnnotations()).To(HaveKeyWithValue(cc.AnnPodPhase, string(corev1.PodSucceeded)))
Expect(resPvc.GetAnnotations()).To(HaveKeyWithValue(cc.AnnRunningCondition, "false"))
Expect(resPvc.GetAnnotations()).To(HaveKeyWithValue(cc.AnnRunningConditionMessage, conditionMessage))
},
Entry("Message which can be unmarshalled", `{"preAllocationApplied": true}`, ImportCompleteMessage),
Entry("Message which cannot be unmarshalled", "somemessage", "somemessage"),
)

It("Should update the PVC status to running, if pod is running", func() {
pvc := cc.CreatePvc("testPvc1", "default", map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodPending)}, nil)
pod := cc.CreateImporterTestPod(pvc, "testPvc1", nil)
Expand Down Expand Up @@ -531,7 +562,7 @@ var _ = Describe("Update PVC from POD", func() {
Expect(resPvc.GetAnnotations()[cc.AnnRunningConditionReason]).To(Equal("Explosion"))
})

It("Should NOT update phase on PVC, if pod exited with scratchspace required msg", func() {
It("Should NOT update phase on PVC, if pod exited with termination message stating scratch space is required", func() {
pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodRunning)}, nil, corev1.ClaimBound)
scratchPvcName := &corev1.PersistentVolumeClaim{}
scratchPvcName.Name = "testPvc1-scratch"
Expand All @@ -543,7 +574,7 @@ var _ = Describe("Update PVC from POD", func() {
State: v1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
ExitCode: 0,
Message: common.ScratchSpaceRequired,
Message: `{"scratchSpaceRequired": true}`,
},
},
},
Expand Down Expand Up @@ -640,7 +671,7 @@ var _ = Describe("Update PVC from POD", func() {
State: v1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
ExitCode: 0,
Message: `Import Complete; VDDK: {"Version": "1.0.0", "Host": "esx15.test.lan"}`,
Message: `{"vddkInfo": {"Version": "1.0.0", "Host": "esx15.test.lan"}}`,
Reason: "Completed",
},
},
Expand Down Expand Up @@ -672,10 +703,7 @@ var _ = Describe("Update PVC from POD", func() {
ContainerStatuses: []corev1.ContainerStatus{
{
LastTerminationState: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
ExitCode: 0,
Message: common.ScratchSpaceRequired,
},
Terminated: &corev1.ContainerStateTerminated{},
},
},
},
Expand Down
Loading

0 comments on commit cd7ee9e

Please sign in to comment.