From 5407ba15baab219a7eac213aaa97fe6ff991efae Mon Sep 17 00:00:00 2001 From: Karol Wychowaniec Date: Mon, 11 Sep 2023 15:46:54 +0000 Subject: [PATCH] Refactor ScaleDownSet processor into a composite processor --- cluster-autoscaler/core/test/common.go | 7 ++- ...ocessor.go => scale_down_set_processor.go} | 59 ++++++++++++++++--- cluster-autoscaler/processors/processors.go | 11 +++- 3 files changed, 64 insertions(+), 13 deletions(-) rename cluster-autoscaler/processors/nodes/{post_filtering_processor.go => scale_down_set_processor.go} (54%) diff --git a/cluster-autoscaler/core/test/common.go b/cluster-autoscaler/core/test/common.go index 1c67f4b51112..dfb970654ea6 100644 --- a/cluster-autoscaler/core/test/common.go +++ b/cluster-autoscaler/core/test/common.go @@ -178,7 +178,12 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{}, BinpackingLimiter: binpacking.NewDefaultBinpackingLimiter(), NodeGroupSetProcessor: nodegroupset.NewDefaultNodeGroupSetProcessor([]string{}, config.NodeGroupDifferenceRatios{}), - ScaleDownSetProcessor: nodes.NewPostFilteringScaleDownNodeProcessor(), + ScaleDownSetProcessor: nodes.NewCompositeScaleDownSetProcessor( + []nodes.ScaleDownSetProcessor{ + nodes.NewMaxNodesProcessor(), + nodes.NewAtomicResizeFilteringProcessor(), + }, + ), // TODO(bskiba): change scale up test so that this can be a NoOpProcessor ScaleUpStatusProcessor: &status.EventingScaleUpStatusProcessor{}, ScaleDownStatusProcessor: &status.NoOpScaleDownStatusProcessor{}, diff --git a/cluster-autoscaler/processors/nodes/post_filtering_processor.go b/cluster-autoscaler/processors/nodes/scale_down_set_processor.go similarity index 54% rename from cluster-autoscaler/processors/nodes/post_filtering_processor.go rename to cluster-autoscaler/processors/nodes/scale_down_set_processor.go index 06c59d1451ad..87c86002e874 100644 --- a/cluster-autoscaler/processors/nodes/post_filtering_processor.go +++ b/cluster-autoscaler/processors/nodes/scale_down_set_processor.go @@ -23,32 +23,64 @@ import ( klog "k8s.io/klog/v2" ) +// CompositeScaleDownSetProcessor is a ScaleDownSetProcessor composed of multiple sub-processors passed as an argument. +type CompositeScaleDownSetProcessor struct { + orderedProcessorList []ScaleDownSetProcessor +} + +// NewCompositeScaleDownSetProcessor creates new CompositeScaleDownSetProcessor. The order on a list defines order in witch +// sub-processors are invoked. +func NewCompositeScaleDownSetProcessor(orderedProcessorList []ScaleDownSetProcessor) *CompositeScaleDownSetProcessor { + return &CompositeScaleDownSetProcessor{ + orderedProcessorList: orderedProcessorList, + } +} + +// GetNodesToRemove selects nodes to remove. +func (p *CompositeScaleDownSetProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved { + for _, p := range p.orderedProcessorList { + candidates = p.GetNodesToRemove(ctx, candidates, maxCount) + } + return candidates +} + +func (p *CompositeScaleDownSetProcessor) CleanUp() { + for _, p := range p.orderedProcessorList { + p.CleanUp() + } +} + // PostFilteringScaleDownNodeProcessor selects first maxCount nodes (if possible) to be removed -type PostFilteringScaleDownNodeProcessor struct { +type MaxNodesProcessor struct { } // GetNodesToRemove selects up to maxCount nodes for deletion, by selecting a first maxCount candidates -func (p *PostFilteringScaleDownNodeProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved { +func (p *MaxNodesProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved { end := len(candidates) if len(candidates) > maxCount { end = maxCount } - return p.filterOutIncompleteAtomicNodeGroups(ctx, candidates[:end]) + return candidates[:end] } // CleanUp is called at CA termination -func (p *PostFilteringScaleDownNodeProcessor) CleanUp() { +func (p *MaxNodesProcessor) CleanUp() { +} + +// NewMaxNodesProcessor returns a new PostFilteringScaleDownNodeProcessor +func NewMaxNodesProcessor() *MaxNodesProcessor { + return &MaxNodesProcessor{} } -// NewPostFilteringScaleDownNodeProcessor returns a new PostFilteringScaleDownNodeProcessor -func NewPostFilteringScaleDownNodeProcessor() *PostFilteringScaleDownNodeProcessor { - return &PostFilteringScaleDownNodeProcessor{} +// AtomicResizeFilteringProcessor selects first maxCount nodes (if possible) to be removed +type AtomicResizeFilteringProcessor struct { } -func (p *PostFilteringScaleDownNodeProcessor) filterOutIncompleteAtomicNodeGroups(ctx *context.AutoscalingContext, nodes []simulator.NodeToBeRemoved) []simulator.NodeToBeRemoved { +// GetNodesToRemove selects up to maxCount nodes for deletion, by selecting a first maxCount candidates +func (p *AtomicResizeFilteringProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved { nodesByGroup := map[cloudprovider.NodeGroup][]simulator.NodeToBeRemoved{} result := []simulator.NodeToBeRemoved{} - for _, node := range nodes { + for _, node := range candidates { nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node.Node) if err != nil { klog.Errorf("Node %v will not scale down, failed to get node info: %s", node.Node.Name, err) @@ -82,3 +114,12 @@ func (p *PostFilteringScaleDownNodeProcessor) filterOutIncompleteAtomicNodeGroup } return result } + +// CleanUp is called at CA termination +func (p *AtomicResizeFilteringProcessor) CleanUp() { +} + +// NewAtomicResizeFilteringProcessor returns a new PostFilteringScaleDownNodeProcessor +func NewAtomicResizeFilteringProcessor() *AtomicResizeFilteringProcessor { + return &AtomicResizeFilteringProcessor{} +} diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go index 638af953cf8e..8d7b949540e7 100644 --- a/cluster-autoscaler/processors/processors.go +++ b/cluster-autoscaler/processors/processors.go @@ -80,9 +80,14 @@ func DefaultProcessors(options config.AutoscalingOptions) *AutoscalingProcessors MaxCapacityMemoryDifferenceRatio: config.DefaultMaxCapacityMemoryDifferenceRatio, MaxFreeDifferenceRatio: config.DefaultMaxFreeDifferenceRatio, }), - ScaleUpStatusProcessor: status.NewDefaultScaleUpStatusProcessor(), - ScaleDownNodeProcessor: nodes.NewPreFilteringScaleDownNodeProcessor(), - ScaleDownSetProcessor: nodes.NewPostFilteringScaleDownNodeProcessor(), + ScaleUpStatusProcessor: status.NewDefaultScaleUpStatusProcessor(), + ScaleDownNodeProcessor: nodes.NewPreFilteringScaleDownNodeProcessor(), + ScaleDownSetProcessor: nodes.NewCompositeScaleDownSetProcessor( + []nodes.ScaleDownSetProcessor{ + nodes.NewMaxNodesProcessor(), + nodes.NewAtomicResizeFilteringProcessor(), + }, + ), ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(), AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(), NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),