Skip to content

Commit

Permalink
Refactor ScaleDownSet processor into a composite processor
Browse files Browse the repository at this point in the history
  • Loading branch information
kawych committed Sep 11, 2023
1 parent 3852f35 commit 5407ba1
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 13 deletions.
7 changes: 6 additions & 1 deletion cluster-autoscaler/core/test/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,12 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal
NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{},
BinpackingLimiter: binpacking.NewDefaultBinpackingLimiter(),
NodeGroupSetProcessor: nodegroupset.NewDefaultNodeGroupSetProcessor([]string{}, config.NodeGroupDifferenceRatios{}),
ScaleDownSetProcessor: nodes.NewPostFilteringScaleDownNodeProcessor(),
ScaleDownSetProcessor: nodes.NewCompositeScaleDownSetProcessor(
[]nodes.ScaleDownSetProcessor{
nodes.NewMaxNodesProcessor(),
nodes.NewAtomicResizeFilteringProcessor(),
},
),
// TODO(bskiba): change scale up test so that this can be a NoOpProcessor
ScaleUpStatusProcessor: &status.EventingScaleUpStatusProcessor{},
ScaleDownStatusProcessor: &status.NoOpScaleDownStatusProcessor{},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,64 @@ import (
klog "k8s.io/klog/v2"
)

// CompositeScaleDownSetProcessor is a ScaleDownSetProcessor composed of multiple sub-processors passed as an argument.
type CompositeScaleDownSetProcessor struct {
orderedProcessorList []ScaleDownSetProcessor
}

// NewCompositeScaleDownSetProcessor creates new CompositeScaleDownSetProcessor. The order on a list defines order in witch
// sub-processors are invoked.
func NewCompositeScaleDownSetProcessor(orderedProcessorList []ScaleDownSetProcessor) *CompositeScaleDownSetProcessor {
return &CompositeScaleDownSetProcessor{
orderedProcessorList: orderedProcessorList,
}
}

// GetNodesToRemove selects nodes to remove.
func (p *CompositeScaleDownSetProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved {
for _, p := range p.orderedProcessorList {
candidates = p.GetNodesToRemove(ctx, candidates, maxCount)
}
return candidates
}

func (p *CompositeScaleDownSetProcessor) CleanUp() {
for _, p := range p.orderedProcessorList {
p.CleanUp()
}
}

// PostFilteringScaleDownNodeProcessor selects first maxCount nodes (if possible) to be removed
type PostFilteringScaleDownNodeProcessor struct {
type MaxNodesProcessor struct {
}

// GetNodesToRemove selects up to maxCount nodes for deletion, by selecting a first maxCount candidates
func (p *PostFilteringScaleDownNodeProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved {
func (p *MaxNodesProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved {
end := len(candidates)
if len(candidates) > maxCount {
end = maxCount
}
return p.filterOutIncompleteAtomicNodeGroups(ctx, candidates[:end])
return candidates[:end]
}

// CleanUp is called at CA termination
func (p *PostFilteringScaleDownNodeProcessor) CleanUp() {
func (p *MaxNodesProcessor) CleanUp() {
}

// NewMaxNodesProcessor returns a new PostFilteringScaleDownNodeProcessor
func NewMaxNodesProcessor() *MaxNodesProcessor {
return &MaxNodesProcessor{}
}

// NewPostFilteringScaleDownNodeProcessor returns a new PostFilteringScaleDownNodeProcessor
func NewPostFilteringScaleDownNodeProcessor() *PostFilteringScaleDownNodeProcessor {
return &PostFilteringScaleDownNodeProcessor{}
// AtomicResizeFilteringProcessor selects first maxCount nodes (if possible) to be removed
type AtomicResizeFilteringProcessor struct {
}

func (p *PostFilteringScaleDownNodeProcessor) filterOutIncompleteAtomicNodeGroups(ctx *context.AutoscalingContext, nodes []simulator.NodeToBeRemoved) []simulator.NodeToBeRemoved {
// GetNodesToRemove selects up to maxCount nodes for deletion, by selecting a first maxCount candidates
func (p *AtomicResizeFilteringProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved {
nodesByGroup := map[cloudprovider.NodeGroup][]simulator.NodeToBeRemoved{}
result := []simulator.NodeToBeRemoved{}
for _, node := range nodes {
for _, node := range candidates {
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node.Node)
if err != nil {
klog.Errorf("Node %v will not scale down, failed to get node info: %s", node.Node.Name, err)
Expand Down Expand Up @@ -82,3 +114,12 @@ func (p *PostFilteringScaleDownNodeProcessor) filterOutIncompleteAtomicNodeGroup
}
return result
}

// CleanUp is called at CA termination
func (p *AtomicResizeFilteringProcessor) CleanUp() {
}

// NewAtomicResizeFilteringProcessor returns a new PostFilteringScaleDownNodeProcessor
func NewAtomicResizeFilteringProcessor() *AtomicResizeFilteringProcessor {
return &AtomicResizeFilteringProcessor{}
}
11 changes: 8 additions & 3 deletions cluster-autoscaler/processors/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,14 @@ func DefaultProcessors(options config.AutoscalingOptions) *AutoscalingProcessors
MaxCapacityMemoryDifferenceRatio: config.DefaultMaxCapacityMemoryDifferenceRatio,
MaxFreeDifferenceRatio: config.DefaultMaxFreeDifferenceRatio,
}),
ScaleUpStatusProcessor: status.NewDefaultScaleUpStatusProcessor(),
ScaleDownNodeProcessor: nodes.NewPreFilteringScaleDownNodeProcessor(),
ScaleDownSetProcessor: nodes.NewPostFilteringScaleDownNodeProcessor(),
ScaleUpStatusProcessor: status.NewDefaultScaleUpStatusProcessor(),
ScaleDownNodeProcessor: nodes.NewPreFilteringScaleDownNodeProcessor(),
ScaleDownSetProcessor: nodes.NewCompositeScaleDownSetProcessor(
[]nodes.ScaleDownSetProcessor{
nodes.NewMaxNodesProcessor(),
nodes.NewAtomicResizeFilteringProcessor(),
},
),
ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(),
AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(),
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
Expand Down

0 comments on commit 5407ba1

Please sign in to comment.