diff --git a/cluster-autoscaler/processors/datadog/common/common.go b/cluster-autoscaler/processors/datadog/common/common.go
index cf15d10926dd..38d93da268c3 100644
--- a/cluster-autoscaler/processors/datadog/common/common.go
+++ b/cluster-autoscaler/processors/datadog/common/common.go
@@ -19,6 +19,7 @@ package common
 import (
     apiv1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
+    "k8s.io/klog/v2"
     schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
 )
 
@@ -31,6 +32,19 @@ const (
     // nodes offering local storage, and currently injected as requests on
     // Pending pods having a PVC for local-data volumes.
     DatadogLocalDataResource apiv1.ResourceName = "storageclass/local-data"
+    // DatadogEphemeralLocalDataResource is a virtual resource placed on new or future
+    // nodes offering ephemeral local storage, and currently injected as requests on
+    // Pending pods having an ephemeral PVC for local-data volumes.
+    DatadogEphemeralLocalDataResource apiv1.ResourceName = "storageclass/ephemeral-local-data"
+
+    // DatadogLocalStorageProvisionerLabel indicates which technology will be used to provide local storage
+    DatadogLocalStorageProvisionerLabel = "nodegroups.datadoghq.com/local-storage-provisioner"
+    // DatadogInitialStorageCapacityLabel stores the amount of local storage a new node will have initially,
+    // e.g. nodegroups.datadoghq.com/initial-storage-capacity=100Gi
+    DatadogInitialStorageCapacityLabel = "nodegroups.datadoghq.com/initial-storage-capacity"
+
+    // DatadogStorageProvisionerTopoLVM is the storage provisioner label value to use for the topolvm implementation
+    DatadogStorageProvisionerTopoLVM = "topolvm"
 )
 
 var (
@@ -38,13 +52,19 @@ var (
     DatadogLocalDataQuantity = resource.NewQuantity(1, resource.DecimalSI)
 )
 
-// NodeHasLocalData returns true if the node holds a local-storage:true label
+// NodeHasLocalData returns true if the node holds a local-storage:true or local-storage-provisioner: label
 func NodeHasLocalData(node *apiv1.Node) bool {
     if node == nil {
         return false
     }
-    value, ok := node.GetLabels()[DatadogLocalStorageLabel]
-    return ok && value == "true"
+
+    labels := node.GetLabels()
+
+    _, newStorageOk := labels[DatadogLocalStorageProvisionerLabel]
+    value, ok := labels[DatadogLocalStorageLabel]
+
+    // the node should have either the local-storage or the local-storage-provisioner label
+    return (ok && value == "true") || newStorageOk
 }
 
 // SetNodeLocalDataResource updates a NodeInfo with the DatadogLocalDataResource resource
@@ -61,7 +81,28 @@ func SetNodeLocalDataResource(nodeInfo *schedulerframework.NodeInfo) {
     if node.Status.Capacity == nil {
         node.Status.Capacity = apiv1.ResourceList{}
     }
-    node.Status.Capacity[DatadogLocalDataResource] = DatadogLocalDataQuantity.DeepCopy()
-    node.Status.Allocatable[DatadogLocalDataResource] = DatadogLocalDataQuantity.DeepCopy()
+
+    provisioner := node.Labels[DatadogLocalStorageProvisionerLabel]
+    switch provisioner {
+    case DatadogStorageProvisionerTopoLVM:
+        capacity := node.Labels[DatadogInitialStorageCapacityLabel]
+        capacityResource, err := resource.ParseQuantity(capacity)
+        if err == nil {
+            node.Status.Capacity[DatadogEphemeralLocalDataResource] = capacityResource.DeepCopy()
+            node.Status.Allocatable[DatadogEphemeralLocalDataResource] = capacityResource.DeepCopy()
+        } else {
+            klog.Warningf("failed to attach capacity information (%s) to node (%s): %v", capacity, node.Name, err)
+        }
+    default:
+        // The old local-storage provisioner uses a different label for identification.
+        // If none of the new options are present, check whether the node uses the old system; otherwise print a warning.
+        if val, ok := node.Labels[DatadogLocalStorageLabel]; ok && val == "true" {
+            node.Status.Capacity[DatadogLocalDataResource] = DatadogLocalDataQuantity.DeepCopy()
+            node.Status.Allocatable[DatadogLocalDataResource] = DatadogLocalDataQuantity.DeepCopy()
+        } else {
+            klog.Warningf("this should never be reached. local storage provisioner (%s) is unknown and cannot be used on node: %s", provisioner, node.Name)
+        }
+    }
+
     nodeInfo.SetNode(node)
 }
diff --git a/cluster-autoscaler/processors/datadog/common/common_test.go b/cluster-autoscaler/processors/datadog/common/common_test.go
index 7d63271bb4aa..1fa5e3f3d63d 100644
--- a/cluster-autoscaler/processors/datadog/common/common_test.go
+++ b/cluster-autoscaler/processors/datadog/common/common_test.go
@@ -69,6 +69,15 @@ func TestNodeHasLocalData(t *testing.T) {
             nil,
             false,
         },
+        {
+            "local-storage-provisioner label was set",
+            &corev1.Node{
+                ObjectMeta: metav1.ObjectMeta{
+                    Labels: map[string]string{DatadogLocalStorageProvisionerLabel: "topolvm"},
+                },
+            },
+            true,
+        },
     }
 
     for _, tt := range tests {
@@ -87,7 +96,11 @@ func TestSetNodeLocalDataResource(t *testing.T) {
             ObjectMeta: metav1.ObjectMeta{Name: "egg"},
         },
     )
-    ni.SetNode(&corev1.Node{})
+    ni.SetNode(&corev1.Node{
+        ObjectMeta: metav1.ObjectMeta{
+            Labels: map[string]string{DatadogLocalStorageLabel: "true"},
+        },
+    })
 
     SetNodeLocalDataResource(ni)
 
@@ -99,5 +112,71 @@ func TestSetNodeLocalDataResource(t *testing.T) {
     assert.True(t, ok)
     assert.Equal(t, niValue, int64(1))
 
+    // Only DatadogLocalDataResource should be set
+    _, ok = ni.Node().Status.Allocatable[DatadogEphemeralLocalDataResource]
+    assert.False(t, ok)
+
+    _, ok = ni.Allocatable.ScalarResources[DatadogEphemeralLocalDataResource]
+    assert.False(t, ok)
+
     assert.Equal(t, len(ni.Pods), 2)
 }
+
+func TestSetNodeResourceFromTopolvm(t *testing.T) {
+    var hundredGB int64 = 100 * 1024 * 1024 * 1024
+    ni := schedulerframework.NewNodeInfo()
+    ni.SetNode(&corev1.Node{
+        ObjectMeta: metav1.ObjectMeta{
+            Labels: map[string]string{
+                DatadogLocalStorageProvisionerLabel: "topolvm",
+                DatadogInitialStorageCapacityLabel:  "100Gi",
+            },
+        },
+    })
+
+    SetNodeLocalDataResource(ni)
+
+    nodeValue, ok := ni.Node().Status.Allocatable[DatadogEphemeralLocalDataResource]
+    assert.True(t, ok)
+    assert.Equal(t, nodeValue.String(), resource.NewQuantity(hundredGB, resource.BinarySI).String())
+
+    niValue, ok := ni.Allocatable.ScalarResources[DatadogEphemeralLocalDataResource]
+    assert.True(t, ok)
+    assert.Equal(t, niValue, hundredGB)
+
+    // Only DatadogEphemeralLocalDataResource should be set
+    _, ok = ni.Node().Status.Allocatable[DatadogLocalDataResource]
+    assert.False(t, ok)
+
+    _, ok = ni.Allocatable.ScalarResources[DatadogLocalDataResource]
+    assert.False(t, ok)
+}
+
+func TestShouldNotSetResourcesWithMissingLabel(t *testing.T) {
+    ni := schedulerframework.NewNodeInfo()
+    ni.SetNode(&corev1.Node{
+        ObjectMeta: metav1.ObjectMeta{
+            Labels: map[string]string{
+                DatadogLocalStorageProvisionerLabel: "topolvm",
+            },
+        },
+    })
+
+    SetNodeLocalDataResource(ni)
+
+    _, ok := ni.Node().Status.Allocatable[DatadogLocalDataResource]
+    assert.False(t, ok)
+    _, ok = ni.Node().Status.Capacity[DatadogLocalDataResource]
+    assert.False(t, ok)
+
+    _, ok = ni.Allocatable.ScalarResources[DatadogLocalDataResource]
+    assert.False(t, ok)
+
+    _, ok = ni.Node().Status.Allocatable[DatadogEphemeralLocalDataResource]
+    assert.False(t, ok)
+    _, ok = ni.Node().Status.Capacity[DatadogEphemeralLocalDataResource]
+    assert.False(t, ok)
+
+    _, ok = ni.Allocatable.ScalarResources[DatadogEphemeralLocalDataResource]
+    assert.False(t, ok)
+}
diff --git a/cluster-autoscaler/processors/datadog/pods/transform_local_data.go b/cluster-autoscaler/processors/datadog/pods/transform_local_data.go
index 248c8f833ddb..2c88c3502b23 100644
--- a/cluster-autoscaler/processors/datadog/pods/transform_local_data.go
+++ b/cluster-autoscaler/processors/datadog/pods/transform_local_data.go
@@ -56,6 +56,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/processors/datadog/common"
 
     apiv1 "k8s.io/api/core/v1"
+    corev1 "k8s.io/api/core/v1"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/fields"
     client "k8s.io/client-go/kubernetes"
@@ -64,6 +65,11 @@ import (
     klog "k8s.io/klog/v2"
 )
 
+const (
+    storageClassNameLocal   = "local-data"
+    storageClassNameTopolvm = "ephemeral-local-data"
+)
+
 type transformLocalData struct {
     pvcLister   v1lister.PersistentVolumeClaimLister
     stopChannel chan struct{}
@@ -90,19 +96,25 @@ func (p *transformLocalData) Process(ctx *context.AutoscalingContext, pods []*ap
     for _, po := range pods {
         var volumes []apiv1.Volume
         for _, vol := range po.Spec.Volumes {
-            if vol.PersistentVolumeClaim == nil {
-                volumes = append(volumes, vol)
-                continue
-            }
-            pvc, err := p.pvcLister.PersistentVolumeClaims(po.Namespace).Get(vol.PersistentVolumeClaim.ClaimName)
-            if err != nil {
-                if !apierrors.IsNotFound(err) {
-                    klog.Warningf("failed to fetch pvc for %s/%s: %v", po.GetNamespace(), po.GetName(), err)
+            var pvcSpec *apiv1.PersistentVolumeClaimSpec
+            if vol.PersistentVolumeClaim != nil {
+                pvc, err := p.pvcLister.PersistentVolumeClaims(po.Namespace).Get(vol.PersistentVolumeClaim.ClaimName)
+                if err != nil {
+                    if !apierrors.IsNotFound(err) {
+                        klog.Warningf("failed to fetch pvc for %s/%s: %v", po.GetNamespace(), po.GetName(), err)
+                    }
+                    volumes = append(volumes, vol)
+                    continue
                 }
+                pvcSpec = &pvc.Spec
+            } else if vol.Ephemeral != nil {
+                pvcSpec = &vol.Ephemeral.VolumeClaimTemplate.Spec
+            } else {
                 volumes = append(volumes, vol)
                 continue
             }
-            if *pvc.Spec.StorageClassName != "local-data" {
+
+            if !isSpecialPVCStorageClass(*pvcSpec.StorageClassName) {
                 volumes = append(volumes, vol)
                 continue
             }
@@ -113,9 +125,47 @@ func (p *transformLocalData) Process(ctx *context.AutoscalingContext, pods []*ap
             if len(po.Spec.Containers[0].Resources.Limits) == 0 {
                 po.Spec.Containers[0].Resources.Limits = apiv1.ResourceList{}
             }
+            if len(pvcSpec.Resources.Requests) == 0 {
+                pvcSpec.Resources.Requests = apiv1.ResourceList{}
+            }
 
-            po.Spec.Containers[0].Resources.Requests[common.DatadogLocalDataResource] = common.DatadogLocalDataQuantity.DeepCopy()
-            po.Spec.Containers[0].Resources.Limits[common.DatadogLocalDataResource] = common.DatadogLocalDataQuantity.DeepCopy()
+            switch *pvcSpec.StorageClassName {
+            case storageClassNameTopolvm:
+                // Only support ephemeral volumes for topolvm
+                if vol.Ephemeral == nil {
+                    volumes = append(volumes, vol)
+                    continue
+                }
+                if storage, ok := pvcSpec.Resources.Requests[corev1.ResourceStorage]; ok {
+                    if request, ok := po.Spec.Containers[0].Resources.Requests[common.DatadogEphemeralLocalDataResource]; ok {
+                        request.Add(storage)
+                        po.Spec.Containers[0].Resources.Requests[common.DatadogEphemeralLocalDataResource] = request
+                    } else {
+                        po.Spec.Containers[0].Resources.Requests[common.DatadogEphemeralLocalDataResource] = storage.DeepCopy()
+                    }
+
+                    if limit, ok := po.Spec.Containers[0].Resources.Limits[common.DatadogEphemeralLocalDataResource]; ok {
+                        limit.Add(storage)
+                        po.Spec.Containers[0].Resources.Limits[common.DatadogEphemeralLocalDataResource] = limit
+                    } else {
+                        po.Spec.Containers[0].Resources.Limits[common.DatadogEphemeralLocalDataResource] = storage.DeepCopy()
+                    }
+                } else {
+                    klog.Warningf("ignoring pvc as it does not have storage request information")
+                    volumes = append(volumes, vol)
+                }
+            case storageClassNameLocal:
+                // Only support persistent volumes for local storage
+                if vol.PersistentVolumeClaim == nil {
+                    volumes = append(volumes, vol)
+                    continue
+                }
+                po.Spec.Containers[0].Resources.Requests[common.DatadogLocalDataResource] = common.DatadogLocalDataQuantity.DeepCopy()
+                po.Spec.Containers[0].Resources.Limits[common.DatadogLocalDataResource] = common.DatadogLocalDataQuantity.DeepCopy()
+            default:
+                klog.Warningf("this should never be reached. pvc storage class (%s) cannot be used for scaling on pod: %s", *pvcSpec.StorageClassName, po.Name)
+                volumes = append(volumes, vol)
+            }
         }
         po.Spec.Volumes = volumes
     }
@@ -123,6 +173,17 @@ func (p *transformLocalData) Process(ctx *context.AutoscalingContext, pods []*ap
     return pods, nil
 }
 
+func isSpecialPVCStorageClass(className string) bool {
+    switch className {
+    case storageClassNameTopolvm:
+        return true
+    case storageClassNameLocal:
+        return true
+    default:
+        return false
+    }
+}
+
 // NewPersistentVolumeClaimLister builds a persistentvolumeclaim lister.
 func NewPersistentVolumeClaimLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1lister.PersistentVolumeClaimLister {
     listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "persistentvolumeclaims", apiv1.NamespaceAll, fields.Everything())
diff --git a/cluster-autoscaler/processors/datadog/pods/transform_local_data_test.go b/cluster-autoscaler/processors/datadog/pods/transform_local_data_test.go
index fcede6354cdd..9c4cec020639 100644
--- a/cluster-autoscaler/processors/datadog/pods/transform_local_data_test.go
+++ b/cluster-autoscaler/processors/datadog/pods/transform_local_data_test.go
@@ -18,26 +18,37 @@ package pods
 
 import (
     "fmt"
+    "strings"
     "testing"
 
     "github.com/stretchr/testify/assert"
+    apiv1 "k8s.io/api/core/v1"
     corev1 "k8s.io/api/core/v1"
     apiequality "k8s.io/apimachinery/pkg/api/equality"
+    "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/autoscaler/cluster-autoscaler/context"
     "k8s.io/autoscaler/cluster-autoscaler/processors/datadog/common"
     v1lister "k8s.io/client-go/listers/core/v1"
     "k8s.io/client-go/tools/cache"
+    "k8s.io/utils/pointer"
 )
 
 var (
     testRemoteClass    = "remote-data"
-    testLocalClass     = "local-data"
     testNamespace      = "foons"
     testEmptyResources = corev1.ResourceList{}
     testLdResources    = corev1.ResourceList{
         common.DatadogLocalDataResource: common.DatadogLocalDataQuantity.DeepCopy(),
     }
+    test100GResource     = resource.MustParse("100Gi")
+    testTopolvmResources = corev1.ResourceList{
+        common.DatadogEphemeralLocalDataResource: test100GResource,
+    }
+    testMixedResources = corev1.ResourceList{
+        common.DatadogLocalDataResource:          common.DatadogLocalDataQuantity.DeepCopy(),
+        common.DatadogEphemeralLocalDataResource: test100GResource,
+    }
 )
 
 func TestTransformLocalDataProcess(t *testing.T) {
@@ -64,7 +75,7 @@ func TestTransformLocalDataProcess(t *testing.T) {
         {
             "local-data volumes are removed, and custom resources added",
             []*corev1.Pod{buildPod("pod1", testEmptyResources, testEmptyResources, "pvc-1")},
-            []*corev1.PersistentVolumeClaim{buildPVC("pvc-1", testLocalClass)},
+            []*corev1.PersistentVolumeClaim{buildPVC("pvc-1", storageClassNameLocal)},
             []*corev1.Pod{buildPod("pod1", testLdResources, testLdResources)},
         },
 
@@ -73,7 +84,7 @@ func TestTransformLocalDataProcess(t *testing.T) {
             []*corev1.Pod{buildPod("pod1", testEmptyResources, testEmptyResources, "pvc-1", "pvc-2", "pvc-3")},
             []*corev1.PersistentVolumeClaim{
                 buildPVC("pvc-1", testRemoteClass),
-                buildPVC("pvc-2", testLocalClass),
+                buildPVC("pvc-2", storageClassNameLocal),
                 buildPVC("pvc-3", testRemoteClass),
             },
             []*corev1.Pod{buildPod("pod1", testLdResources, testLdResources, "pvc-1", "pvc-3")},
@@ -92,6 +103,32 @@ func TestTransformLocalDataProcess(t *testing.T) {
             []*corev1.PersistentVolumeClaim{},
             []*corev1.Pod{},
         },
+
+        {
+            "topolvm handles ephemeral volumes",
+            []*corev1.Pod{buildPod("pod1", testEmptyResources, testEmptyResources, "pvc-1", "ephemeral-pvc-2")},
+            []*corev1.PersistentVolumeClaim{
+                buildPVC("pvc-1", testRemoteClass),
+            },
+            []*corev1.Pod{buildPod("pod1", testTopolvmResources, testTopolvmResources, "pvc-1")},
+        },
+
+        {
+            "ephemeral and persistent volumes are handled separately",
+            []*corev1.Pod{buildPod("pod1", testEmptyResources, testEmptyResources, "ephemeral-pvc-1", "pvc-2")},
+            []*corev1.PersistentVolumeClaim{
+                buildPVC("pvc-2", storageClassNameLocal),
+            },
+            []*corev1.Pod{buildPod("pod1", testMixedResources, testMixedResources)},
+        },
+        {
+            "no persistent volume for topolvm",
+            []*corev1.Pod{buildPod("pod1", testEmptyResources, testEmptyResources, "pvc-1")},
+            []*corev1.PersistentVolumeClaim{
+                buildPVC("pvc-1", storageClassNameTopolvm),
+            },
+            []*corev1.Pod{buildPod("pod1", testEmptyResources, testEmptyResources, "pvc-1")},
+        },
     }
 
     for _, tt := range tests {
@@ -141,15 +178,36 @@ func buildPod(name string, requests, limits corev1.ResourceList, claimNames ...s
     }
 
     for _, name := range claimNames {
-        pod.Spec.Volumes = append(pod.Spec.Volumes,
-            corev1.Volume{
-                Name: name,
-                VolumeSource: corev1.VolumeSource{
-                    PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
-                        ClaimName: name,
+        if strings.Contains(name, "ephemeral") {
+            pod.Spec.Volumes = append(pod.Spec.Volumes,
+                corev1.Volume{
+                    Name: name,
+                    VolumeSource: corev1.VolumeSource{
+                        Ephemeral: &corev1.EphemeralVolumeSource{
+                            VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{
+                                Spec: corev1.PersistentVolumeClaimSpec{
+                                    StorageClassName: pointer.String(storageClassNameTopolvm),
+                                    Resources: corev1.ResourceRequirements{
+                                        Requests: corev1.ResourceList{
+                                            corev1.ResourceStorage: resource.MustParse("100Gi"),
+                                        },
+                                    },
+                                },
+                            },
+                        },
                     },
-                },
-            })
+                })
+        } else {
+            pod.Spec.Volumes = append(pod.Spec.Volumes,
+                corev1.Volume{
+                    Name: name,
+                    VolumeSource: corev1.VolumeSource{
+                        PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
+                            ClaimName: name,
+                        },
+                    },
+                })
+        }
     }
 
     return pod
@@ -166,3 +224,11 @@ func buildPVC(name string, storageClassName string) *corev1.PersistentVolumeClai
         },
     }
 }
+
+func buildPVCWithStorage(name, storageClassName, storageQuantity string) *corev1.PersistentVolumeClaim {
+    pvc := buildPVC(name, storageClassName)
+    quantity, _ := resource.ParseQuantity(storageQuantity)
+    pvc.Spec.Resources.Requests = apiv1.ResourceList{}
+    pvc.Spec.Resources.Requests["storage"] = quantity
+    return pvc
+}
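
Reviewer note (not part of the patch): the node-side contract above can be summarized with a small, self-contained sketch. It only reproduces the happy path of the topolvm branch of SetNodeLocalDataResource: parse the initial-storage-capacity label and expose it as the storageclass/ephemeral-local-data virtual resource, using the label keys and the 100Gi value from the tests. The node name is made up for illustration.

package main

import (
    "fmt"

    apiv1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // Hypothetical template node, labeled the way a topolvm-backed node group
    // would label it (values copied from the tests above).
    node := &apiv1.Node{
        ObjectMeta: metav1.ObjectMeta{
            Name: "template-node", // made-up name
            Labels: map[string]string{
                "nodegroups.datadoghq.com/local-storage-provisioner": "topolvm",
                "nodegroups.datadoghq.com/initial-storage-capacity":  "100Gi",
            },
        },
        Status: apiv1.NodeStatus{
            Capacity:    apiv1.ResourceList{},
            Allocatable: apiv1.ResourceList{},
        },
    }

    // Happy path of the topolvm branch: parse the capacity label and expose
    // it as the virtual scheduling resource.
    capacity, err := resource.ParseQuantity(node.Labels["nodegroups.datadoghq.com/initial-storage-capacity"])
    if err != nil {
        fmt.Println("invalid initial-storage-capacity label:", err)
        return
    }
    node.Status.Capacity["storageclass/ephemeral-local-data"] = capacity
    node.Status.Allocatable["storageclass/ephemeral-local-data"] = capacity

    fmt.Println("ephemeral-local-data capacity:", capacity.String()) // 100Gi
}

Running this prints 100Gi, which is the allocatable value the autoscaler's scheduler simulation will see on a template node for such a node group.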
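
Reviewer note (not part of the patch): the matching pod-side view, again a hedged sketch rather than the processor's exact code path. A pending pod with a generic ephemeral volume in the ephemeral-local-data storage class effectively has that volume dropped and its storage request surfaced as a container-level request and limit for the virtual resource, so scale-up only considers node groups whose template nodes expose it. Pod, container, and volume names are invented, and the PVC field layout assumes the same k8s.io/api version the patch builds against.

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/utils/pointer"
)

func main() {
    // Hypothetical pending pod with a generic ephemeral volume backed by the
    // ephemeral-local-data storage class, before the processor runs.
    pod := &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "workload-0"},
        Spec: corev1.PodSpec{
            Containers: []corev1.Container{{Name: "app"}},
            Volumes: []corev1.Volume{{
                Name: "scratch",
                VolumeSource: corev1.VolumeSource{
                    Ephemeral: &corev1.EphemeralVolumeSource{
                        VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{
                            Spec: corev1.PersistentVolumeClaimSpec{
                                StorageClassName: pointer.String("ephemeral-local-data"),
                                Resources: corev1.ResourceRequirements{
                                    Requests: corev1.ResourceList{
                                        corev1.ResourceStorage: resource.MustParse("100Gi"),
                                    },
                                },
                            },
                        },
                    },
                },
            }},
        },
    }

    // What the processor effectively does with such a pod before simulation:
    // surface the volume's storage request as the virtual resource on the
    // first container and drop the volume itself.
    storage := pod.Spec.Volumes[0].Ephemeral.VolumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage]
    pod.Spec.Containers[0].Resources.Requests = corev1.ResourceList{
        "storageclass/ephemeral-local-data": storage,
    }
    pod.Spec.Containers[0].Resources.Limits = corev1.ResourceList{
        "storageclass/ephemeral-local-data": storage,
    }
    pod.Spec.Volumes = nil

    fmt.Println("virtual request:", storage.String(), "of storageclass/ephemeral-local-data")
}

The 100Gi request produced here is what gets matched against the node capacity from the previous sketch during the scheduler simulation.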