feat: support --scale-down-delay-after-* per nodegroup
Signed-off-by: vadasambar <[email protected]>

feat: update scale down status after every scale up
- move scale down delay status to cluster state/registry
- enable scale down if `ScaleDownDelayTypeLocal` is enabled
- add new funcs on cluster state to get and update scale down delay status
- use timestamp instead of booleans to track scale down delay status
Signed-off-by: vadasambar <[email protected]>

refactor: use existing fields on clusterstate
- uses `scaleUpRequests`, `scaleDownRequests` and `scaleUpFailures` instead of `ScaleUpDelayStatus`
- changed the above existing fields a little to make them more convenient for use
- moved initializing scale down delay processor to static autoscaler (because clusterstate is not available in main.go)
Signed-off-by: vadasambar <[email protected]>

refactor: remove note saying only `scale-down-after-add` is supported
- because we are supporting all the flags
Signed-off-by: vadasambar <[email protected]>

fix: evaluate `scaleDownInCooldown` the old way only if `ScaleDownDelayTypeLocal` is set to `false`
Signed-off-by: vadasambar <[email protected]>
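
For context, this is roughly the shape of the cooldown decision described above (a minimal sketch: the --scale-down-delay-after-* flags exist, while the struct and field names here are assumptions made for illustration, not the upstream code):

package sketch

import "time"

// cooldownOptions mirrors the relevant autoscaling options. ScaleDownDelayTypeLocal
// is the flag added by this commit; the other fields correspond to the existing
// --scale-down-delay-after-* flags.
type cooldownOptions struct {
	ScaleDownDelayTypeLocal    bool
	ScaleDownDelayAfterAdd     time.Duration
	ScaleDownDelayAfterDelete  time.Duration
	ScaleDownDelayAfterFailure time.Duration
}

// inGlobalCooldown is the "old way": cluster-wide timestamps gate scale down.
// With ScaleDownDelayTypeLocal enabled this check is skipped, and the
// per-nodegroup candidates processor enforces the delays instead.
func inGlobalCooldown(opts cooldownOptions, lastScaleUp, lastScaleDownDelete, lastScaleDownFail, now time.Time) bool {
	if opts.ScaleDownDelayTypeLocal {
		return false // delays are evaluated per nodegroup elsewhere
	}
	return lastScaleUp.Add(opts.ScaleDownDelayAfterAdd).After(now) ||
		lastScaleDownDelete.Add(opts.ScaleDownDelayAfterDelete).After(now) ||
		lastScaleDownFail.Add(opts.ScaleDownDelayAfterFailure).After(now)
}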

refactor: remove line saying `--scale-down-delay-type-local` is only supported for `--scale-down-delay-after-add`
- because it is not true anymore
- we are supporting all `--scale-down-delay-after-*` flags per nodegroup
Signed-off-by: vadasambar <[email protected]>

test: fix clusterstate tests failing
Signed-off-by: vadasambar <[email protected]>

refactor: move processor initialization logic back from static autoscaler to main
- we don't want to initialize processors in static autoscaler because anyone implementing an alternative to static_autoscaler would then have to initialize the processors themselves
- initializing specific processors also makes static autoscaler aware of an implementation detail, which might not be the best practice
Signed-off-by: vadasambar <[email protected]>

refactor: revert changes related to `clusterstate`
- since I am going with observer pattern
Signed-off-by: vadasambar <[email protected]>

feat: add observer interface for state of scaling
- to implement the observer pattern for tracking the state of scale-ups/downs (as opposed to using clusterstate for the same)
- refactor `ScaleDownCandidatesDelayProcessor` to use fields from the new observer
Signed-off-by: vadasambar <[email protected]>
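
Judging from the method signatures that appear later in this diff (clusterstate's RegisterScaleUp, RegisterScaleDown, RegisterFailedScaleUp and RegisterFailedScaleDown), the observer interface presumably looks roughly like the sketch below; the comments and exact wording are editorial, not taken from the upstream file:

package nodegroupchange

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

// NodeGroupChangeObserver is notified about scale-up and scale-down events so
// that per-nodegroup scale-down delays can be tracked without reaching into
// clusterstate (signatures inferred from the calls shown in this diff).
type NodeGroupChangeObserver interface {
	// RegisterScaleUp records a scale-up of delta nodes (or an update to a pending request).
	RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time)
	// RegisterScaleDown records the deletion of a single node from the node group.
	RegisterScaleDown(nodeGroup cloudprovider.NodeGroup, nodeName string, currentTime time.Time, expectedDeleteTime time.Time)
	// RegisterFailedScaleUp records a failed scale-up attempt.
	RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason string, errorMessage, gpuResourceName, gpuType string, currentTime time.Time)
	// RegisterFailedScaleDown records a failed scale-down attempt.
	RegisterFailedScaleDown(nodeGroup cloudprovider.NodeGroup, reason string, currentTime time.Time)
}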

refactor: remove params passed to `clearScaleUpFailures`
- not needed anymore
Signed-off-by: vadasambar <[email protected]>

refactor: revert clusterstate tests
- approach has changed
- I am not making any changes in clusterstate now
Signed-off-by: vadasambar <[email protected]>

refactor: add accidentally deleted lines for clusterstate test
Signed-off-by: vadasambar <[email protected]>

feat: implement `Add` fn for scale state observer
- to easily add new observers
- re-word comments
- remove redundant params from `NewDefaultScaleDownCandidatesProcessor`
Signed-off-by: vadasambar <[email protected]>

fix: CI complaining about missing comments on fn definitions
Signed-off-by: vadasambar <[email protected]>

feat: initialize parent `ScaleDownCandidatesProcessor`
- instead of `ScaleDownCandidatesSortingProcessor` and `ScaleDownCandidatesDelayProcessor` separately
Signed-off-by: vadasambar <[email protected]>

refactor: add scale state notifier to list of default processors
- initialize processors for `NewDefaultScaleDownCandidatesProcessor` outside and pass them to the fn
- this allows more flexibility
Signed-off-by: vadasambar <[email protected]>
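
A sketch of the composition described here: a parent candidates processor that runs its sub-processors (e.g. the delay and sorting processors) in sequence, each one narrowing the candidate list further. The interface and type names below are simplified assumptions rather than the exact upstream definitions:

package sketch

import apiv1 "k8s.io/api/core/v1"

// candidatesProcessor is a simplified stand-in for the autoscaler's
// scale-down candidates processor interface.
type candidatesProcessor interface {
	GetScaleDownCandidates(nodes []*apiv1.Node) []*apiv1.Node
}

// combinedCandidatesProcessor chains sub-processors; the output of one becomes
// the input of the next, so any of them can veto a candidate.
type combinedCandidatesProcessor struct {
	processors []candidatesProcessor
}

func (p *combinedCandidatesProcessor) GetScaleDownCandidates(nodes []*apiv1.Node) []*apiv1.Node {
	candidates := nodes
	for _, sub := range p.processors {
		candidates = sub.GetScaleDownCandidates(candidates)
	}
	return candidates
}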

refactor: add observer interface
- create a separate observer directory
- implement `RegisterScaleUp` function in the clusterstate
- TODO: resolve syntax errors
Signed-off-by: vadasambar <[email protected]>

feat: use `scaleStateNotifier` in place of `clusterstate`
- delete leftover `scale_stateA_observer.go` (new one is already present in `observers` directory)
- register `clusterstate` with `scaleStateNotifier`
- use `Register` instead of `Add` function in `scaleStateNotifier`
- fix `go build`
- wip: fixing tests
Signed-off-by: vadasambar <[email protected]>

test: fix syntax errors
- add utils package `pointers` for converting `time` to pointer (without having to initialize a new variable)
Signed-off-by: vadasambar <[email protected]>

feat: wip track scale down failures along with scale up failures
- I was tracking scale up failures but not scale down failures
- fix copyright year 2017 -> 2023 for the new `pointers` package
Signed-off-by: vadasambar <[email protected]>

feat: register failed scale down with scale state notifier
- wip writing tests for `scale_down_candidates_delay_processor`
- fix CI lint errors
- remove test file for `scale_down_candidates_processor` (there is not much to test as of now)
Signed-off-by: vadasambar <[email protected]>

test: wip tests for `ScaleDownCandidatesDelayProcessor`
Signed-off-by: vadasambar <[email protected]>

test: add unit tests for `ScaleDownCandidatesDelayProcessor`
Signed-off-by: vadasambar <[email protected]>

refactor: don't track scale up failures in `ScaleDownCandidatesDelayProcessor`
- not needed
Signed-off-by: vadasambar <[email protected]>

test: better doc comments for `TestGetScaleDownCandidates`
Signed-off-by: vadasambar <[email protected]>

refactor: don't ignore error in `NGChangeObserver`
- return it instead and let the caller decide what to do with it
Signed-off-by: vadasambar <[email protected]>

refactor: change pointers to values in `NGChangeObserver` interface
- easier to work with
- remove `expectedAddTime` param from `RegisterScaleUp` (not needed for now)
- add tests for clusterstate's `RegisterScaleUp`
Signed-off-by: vadasambar <[email protected]>

refactor: conditions in `GetScaleDownCandidates`
- set scale down in cooldown if the number of scale down candidates is 0
Signed-off-by: vadasambar <[email protected]>
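
An illustrative sketch of the filtering this processor performs: nodes are dropped from the candidate list while their node group is still inside one of the per-nodegroup delays, and (as noted above) the caller treats an empty candidate list as scale down being in cooldown. Names and details are simplified assumptions, not the exact upstream code:

package sketch

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

// scaleDownCandidatesDelaySketch keeps per-nodegroup timestamps of the last
// scale-up, scale-down and scale-down failure, and drops candidates whose
// node group is still inside one of the configured delays.
type scaleDownCandidatesDelaySketch struct {
	scaleUps   map[string]time.Time
	scaleDowns map[string]time.Time
	failures   map[string]time.Time

	delayAfterAdd     time.Duration
	delayAfterDelete  time.Duration
	delayAfterFailure time.Duration
}

func (p *scaleDownCandidatesDelaySketch) filterCandidates(nodes []*apiv1.Node, groupFor func(*apiv1.Node) cloudprovider.NodeGroup, now time.Time) []*apiv1.Node {
	var candidates []*apiv1.Node
	for _, node := range nodes {
		ng := groupFor(node)
		if ng == nil {
			continue
		}
		id := ng.Id()
		// Skip the node group if any of its delays is still running.
		if recent(p.scaleUps[id], p.delayAfterAdd, now) ||
			recent(p.scaleDowns[id], p.delayAfterDelete, now) ||
			recent(p.failures[id], p.delayAfterFailure, now) {
			continue
		}
		candidates = append(candidates, node)
	}
	return candidates
}

// recent reports whether an event recorded at t still blocks scale down at now.
func recent(t time.Time, delay time.Duration, now time.Time) bool {
	return !t.IsZero() && t.Add(delay).After(now)
}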

test: use `ng1` instead of `ng2` in existing test
Signed-off-by: vadasambar <[email protected]>

feat: wip static autoscaler tests
Signed-off-by: vadasambar <[email protected]>

refactor: assign directly instead of using `sdProcessor` variable
- variable is not needed
Signed-off-by: vadasambar <[email protected]>

test: first working test for static autoscaler
Signed-off-by: vadasambar <[email protected]>

test: continue working on static autoscaler tests
Signed-off-by: vadasambar <[email protected]>

test: wip second static autoscaler test
Signed-off-by: vadasambar <[email protected]>

refactor: remove `Println` used for debugging
Signed-off-by: vadasambar <[email protected]>

test: add static_autoscaler tests for scale down delay per nodegroup flags
Signed-off-by: vadasambar <[email protected]>

chore: rebase off the latest `master`
- change scale state observer interface's `RegisterFailedScaleup` to reflect latest changes around clusterstate's `RegisterFailedScaleup` in `master`
Signed-off-by: vadasambar <[email protected]>

test: fix clusterstate test failing
Signed-off-by: vadasambar <[email protected]>

test: fix failing orchestrator test
Signed-off-by: vadasambar <[email protected]>

refactor: rename `defaultScaleDownCandidatesProcessor` -> `combinedScaleDownCandidatesProcessor`
- describes the processor better
Signed-off-by: vadasambar <[email protected]>

refactor: replace `NGChangeObserver` -> `NodeGroupChangeObserver`
- makes it easier to understand for someone not familiar with the codebase
Signed-off-by: vadasambar <[email protected]>

docs: reword code comment `after` -> `for which`
Signed-off-by: vadasambar <[email protected]>

refactor: don't return error from `RegisterScaleDown`
- not needed as of now (no implementation of this function returns a non-nil error)
Signed-off-by: vadasambar <[email protected]>

refactor: address review comments around ng change observer interface
- change dir structure of nodegroup change observer package
- stop returning errors wherever it is not needed in the nodegroup change observer interface
- rename `NGChangeObserver` -> `NodeGroupChangeObserver` interface (makes it easier to understand)
Signed-off-by: vadasambar <[email protected]>

refactor: make nodegroupchange observer thread-safe
Signed-off-by: vadasambar <[email protected]>
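
A minimal sketch of the thread-safe broadcast list, assuming the observer interface from the earlier sketch in the same nodegroupchange package; the real NodeGroupChangeObserversList in observers/nodegroupchange may differ in detail (only Register and one broadcast method are shown):

package nodegroupchange

import (
	"sync"
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

// NodeGroupChangeObserversList fans notifications out to every registered
// observer. A single mutex guards the observer slice; the commit notes a TODO
// to consider finer-grained locking.
type NodeGroupChangeObserversList struct {
	mutex     sync.Mutex
	observers []NodeGroupChangeObserver
}

// Register adds an observer to the notification list
// (the commit history also mentions a RegisterForNotifications name for this method).
func (l *NodeGroupChangeObserversList) Register(o NodeGroupChangeObserver) {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	l.observers = append(l.observers, o)
}

// RegisterScaleUp broadcasts a scale-up event to all registered observers.
func (l *NodeGroupChangeObserversList) RegisterScaleUp(ng cloudprovider.NodeGroup, delta int, now time.Time) {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	for _, o := range l.observers {
		o.RegisterScaleUp(ng, delta, now)
	}
}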

docs: add TODO to consider using multiple mutexes in nodegroupchange observer
Signed-off-by: vadasambar <[email protected]>

refactor: use `time.Now()` directly instead of assigning a variable to it
Signed-off-by: vadasambar <[email protected]>

refactor: share code for checking if there was a recent scale-up/down/failure
Signed-off-by: vadasambar <[email protected]>

test: convert `ScaleDownCandidatesDelayProcessor` into table tests
Signed-off-by: vadasambar <[email protected]>

refactor: change scale state notifier's `Register()` -> `RegisterForNotifications()`
- makes it easier to understand what the function does
Signed-off-by: vadasambar <[email protected]>

test: replace scale state notifier `Register` -> `RegisterForNotifications` in test
- to fix syntax errors since it is already renamed in the actual code
Signed-off-by: vadasambar <[email protected]>

refactor: remove `clusterStateRegistry` from `delete_in_batch` tests
- not needed anymore since we have `scaleStateNotifier`
Signed-off-by: vadasambar <[email protected]>

refactor: address PR review comments
Signed-off-by: vadasambar <[email protected]>

fix: add empty `RegisterFailedScaleDown` for clusterstate
- fix syntax error in static autoscaler test
Signed-off-by: vadasambar <[email protected]>
vadasambar committed Jan 11, 2024
1 parent cdb24d8 commit 5de49a1
Showing 20 changed files with 798 additions and 90 deletions.
27 changes: 17 additions & 10 deletions cluster-autoscaler/clusterstate/clusterstate.go
@@ -188,13 +188,8 @@ func (csr *ClusterStateRegistry) Stop() {
close(csr.interrupt)
}

// RegisterOrUpdateScaleUp registers scale-up for give node group or changes requested node increase
// count.
// If delta is positive then number of new nodes requested is increased; Time and expectedAddTime
// are reset.
// If delta is negative the number of new nodes requested is decreased; Time and expectedAddTime are
// left intact.
func (csr *ClusterStateRegistry) RegisterOrUpdateScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) {
// RegisterScaleUp registers scale-up for given node group
func (csr *ClusterStateRegistry) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) {
csr.Lock()
defer csr.Unlock()
csr.registerOrUpdateScaleUpNoLock(nodeGroup, delta, currentTime)
@@ -246,7 +241,14 @@ func (csr *ClusterStateRegistry) registerOrUpdateScaleUpNoLock(nodeGroup cloudpr
}

// RegisterScaleDown registers node scale down.
func (csr *ClusterStateRegistry) RegisterScaleDown(request *ScaleDownRequest) {
func (csr *ClusterStateRegistry) RegisterScaleDown(nodeGroup cloudprovider.NodeGroup,
nodeName string, currentTime time.Time, expectedDeleteTime time.Time) {
request := &ScaleDownRequest{
NodeGroup: nodeGroup,
NodeName: nodeName,
Time: currentTime,
ExpectedDeleteTime: expectedDeleteTime,
}
csr.Lock()
defer csr.Unlock()
csr.scaleDownRequests = append(csr.scaleDownRequests, request)
@@ -310,16 +312,21 @@ func (csr *ClusterStateRegistry) backoffNodeGroup(nodeGroup cloudprovider.NodeGr
// RegisterFailedScaleUp should be called after getting error from cloudprovider
// when trying to scale-up node group. It will mark this group as not safe to autoscale
// for some time.
func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason metrics.FailedScaleUpReason, errorMessage, gpuResourceName, gpuType string, currentTime time.Time) {
func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason string, errorMessage, gpuResourceName, gpuType string, currentTime time.Time) {
csr.Lock()
defer csr.Unlock()
csr.registerFailedScaleUpNoLock(nodeGroup, reason, cloudprovider.InstanceErrorInfo{
csr.registerFailedScaleUpNoLock(nodeGroup, metrics.FailedScaleUpReason(reason), cloudprovider.InstanceErrorInfo{
ErrorClass: cloudprovider.OtherErrorClass,
ErrorCode: string(reason),
ErrorMessage: errorMessage,
}, gpuResourceName, gpuType, currentTime)
}

// RegisterFailedScaleDown records failed scale-down for a nodegroup.
// We don't need to implement this function for cluster state registry
func (csr *ClusterStateRegistry) RegisterFailedScaleDown(_ cloudprovider.NodeGroup, _ string, _ time.Time) {
}

func (csr *ClusterStateRegistry) registerFailedScaleUpNoLock(nodeGroup cloudprovider.NodeGroup, reason metrics.FailedScaleUpReason, errorInfo cloudprovider.InstanceErrorInfo, gpuResourceName, gpuType string, currentTime time.Time) {
csr.scaleUpFailures[nodeGroup.Id()] = append(csr.scaleUpFailures[nodeGroup.Id()], ScaleUpFailure{NodeGroup: nodeGroup, Reason: reason, Time: currentTime})
metrics.RegisterFailedScaleUp(reason, gpuResourceName, gpuType)
41 changes: 19 additions & 22 deletions cluster-autoscaler/clusterstate/clusterstate_test.go
@@ -31,6 +31,7 @@ import (
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/api"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"

"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/client-go/kubernetes/fake"
@@ -75,7 +76,7 @@ func TestOKWithScaleUp(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: time.Minute}))
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now())
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now())
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@@ -122,7 +123,7 @@ func TestEmptyOK(t *testing.T) {
assert.False(t, clusterstate.HasNodeGroupStartedScaleUp("ng1"))

provider.AddNodeGroup("ng1", 0, 10, 3)
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 3, now.Add(-3*time.Second))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 3, now.Add(-3*time.Second))
// clusterstate.scaleUpRequests["ng1"].Time = now.Add(-3 * time.Second)
// clusterstate.scaleUpRequests["ng1"].ExpectedAddTime = now.Add(1 * time.Minute)
err = clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now)
@@ -161,7 +162,7 @@ func TestHasNodeGroupStartedScaleUp(t *testing.T) {
assert.False(t, clusterstate.HasNodeGroupStartedScaleUp("ng1"))

provider.AddNodeGroup("ng1", 0, 5, tc.initialSize+tc.delta)
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), tc.delta, now.Add(-3*time.Second))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), tc.delta, now.Add(-3*time.Second))
err = clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsNodeGroupScalingUp("ng1"))
@@ -450,7 +451,7 @@ func TestExpiredScaleUp(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 2 * time.Minute}))
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute))
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@@ -476,13 +477,7 @@ func TestRegisterScaleDown(t *testing.T) {
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}))
now := time.Now()

clusterstate.RegisterScaleDown(&ScaleDownRequest{
NodeGroup: provider.GetNodeGroup("ng1"),
NodeName: "ng1-1",
ExpectedDeleteTime: now.Add(time.Minute),
Time: now,
})
clusterstate.RegisterScaleDown(provider.GetNodeGroup("ng1"), "ng1-1", now.Add(time.Minute), now)
assert.Equal(t, 1, len(clusterstate.scaleDownRequests))
clusterstate.updateScaleRequests(now.Add(5 * time.Minute))
assert.Equal(t, 0, len(clusterstate.scaleDownRequests))
@@ -794,7 +789,7 @@ func TestScaleUpBackoff(t *testing.T) {
}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 120 * time.Second}))

// After failed scale-up, node group should be still healthy, but should backoff from scale-ups
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-180*time.Second))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-180*time.Second))
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@@ -826,7 +821,7 @@ func TestScaleUpBackoff(t *testing.T) {
assert.Equal(t, NodeGroupScalingSafety{SafeToScale: true, Healthy: true}, clusterstate.NodeGroupScaleUpSafety(ng1, now))

// Another failed scale up should cause longer backoff
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-121*time.Second))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-121*time.Second))

err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, nil, now)
assert.NoError(t, err)
@@ -860,7 +855,7 @@ func TestScaleUpBackoff(t *testing.T) {
}, clusterstate.NodeGroupScaleUpSafety(ng1, now))

// The backoff should be cleared after a successful scale-up
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 1, now)
ng1_4 := BuildTestNode("ng1-4", 1000, 1000)
SetNodeReadyState(ng1_4, true, now.Add(-1*time.Minute))
provider.AddNode("ng1", ng1_4)
@@ -935,6 +930,7 @@ func TestUpdateScaleUp(t *testing.T) {

provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 10, 5)
provider.AddNodeGroup("ng2", 1, 10, 5)
fakeClient := &fake.Clientset{}
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
clusterstate := NewClusterStateRegistry(
@@ -948,29 +944,30 @@
nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 10 * time.Second}),
)

clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 100, now)
// Test cases for `RegisterScaleUp`
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 100, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 100)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second))

// expect no change of times on negative delta
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -20, later)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -20, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 80)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second))

// update times on positive delta
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 30, later)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 30, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 110)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, later.Add(10*time.Second))

// if we get below 0, scale-up is deleted
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -200, now)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -200, now)
assert.Nil(t, clusterstate.scaleUpRequests["ng1"])

// If new scale-up is registered with negative delta nothing should happen
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -200, now)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -200, now)
assert.Nil(t, clusterstate.scaleUpRequests["ng1"])
}

@@ -986,9 +983,9 @@ func TestScaleUpFailures(t *testing.T) {
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}))

clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), metrics.Timeout, "", "", "", now)
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng2"), metrics.Timeout, "", "", "", now)
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), metrics.APIError, "", "", "", now.Add(time.Minute))
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), string(metrics.Timeout), "", "", "", now)
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng2"), string(metrics.Timeout), "", "", "", now)
clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), string(metrics.APIError), "", "", "", now.Add(time.Minute))

failures := clusterstate.GetScaleUpFailures()
assert.Equal(t, map[string][]ScaleUpFailure{
3 changes: 3 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -158,6 +158,9 @@ type AutoscalingOptions struct {
ScaleDownDelayAfterDelete time.Duration
// ScaleDownDelayAfterFailure sets the duration before the next scale down attempt if scale down results in an error
ScaleDownDelayAfterFailure time.Duration
// ScaleDownDelayTypeLocal sets if the --scale-down-delay-after-* flags should be applied locally per nodegroup
// or globally across all nodegroups
ScaleDownDelayTypeLocal bool
// ScaleDownNonEmptyCandidatesCount is the maximum number of non empty nodes
// considered at once as candidates for scale down.
ScaleDownNonEmptyCandidatesCount int
7 changes: 6 additions & 1 deletion cluster-autoscaler/config/const.go
@@ -40,12 +40,17 @@ const (
DefaultMaxNodeProvisionTimeKey = "maxnodeprovisiontime"
// DefaultIgnoreDaemonSetsUtilizationKey identifies IgnoreDaemonSetsUtilization autoscaling option
DefaultIgnoreDaemonSetsUtilizationKey = "ignoredaemonsetsutilization"
// DefaultScaleDownUnneededTime identifies ScaleDownUnneededTime autoscaling option

// DefaultScaleDownUnneededTime is the default time duration for which CA waits before deleting an unneeded node
DefaultScaleDownUnneededTime = 10 * time.Minute
// DefaultScaleDownUnreadyTime identifies ScaleDownUnreadyTime autoscaling option
DefaultScaleDownUnreadyTime = 20 * time.Minute
// DefaultScaleDownUtilizationThreshold identifies ScaleDownUtilizationThreshold autoscaling option
DefaultScaleDownUtilizationThreshold = 0.5
// DefaultScaleDownGpuUtilizationThreshold identifies ScaleDownGpuUtilizationThreshold autoscaling option
DefaultScaleDownGpuUtilizationThreshold = 0.5
// DefaultScaleDownDelayAfterFailure is the default value for ScaleDownDelayAfterFailure autoscaling option
DefaultScaleDownDelayAfterFailure = 3 * time.Minute
// DefaultScanInterval is the default scan interval for CA
DefaultScanInterval = 10 * time.Second
)
10 changes: 4 additions & 6 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -22,7 +22,6 @@ import (

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
@@ -31,6 +30,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
@@ -45,7 +45,6 @@ import (
// Actuator is responsible for draining and deleting nodes.
type Actuator struct {
ctx *context.AutoscalingContext
clusterState *clusterstate.ClusterStateRegistry
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
nodeDeletionScheduler *GroupDeletionScheduler
deleteOptions options.NodeDeleteOptions
@@ -66,8 +65,8 @@ type actuatorNodeGroupConfigGetter interface {
}

// NewActuator returns a new instance of Actuator.
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, csr, ndt, ctx.NodeDeletionBatcherInterval)
func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval)
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
var evictor Evictor
if len(ctx.DrainPriorityConfig) > 0 {
@@ -77,7 +76,6 @@ func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterState
}
return &Actuator{
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
@@ -102,7 +100,7 @@ func (a *Actuator) ClearResultsNotNewerThan(t time.Time) {
func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) {
a.nodeDeletionScheduler.ReportMetrics()
deletionStartTime := time.Now()
defer func() { metrics.UpdateDuration(metrics.ScaleDownNodeDeletion, time.Now().Sub(deletionStartTime)) }()
defer func() { metrics.UpdateDuration(metrics.ScaleDownNodeDeletion, time.Since(deletionStartTime)) }()

results, ts := a.nodeDeletionTracker.DeletionResults()
scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: results, NodeDeleteResultsAsOf: ts}
14 changes: 10 additions & 4 deletions cluster-autoscaler/core/scaledown/actuation/actuator_test.go
@@ -40,6 +40,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -1186,13 +1187,16 @@ func TestStartDeletion(t *testing.T) {
wantScaleDownStatus.ScaledDownNodes = append(wantScaleDownStatus.ScaledDownNodes, statusScaledDownNode)
}

scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
scaleStateNotifier.Register(csr)

// Create Actuator, run StartDeletion, and verify the error.
ndt := deletiontracker.NewNodeDeletionTracker(0)
ndb := NewNodeDeletionBatcher(&ctx, csr, ndt, 0*time.Second)
ndb := NewNodeDeletionBatcher(&ctx, scaleStateNotifier, ndt, 0*time.Second)
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom, shutdownGracePeriodByPodPriority: legacyFlagDrainConfig, fullDsEviction: false}
actuator := Actuator{
ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
ctx: &ctx, nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults),
@@ -1424,12 +1428,14 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
t.Fatalf("Couldn't set up autoscaling context: %v", err)
}
csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}))
scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
scaleStateNotifier.Register(csr)
ndt := deletiontracker.NewNodeDeletionTracker(0)
ndb := NewNodeDeletionBatcher(&ctx, csr, ndt, deleteInterval)
ndb := NewNodeDeletionBatcher(&ctx, scaleStateNotifier, ndt, deleteInterval)
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom, shutdownGracePeriodByPodPriority: legacyFlagDrainConfig}
actuator := Actuator{
ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
ctx: &ctx, nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
}