Skip to content

Commit

Permalink
Merge pull request #123 from DataDog/cherry-pick-expander-changes
Browse files Browse the repository at this point in the history
Cherry pick expander changes
  • Loading branch information
rrangith authored Dec 13, 2024
2 parents 6e35d9c + a354562 commit d565386
Show file tree
Hide file tree
Showing 9 changed files with 316 additions and 186 deletions.
1 change: 1 addition & 0 deletions cluster-autoscaler/FAQ.md
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,7 @@ The following startup parameters are supported for cluster autoscaler:
| `balance-similar-node-groups` | Detect similar node groups and balance the number of nodes between them | false
| `balancing-ignore-label` | Define a node label that should be ignored when considering node group similarity. One label per flag occurrence. | ""
| `balancing-label` | Define a node label to use when comparing node group similarity. If set, all other comparison logic is disabled, and only labels are considered when comparing groups. One label per flag occurrence. | ""
| `skip-similar-node-group-recomputation` | Should CA skip similar NodeGroup recomputation for the best option returned by the expander during scaleups. You must enable `balance-similar-node-groups` for this to work. | false
| `node-autoprovisioning-enabled` | Should CA autoprovision node groups when needed | false
| `max-autoprovisioned-node-group-count` | The maximum number of autoprovisioned groups in the cluster | 15
| `unremovable-node-recheck-timeout` | The timeout before we check again a node that couldn't be removed before | 5 minutes
Expand Down
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ type AutoscalingOptions struct {
GpuTotal []GpuLimits
// NodeGroupAutoDiscovery represents one or more definition(s) of node group auto-discovery
NodeGroupAutoDiscovery []string
// SkipSimilarNodeGroupRecomputation skips similar NodeGroup recomputation for the best option returned by the expander during scaleups
SkipSimilarNodeGroupRecomputation bool
// EstimatorName is the estimator used to estimate the number of needed nodes in scale up.
EstimatorName string
// ExpanderNames sets the chain of node group expanders to be used in scale up
Expand Down
13 changes: 8 additions & 5 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
}
}

scaleUpInfos, aErr := o.balanceScaleUps(now, bestOption.NodeGroup, newNodes, nodeInfos, schedulablePodGroups)
scaleUpInfos, aErr := o.balanceScaleUps(now, bestOption, newNodes, nodeInfos, schedulablePodGroups)
if aErr != nil {
return status.UpdateScaleUpError(
&status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods},
Expand Down Expand Up @@ -680,13 +680,16 @@ func (o *ScaleUpOrchestrator) GetCappedNewNodeCount(newNodeCount, currentNodeCou

func (o *ScaleUpOrchestrator) balanceScaleUps(
now time.Time,
nodeGroup cloudprovider.NodeGroup,
bestOption *expander.Option,
newNodes int,
nodeInfos map[string]*schedulerframework.NodeInfo,
schedulablePodGroups map[string][]estimator.PodEquivalenceGroup,
) ([]nodegroupset.ScaleUpInfo, errors.AutoscalerError) {
// Recompute similar node groups in case they need to be updated
similarNodeGroups := o.ComputeSimilarNodeGroups(nodeGroup, nodeInfos, schedulablePodGroups, now)
similarNodeGroups := bestOption.SimilarNodeGroups
if !o.autoscalingContext.SkipSimilarNodeGroupRecomputation {
// Recompute similar node groups in case they need to be updated
similarNodeGroups = o.ComputeSimilarNodeGroups(bestOption.NodeGroup, nodeInfos, schedulablePodGroups, now)
}
if similarNodeGroups != nil {
// if similar node groups are found, log about them
similarNodeGroupIds := make([]string, 0)
Expand All @@ -699,7 +702,7 @@ func (o *ScaleUpOrchestrator) balanceScaleUps(
klog.V(2).Info("No similar node groups found")
}

targetNodeGroups := []cloudprovider.NodeGroup{nodeGroup}
targetNodeGroups := []cloudprovider.NodeGroup{bestOption.NodeGroup}
for _, ng := range similarNodeGroups {
targetNodeGroups = append(targetNodeGroups, ng)
}
Expand Down
40 changes: 35 additions & 5 deletions cluster-autoscaler/expander/grpcplugin/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -111,11 +112,20 @@ func populateOptionsForGRPC(expansionOptions []expander.Option) ([]*protos.Optio
nodeGroupIDOptionMap := make(map[string]expander.Option)
for _, option := range expansionOptions {
nodeGroupIDOptionMap[option.NodeGroup.Id()] = option
grpcOptionsSlice = append(grpcOptionsSlice, newOptionMessage(option.NodeGroup.Id(), int32(option.NodeCount), option.Debug, option.Pods))
similarNodeGroupIds := getSimilarNodeGroupIds(option)
grpcOptionsSlice = append(grpcOptionsSlice, newOptionMessage(option.NodeGroup.Id(), int32(option.NodeCount), option.Debug, option.Pods, similarNodeGroupIds))
}
return grpcOptionsSlice, nodeGroupIDOptionMap
}

// getSimilarNodeGroupIds collects the IDs of the option's similar node groups,
// in the same order as option.SimilarNodeGroups. It returns nil when the option
// has no similar node groups.
func getSimilarNodeGroupIds(option expander.Option) []string {
	groups := option.SimilarNodeGroups
	if len(groups) == 0 {
		// Keep the nil result for the empty case rather than an allocated empty slice.
		return nil
	}
	ids := make([]string, 0, len(groups))
	for i := range groups {
		ids = append(ids, groups[i].Id())
	}
	return ids
}

// populateNodeInfoForGRPC looks at the corresponding v1.Node object per NodeInfo object, and populates the grpcNodeInfoMap with these to pass over grpc
func populateNodeInfoForGRPC(nodeInfos map[string]*schedulerframework.NodeInfo) map[string]*v1.Node {
grpcNodeInfoMap := make(map[string]*v1.Node)
Expand All @@ -132,8 +142,9 @@ func transformAndSanitizeOptionsFromGRPC(bestOptionsResponseOptions []*protos.Op
klog.Error("GRPC server returned nil Option")
continue
}
if _, ok := nodeGroupIDOptionMap[option.NodeGroupId]; ok {
options = append(options, nodeGroupIDOptionMap[option.NodeGroupId])
if expanderOption, ok := nodeGroupIDOptionMap[option.NodeGroupId]; ok {
expanderOption.SimilarNodeGroups = getRetainedSimilarNodegroups(option, expanderOption)
options = append(options, expanderOption)
} else {
klog.Errorf("GRPC server returned invalid nodeGroup ID: %s", option.NodeGroupId)
continue
Expand All @@ -142,6 +153,25 @@ func transformAndSanitizeOptionsFromGRPC(bestOptionsResponseOptions []*protos.Op
return options
}

func newOptionMessage(nodeGroupId string, nodeCount int32, debug string, pods []*v1.Pod) *protos.Option {
return &protos.Option{NodeGroupId: nodeGroupId, NodeCount: nodeCount, Debug: debug, Pod: pods}
// getRetainedSimilarNodegroups returns the similar node groups of expanderOption
// whose IDs are also listed in grpcOption.SimilarNodeGroupIds, preserving the
// order of expanderOption.SimilarNodeGroups.
//
// Any similar node group IDs that were not included in the original grpcExpander
// request, but were added as part of the response, are ignored. The result is
// nil when no similar node group is retained.
func getRetainedSimilarNodegroups(grpcOption *protos.Option, expanderOption expander.Option) []cloudprovider.NodeGroup {
	var retainedSimilarNodeGroups []cloudprovider.NodeGroup
	for _, sng := range expanderOption.SimilarNodeGroups {
		// Resolve the ID once per node group instead of once per comparison.
		sngID := sng.Id()
		for _, id := range grpcOption.SimilarNodeGroupIds {
			if sngID == id {
				retainedSimilarNodeGroups = append(retainedSimilarNodeGroups, sng)
				// Stop at the first match: each node group is retained at most
				// once and there is no need to scan the remaining IDs.
				break
			}
		}
	}
	return retainedSimilarNodeGroups
}

// newOptionMessage builds the protos.Option message for a single expansion
// option from the given node group ID, node count, debug string, pods and
// similar node group IDs.
func newOptionMessage(nodeGroupId string, nodeCount int32, debug string, pods []*v1.Pod, similarNodeGroupIds []string) *protos.Option {
	msg := &protos.Option{
		NodeGroupId:         nodeGroupId,
		NodeCount:           nodeCount,
		Debug:               debug,
		Pod:                 pods,
		SimilarNodeGroupIds: similarNodeGroupIds,
	}
	return msg
}
74 changes: 63 additions & 11 deletions cluster-autoscaler/expander/grpcplugin/grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos"
"k8s.io/autoscaler/cluster-autoscaler/expander/mocks"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
Expand Down Expand Up @@ -58,6 +59,11 @@ var (
Debug: "m4.4xlarge",
NodeGroup: test.NewTestNodeGroup("my-asg.m4.4xlarge", 10, 1, 1, true, false, "m4.4xlarge", nil, nil),
}
eoT2MicroWithSimilar = expander.Option{
Debug: "t2.micro",
NodeGroup: test.NewTestNodeGroup("my-asg.t2.micro", 10, 1, 1, true, false, "t2.micro", nil, nil),
SimilarNodeGroups: []cloudprovider.NodeGroup{test.NewTestNodeGroup("my-similar-asg.t2.micro", 10, 1, 1, true, false, "t2.micro", nil, nil)},
}
options = []expander.Option{eoT2Micro, eoT2Large, eoT3Large, eoM44XLarge}

grpcEoT2Micro = protos.Option{
Expand All @@ -84,6 +90,20 @@ var (
Debug: eoM44XLarge.Debug,
Pod: eoM44XLarge.Pods,
}
grpcEoT2MicroWithSimilar = protos.Option{
NodeGroupId: eoT2Micro.NodeGroup.Id(),
NodeCount: int32(eoT2Micro.NodeCount),
Debug: eoT2Micro.Debug,
Pod: eoT2Micro.Pods,
SimilarNodeGroupIds: []string{eoT2MicroWithSimilar.SimilarNodeGroups[0].Id()},
}
grpcEoT2MicroWithSimilarWithExtraOptions = protos.Option{
NodeGroupId: eoT2Micro.NodeGroup.Id(),
NodeCount: int32(eoT2Micro.NodeCount),
Debug: eoT2Micro.Debug,
Pod: eoT2Micro.Pods,
SimilarNodeGroupIds: []string{eoT2MicroWithSimilar.SimilarNodeGroups[0].Id(), "extra-ng-id"},
}
)

func TestPopulateOptionsForGrpc(t *testing.T) {
Expand Down Expand Up @@ -116,6 +136,12 @@ func TestPopulateOptionsForGrpc(t *testing.T) {
eoM44XLarge.NodeGroup.Id(): eoM44XLarge,
},
},
{
desc: "similar nodegroups are included",
opts: []expander.Option{eoT2MicroWithSimilar},
expectedOpts: []*protos.Option{&grpcEoT2MicroWithSimilar},
expectedMap: map[string]expander.Option{eoT2MicroWithSimilar.NodeGroup.Id(): eoT2MicroWithSimilar},
},
}
for _, tc := range testCases {
grpcOptionsSlice, nodeGroupIDOptionMap := populateOptionsForGRPC(tc.opts)
Expand Down Expand Up @@ -146,18 +172,44 @@ func TestPopulateNodeInfoForGRPC(t *testing.T) {
}

func TestValidTransformAndSanitizeOptionsFromGRPC(t *testing.T) {
responseOptionsSlice := []*protos.Option{&grpcEoT2Micro, &grpcEoT3Large, &grpcEoM44XLarge}
nodeGroupIDOptionMap := map[string]expander.Option{
eoT2Micro.NodeGroup.Id(): eoT2Micro,
eoT2Large.NodeGroup.Id(): eoT2Large,
eoT3Large.NodeGroup.Id(): eoT3Large,
eoM44XLarge.NodeGroup.Id(): eoM44XLarge,
testCases := []struct {
desc string
responseOptions []*protos.Option
expectedOptions []expander.Option
nodegroupIDOptionaMap map[string]expander.Option
}{
{
desc: "valid transform and sanitize options",
responseOptions: []*protos.Option{&grpcEoT2Micro, &grpcEoT3Large, &grpcEoM44XLarge},
nodegroupIDOptionaMap: map[string]expander.Option{
eoT2Micro.NodeGroup.Id(): eoT2Micro,
eoT2Large.NodeGroup.Id(): eoT2Large,
eoT3Large.NodeGroup.Id(): eoT3Large,
eoM44XLarge.NodeGroup.Id(): eoM44XLarge,
},
expectedOptions: []expander.Option{eoT2Micro, eoT3Large, eoM44XLarge},
},
{
desc: "similar ngs are retained in proto options are retained",
responseOptions: []*protos.Option{&grpcEoT2MicroWithSimilar},
nodegroupIDOptionaMap: map[string]expander.Option{
eoT2MicroWithSimilar.NodeGroup.Id(): eoT2MicroWithSimilar,
},
expectedOptions: []expander.Option{eoT2MicroWithSimilar},
},
{
desc: "extra similar ngs added to expander response are ignored",
responseOptions: []*protos.Option{&grpcEoT2MicroWithSimilarWithExtraOptions},
nodegroupIDOptionaMap: map[string]expander.Option{
eoT2MicroWithSimilar.NodeGroup.Id(): eoT2MicroWithSimilar,
},
expectedOptions: []expander.Option{eoT2MicroWithSimilar},
},
}
for _, tc := range testCases {
ret := transformAndSanitizeOptionsFromGRPC(tc.responseOptions, tc.nodegroupIDOptionaMap)
assert.Equal(t, tc.expectedOptions, ret)
}

expectedOptions := []expander.Option{eoT2Micro, eoT3Large, eoM44XLarge}

ret := transformAndSanitizeOptionsFromGRPC(responseOptionsSlice, nodeGroupIDOptionMap)
assert.Equal(t, expectedOptions, ret)
}

func TestAnInvalidTransformAndSanitizeOptionsFromGRPC(t *testing.T) {
Expand Down
Loading

0 comments on commit d565386

Please sign in to comment.