Skip to content

Commit

Permalink
[local] Support topolvm/openebs storage for scaling decisions
Browse files Browse the repository at this point in the history
  • Loading branch information
dhenkel92 authored and Fricounet committed Jun 6, 2024
1 parent 71472ee commit d943d0a
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 12 deletions.
49 changes: 44 additions & 5 deletions cluster-autoscaler/processors/datadog/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package common
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"

schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
Expand All @@ -31,20 +32,37 @@ const (
// nodes offering local storage, and currently injected as requests on
// Pending pods having a PVC for local-data volumes.
DatadogLocalDataResource apiv1.ResourceName = "storageclass/local-data"

// DatadogLocalStorageProvisionerLabel is indicating which technology will be used to provide local storage
DatadogLocalStorageProvisionerLabel = "nodegroups.datadoghq.com/local-storage-provisioner"
// DatadogInitialStorageCapacityLabel is storing the amount of local storage a new node will have in the beginning
// e.g. nodegroups.datadoghq.com/initial-storage-capacity=100Gi
DatadogInitialStorageCapacityLabel = "nodegroups.datadoghq.com/initial-storage-capacity"

// DatadogStorageProvisionerTopoLVM is the storage provisioner label value to use for topolvm implementation
DatadogStorageProvisionerTopoLVM = "topolvm"
// DatadogStorageProvisionerOpenEBS is the storage provisioner label value to use for openebs implementation
DatadogStorageProvisionerOpenEBS = "openebs-lvm"
)

var (
// DatadogLocalDataQuantity is the default amount of DatadogLocalDataResource
DatadogLocalDataQuantity = resource.NewQuantity(1, resource.DecimalSI)
)

// NodeHasLocalData returns true if the node holds a local-storage:true label
// NodeHasLocalData returns true if the node holds a local-storage:true or local-storage-provisioner:<any> label
func NodeHasLocalData(node *apiv1.Node) bool {
if node == nil {
return false
}
value, ok := node.GetLabels()[DatadogLocalStorageLabel]
return ok && value == "true"

labels := node.GetLabels()

_, newStorageOk := labels[DatadogLocalStorageProvisionerLabel]
value, ok := labels[DatadogLocalStorageLabel]

// the node should have either the local-stoarge or local-storage-provisioner label
return (ok && value == "true") || newStorageOk
}

// SetNodeLocalDataResource updates a NodeInfo with the DatadogLocalDataResource resource
Expand All @@ -61,7 +79,28 @@ func SetNodeLocalDataResource(nodeInfo *schedulerframework.NodeInfo) {
if node.Status.Capacity == nil {
node.Status.Capacity = apiv1.ResourceList{}
}
node.Status.Capacity[DatadogLocalDataResource] = DatadogLocalDataQuantity.DeepCopy()
node.Status.Allocatable[DatadogLocalDataResource] = DatadogLocalDataQuantity.DeepCopy()

provisioner, _ := node.Labels[DatadogLocalStorageProvisionerLabel]
switch provisioner {
case DatadogStorageProvisionerTopoLVM, DatadogStorageProvisionerOpenEBS:
capacity, _ := node.Labels[DatadogInitialStorageCapacityLabel]
capacityResource, err := resource.ParseQuantity(capacity)
if err == nil {
node.Status.Capacity[DatadogLocalDataResource] = capacityResource.DeepCopy()
node.Status.Allocatable[DatadogLocalDataResource] = capacityResource.DeepCopy()
} else {
klog.Warningf("failed to attach capacity information (%s) to node (%s): %v", capacity, node.Name, err)
}
default:
// The old local-storage provisioner is using a different label for identification.
// So if we cannot find any of the new options, we should check if it's using the old system and otherwise print a warning.
if val, ok := node.Labels[DatadogLocalStorageLabel]; ok && val == "true" {
node.Status.Capacity[DatadogLocalDataResource] = DatadogLocalDataQuantity.DeepCopy()
node.Status.Allocatable[DatadogLocalDataResource] = DatadogLocalDataQuantity.DeepCopy()
} else {
klog.Warningf("this should never be reached. local storage provisioner (%s) is unknown and cannot be used on node: %s", provisioner, node.Name)
}
}

nodeInfo.SetNode(node)
}
82 changes: 81 additions & 1 deletion cluster-autoscaler/processors/datadog/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ func TestNodeHasLocalData(t *testing.T) {
nil,
false,
},
{
"local-storage-provisioner label was set",
&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{DatadogLocalStorageProvisionerLabel: "topolvm"},
},
},
true,
},
}

for _, tt := range tests {
Expand All @@ -87,7 +96,11 @@ func TestSetNodeLocalDataResource(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "egg"},
},
)
ni.SetNode(&corev1.Node{})
ni.SetNode(&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{DatadogLocalStorageLabel: "true"},
},
})

SetNodeLocalDataResource(ni)

Expand All @@ -101,3 +114,70 @@ func TestSetNodeLocalDataResource(t *testing.T) {

assert.Equal(t, len(ni.Pods), 2)
}

func TestSetNodeResourceFromTopolvm(t *testing.T) {
var hundredGB int64 = 100 * 1024 * 1024 * 1024
ni := schedulerframework.NewNodeInfo()
ni.SetNode(&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
DatadogLocalStorageProvisionerLabel: "topolvm",
DatadogInitialStorageCapacityLabel: "100Gi",
},
},
})

SetNodeLocalDataResource(ni)

nodeValue, ok := ni.Node().Status.Allocatable[DatadogLocalDataResource]
assert.True(t, ok)
assert.Equal(t, nodeValue.String(), resource.NewQuantity(hundredGB, resource.BinarySI).String())

niValue, ok := ni.Allocatable.ScalarResources[DatadogLocalDataResource]
assert.True(t, ok)
assert.Equal(t, niValue, hundredGB)
}

func TestShouldNotSetResourcesWithMissingLabel(t *testing.T) {
ni := schedulerframework.NewNodeInfo()
ni.SetNode(&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
DatadogLocalStorageProvisionerLabel: "topolvm",
},
},
})

SetNodeLocalDataResource(ni)

_, ok := ni.Node().Status.Allocatable[DatadogLocalDataResource]
assert.False(t, ok)
_, ok = ni.Node().Status.Capacity[DatadogLocalDataResource]
assert.False(t, ok)

_, ok = ni.Allocatable.ScalarResources[DatadogLocalDataResource]
assert.False(t, ok)
}

func TestSetNodeResourceFromOpenEBS(t *testing.T) {
var hundredGB int64 = 100 * 1024 * 1024 * 1024
ni := schedulerframework.NewNodeInfo()
ni.SetNode(&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
DatadogLocalStorageProvisionerLabel: "openebs-lvm",
DatadogInitialStorageCapacityLabel: "100Gi",
},
},
})

SetNodeLocalDataResource(ni)

nodeValue, ok := ni.Node().Status.Allocatable[DatadogLocalDataResource]
assert.True(t, ok)
assert.Equal(t, nodeValue.String(), resource.NewQuantity(hundredGB, resource.BinarySI).String())

niValue, ok := ni.Allocatable.ScalarResources[DatadogLocalDataResource]
assert.True(t, ok)
assert.Equal(t, niValue, hundredGB)
}
42 changes: 39 additions & 3 deletions cluster-autoscaler/processors/datadog/pods/transform_local_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ import (
klog "k8s.io/klog/v2"
)

const (
storageClassNameLocal = "local-data"
storageClassNameTopolvm = "topolvm-provisioner"
storageClassNameOpenEBS = "openebs-lvmpv"
)

type transformLocalData struct {
pvcLister v1lister.PersistentVolumeClaimLister
stopChannel chan struct{}
Expand Down Expand Up @@ -102,7 +108,7 @@ func (p *transformLocalData) Process(ctx *context.AutoscalingContext, pods []*ap
volumes = append(volumes, vol)
continue
}
if *pvc.Spec.StorageClassName != "local-data" {
if !isSpecialPVCStorageClass(*pvc.Spec.StorageClassName) {
volumes = append(volumes, vol)
continue
}
Expand All @@ -113,16 +119,46 @@ func (p *transformLocalData) Process(ctx *context.AutoscalingContext, pods []*ap
if len(po.Spec.Containers[0].Resources.Limits) == 0 {
po.Spec.Containers[0].Resources.Limits = apiv1.ResourceList{}
}
if len(pvc.Spec.Resources.Requests) == 0 {
pvc.Spec.Resources.Requests = apiv1.ResourceList{}
}

po.Spec.Containers[0].Resources.Requests[common.DatadogLocalDataResource] = common.DatadogLocalDataQuantity.DeepCopy()
po.Spec.Containers[0].Resources.Limits[common.DatadogLocalDataResource] = common.DatadogLocalDataQuantity.DeepCopy()
switch *pvc.Spec.StorageClassName {
case storageClassNameTopolvm, storageClassNameOpenEBS:
if storage, ok := pvc.Spec.Resources.Requests["storage"]; ok {
po.Spec.Containers[0].Resources.Requests[common.DatadogLocalDataResource] = storage.DeepCopy()
po.Spec.Containers[0].Resources.Limits[common.DatadogLocalDataResource] = storage.DeepCopy()
} else {
klog.Warningf("ignoring pvc as it does not have storage request information")
volumes = append(volumes, vol)
}
case storageClassNameLocal:
po.Spec.Containers[0].Resources.Requests[common.DatadogLocalDataResource] = common.DatadogLocalDataQuantity.DeepCopy()
po.Spec.Containers[0].Resources.Limits[common.DatadogLocalDataResource] = common.DatadogLocalDataQuantity.DeepCopy()
default:
klog.Warningf("this should never be reached. pvc storage class (%s) cannot be used for scaling on pod: %s", *pvc.Spec.StorageClassName, po.Name)
volumes = append(volumes, vol)
}
}
po.Spec.Volumes = volumes
}

return pods, nil
}

func isSpecialPVCStorageClass(className string) bool {
switch className {
case storageClassNameOpenEBS:
return true
case storageClassNameTopolvm:
return true
case storageClassNameLocal:
return true
default:
return false
}
}

// NewPersistentVolumeClaimLister builds a persistentvolumeclaim lister.
func NewPersistentVolumeClaimLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1lister.PersistentVolumeClaimLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "persistentvolumeclaims", apiv1.NamespaceAll, fields.Everything())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"testing"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/datadog/common"
Expand All @@ -32,7 +34,6 @@ import (

var (
testRemoteClass = "remote-data"
testLocalClass = "local-data"
testNamespace = "foons"
testEmptyResources = corev1.ResourceList{}
testLdResources = corev1.ResourceList{
Expand All @@ -41,6 +42,11 @@ var (
)

func TestTransformLocalDataProcess(t *testing.T) {
test100GResource, _ := resource.ParseQuantity("100Gi")
testTopolvmResources := corev1.ResourceList{
common.DatadogLocalDataResource: test100GResource,
}

tests := []struct {
name string
pods []*corev1.Pod
Expand All @@ -64,7 +70,7 @@ func TestTransformLocalDataProcess(t *testing.T) {
{
"local-data volumes are removed, and custom resources added",
[]*corev1.Pod{buildPod("pod1", testEmptyResources, testEmptyResources, "pvc-1")},
[]*corev1.PersistentVolumeClaim{buildPVC("pvc-1", testLocalClass)},
[]*corev1.PersistentVolumeClaim{buildPVC("pvc-1", storageClassNameLocal)},
[]*corev1.Pod{buildPod("pod1", testLdResources, testLdResources)},
},

Expand All @@ -73,7 +79,7 @@ func TestTransformLocalDataProcess(t *testing.T) {
[]*corev1.Pod{buildPod("pod1", testEmptyResources, testEmptyResources, "pvc-1", "pvc-2", "pvc-3")},
[]*corev1.PersistentVolumeClaim{
buildPVC("pvc-1", testRemoteClass),
buildPVC("pvc-2", testLocalClass),
buildPVC("pvc-2", storageClassNameLocal),
buildPVC("pvc-3", testRemoteClass),
},
[]*corev1.Pod{buildPod("pod1", testLdResources, testLdResources, "pvc-1", "pvc-3")},
Expand All @@ -92,6 +98,36 @@ func TestTransformLocalDataProcess(t *testing.T) {
[]*corev1.PersistentVolumeClaim{},
[]*corev1.Pod{},
},

{
"topolvm provisioner is using proper storage capacity value",
[]*corev1.Pod{buildPod("pod1", testEmptyResources, testEmptyResources, "pvc-1", "pvc-2")},
[]*corev1.PersistentVolumeClaim{
buildPVC("pvc-1", testRemoteClass),
buildPVCWithStorage("pvc-2", storageClassNameTopolvm, "100Gi"),
},
[]*corev1.Pod{buildPod("pod1", testTopolvmResources, testTopolvmResources, "pvc-1")},
},

{
"one pvc will override the other",
[]*corev1.Pod{buildPod("pod1", testEmptyResources, testEmptyResources, "pvc-1", "pvc-2")},
[]*corev1.PersistentVolumeClaim{
buildPVCWithStorage("pvc-1", storageClassNameTopolvm, "100Gi"),
buildPVC("pvc-2", storageClassNameLocal),
},
[]*corev1.Pod{buildPod("pod1", testLdResources, testLdResources)},
},

{
"openebs provisioner is using proper storage capacity value",
[]*corev1.Pod{buildPod("pod1", testEmptyResources, testEmptyResources, "pvc-1", "pvc-2")},
[]*corev1.PersistentVolumeClaim{
buildPVC("pvc-1", testRemoteClass),
buildPVCWithStorage("pvc-2", storageClassNameOpenEBS, "100Gi"),
},
[]*corev1.Pod{buildPod("pod1", testTopolvmResources, testTopolvmResources, "pvc-1")},
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -166,3 +202,11 @@ func buildPVC(name string, storageClassName string) *corev1.PersistentVolumeClai
},
}
}

func buildPVCWithStorage(name, storageClassName, storageQuantity string) *corev1.PersistentVolumeClaim {
pvc := buildPVC(name, storageClassName)
quantity, _ := resource.ParseQuantity(storageQuantity)
pvc.Spec.Resources.Requests = apiv1.ResourceList{}
pvc.Spec.Resources.Requests["storage"] = quantity
return pvc
}

0 comments on commit d943d0a

Please sign in to comment.