From ea94a2b3436e2fc5503a3d64ebe7dff324e4dd85 Mon Sep 17 00:00:00 2001 From: Karol Wychowaniec Date: Mon, 11 Sep 2023 15:46:54 +0000 Subject: [PATCH] Refactor ScaleDownSet processor into a composite processor --- cluster-autoscaler/core/test/common.go | 5 +- ...ocessor.go => scale_down_set_processor.go} | 63 ++++++++++++++++--- cluster-autoscaler/processors/processors.go | 11 +++- 3 files changed, 65 insertions(+), 14 deletions(-) rename cluster-autoscaler/processors/nodes/{post_filtering_processor.go => scale_down_set_processor.go} (51%) diff --git a/cluster-autoscaler/core/test/common.go b/cluster-autoscaler/core/test/common.go index 1c67f4b51112..004619249f99 100644 --- a/cluster-autoscaler/core/test/common.go +++ b/cluster-autoscaler/core/test/common.go @@ -178,7 +178,10 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{}, BinpackingLimiter: binpacking.NewDefaultBinpackingLimiter(), NodeGroupSetProcessor: nodegroupset.NewDefaultNodeGroupSetProcessor([]string{}, config.NodeGroupDifferenceRatios{}), - ScaleDownSetProcessor: nodes.NewPostFilteringScaleDownNodeProcessor(), + ScaleDownSetProcessor: nodes.NewCompositeScaleDownSetProcessor([]nodes.ScaleDownSetProcessor{ + nodes.NewAtomicResizeFilteringProcessor(), + nodes.NewMaxNodesProcessor(), + }), // TODO(bskiba): change scale up test so that this can be a NoOpProcessor ScaleUpStatusProcessor: &status.EventingScaleUpStatusProcessor{}, ScaleDownStatusProcessor: &status.NoOpScaleDownStatusProcessor{}, diff --git a/cluster-autoscaler/processors/nodes/post_filtering_processor.go b/cluster-autoscaler/processors/nodes/scale_down_set_processor.go similarity index 51% rename from cluster-autoscaler/processors/nodes/post_filtering_processor.go rename to cluster-autoscaler/processors/nodes/scale_down_set_processor.go index 06c59d1451ad..579837c48d5c 100644 --- a/cluster-autoscaler/processors/nodes/post_filtering_processor.go +++ b/cluster-autoscaler/processors/nodes/scale_down_set_processor.go @@ -23,32 +23,66 @@ import ( klog "k8s.io/klog/v2" ) -// PostFilteringScaleDownNodeProcessor selects first maxCount nodes (if possible) to be removed -type PostFilteringScaleDownNodeProcessor struct { +// CompositeScaleDownSetProcessor is a ScaleDownSetProcessor composed of multiple sub-processors passed as an argument. +type CompositeScaleDownSetProcessor struct { + orderedProcessorList []ScaleDownSetProcessor +} + +// NewCompositeScaleDownSetProcessor creates new CompositeScaleDownSetProcessor. The order on a list defines order in witch +// sub-processors are invoked. +func NewCompositeScaleDownSetProcessor(orderedProcessorList []ScaleDownSetProcessor) *CompositeScaleDownSetProcessor { + return &CompositeScaleDownSetProcessor{ + orderedProcessorList: orderedProcessorList, + } +} + +// GetNodesToRemove selects nodes to remove. +func (p *CompositeScaleDownSetProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved { + for _, p := range p.orderedProcessorList { + candidates = p.GetNodesToRemove(ctx, candidates, maxCount) + } + return candidates +} + +// CleanUp is called at CA termination +func (p *CompositeScaleDownSetProcessor) CleanUp() { + for _, p := range p.orderedProcessorList { + p.CleanUp() + } +} + +// MaxNodesProcessor selects first maxCount nodes (if possible) to be removed +type MaxNodesProcessor struct { } // GetNodesToRemove selects up to maxCount nodes for deletion, by selecting a first maxCount candidates -func (p *PostFilteringScaleDownNodeProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved { +func (p *MaxNodesProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved { end := len(candidates) if len(candidates) > maxCount { end = maxCount } - return p.filterOutIncompleteAtomicNodeGroups(ctx, candidates[:end]) + return candidates[:end] } // CleanUp is called at CA termination -func (p *PostFilteringScaleDownNodeProcessor) CleanUp() { +func (p *MaxNodesProcessor) CleanUp() { } -// NewPostFilteringScaleDownNodeProcessor returns a new PostFilteringScaleDownNodeProcessor -func NewPostFilteringScaleDownNodeProcessor() *PostFilteringScaleDownNodeProcessor { - return &PostFilteringScaleDownNodeProcessor{} +// NewMaxNodesProcessor returns a new MaxNodesProcessor +func NewMaxNodesProcessor() *MaxNodesProcessor { + return &MaxNodesProcessor{} } -func (p *PostFilteringScaleDownNodeProcessor) filterOutIncompleteAtomicNodeGroups(ctx *context.AutoscalingContext, nodes []simulator.NodeToBeRemoved) []simulator.NodeToBeRemoved { +// AtomicResizeFilteringProcessor removes node groups which should be scaled down as one unit +// if only part of these nodes were scheduled for scale down. +type AtomicResizeFilteringProcessor struct { +} + +// GetNodesToRemove selects up to maxCount nodes for deletion, by selecting a first maxCount candidates +func (p *AtomicResizeFilteringProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved { nodesByGroup := map[cloudprovider.NodeGroup][]simulator.NodeToBeRemoved{} result := []simulator.NodeToBeRemoved{} - for _, node := range nodes { + for _, node := range candidates { nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node.Node) if err != nil { klog.Errorf("Node %v will not scale down, failed to get node info: %s", node.Node.Name, err) @@ -82,3 +116,12 @@ func (p *PostFilteringScaleDownNodeProcessor) filterOutIncompleteAtomicNodeGroup } return result } + +// CleanUp is called at CA termination +func (p *AtomicResizeFilteringProcessor) CleanUp() { +} + +// NewAtomicResizeFilteringProcessor returns a new AtomicResizeFilteringProcessor +func NewAtomicResizeFilteringProcessor() *AtomicResizeFilteringProcessor { + return &AtomicResizeFilteringProcessor{} +} diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go index 638af953cf8e..8d7b949540e7 100644 --- a/cluster-autoscaler/processors/processors.go +++ b/cluster-autoscaler/processors/processors.go @@ -80,9 +80,14 @@ func DefaultProcessors(options config.AutoscalingOptions) *AutoscalingProcessors MaxCapacityMemoryDifferenceRatio: config.DefaultMaxCapacityMemoryDifferenceRatio, MaxFreeDifferenceRatio: config.DefaultMaxFreeDifferenceRatio, }), - ScaleUpStatusProcessor: status.NewDefaultScaleUpStatusProcessor(), - ScaleDownNodeProcessor: nodes.NewPreFilteringScaleDownNodeProcessor(), - ScaleDownSetProcessor: nodes.NewPostFilteringScaleDownNodeProcessor(), + ScaleUpStatusProcessor: status.NewDefaultScaleUpStatusProcessor(), + ScaleDownNodeProcessor: nodes.NewPreFilteringScaleDownNodeProcessor(), + ScaleDownSetProcessor: nodes.NewCompositeScaleDownSetProcessor( + []nodes.ScaleDownSetProcessor{ + nodes.NewMaxNodesProcessor(), + nodes.NewAtomicResizeFilteringProcessor(), + }, + ), ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(), AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(), NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),