diff --git a/cluster-autoscaler/clusterstate/clusterstate.go b/cluster-autoscaler/clusterstate/clusterstate.go index c0a62cb5c4e0..8470eed6a023 100644 --- a/cluster-autoscaler/clusterstate/clusterstate.go +++ b/cluster-autoscaler/clusterstate/clusterstate.go @@ -183,30 +183,13 @@ func (csr *ClusterStateRegistry) Stop() { close(csr.interrupt) } -// RegisterOrUpdateScaleUp registers scale-up for give node group or changes requested node increase -// count. -// If delta is positive then number of new nodes requested is increased; Time and expectedAddTime -// are reset. -// If delta is negative the number of new nodes requested is decreased; Time and expectedAddTime are -// left intact. -func (csr *ClusterStateRegistry) RegisterOrUpdateScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) { +// RegisterScaleUp registers a scale-up for the given node group or adjusts a pending one: a positive delta increases the requested node count and resets the timing, a negative delta only decreases the count +func (csr *ClusterStateRegistry) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) { csr.Lock() defer csr.Unlock() csr.registerOrUpdateScaleUpNoLock(nodeGroup, delta, currentTime) } -// RegisterScaleUp registers scale-up for give node group or changes requested node increase -// count. -func (csr *ClusterStateRegistry) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) error { - if delta < 0 { - return fmt.Errorf("%s; got '%v'", errNegativeNodeIncrease, delta) - } - csr.Lock() - defer csr.Unlock() - csr.registerOrUpdateScaleUpNoLock(nodeGroup, delta, currentTime) - return nil -} - // MaxNodeProvisionTime returns MaxNodeProvisionTime value that should be used for the given NodeGroup. // TODO(BigDarkClown): remove this method entirely, it is a redundant wrapper func (csr *ClusterStateRegistry) MaxNodeProvisionTime(nodeGroup cloudprovider.NodeGroup) (time.Duration, error) { diff --git a/cluster-autoscaler/clusterstate/clusterstate_test.go b/cluster-autoscaler/clusterstate/clusterstate_test.go index 7fa9ed512a35..1f8f141ea327 100644 --- a/cluster-autoscaler/clusterstate/clusterstate_test.go +++ b/cluster-autoscaler/clusterstate/clusterstate_test.go @@ -76,7 +76,7 @@ func TestOKWithScaleUp(t *testing.T) { MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: time.Minute})) - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now()) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now()) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) @@ -165,7 +165,7 @@ func TestHasNodeGroupStartedScaleUp(t *testing.T) { assert.False(t, clusterstate.HasNodeGroupStartedScaleUp("ng1")) provider.AddNodeGroup("ng1", 0, 5, tc.initialSize+tc.delta) - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), tc.delta, now.Add(-3*time.Second)) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), tc.delta, now.Add(-3*time.Second)) err = clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsNodeGroupScalingUp("ng1")) @@ -470,7 +470,7 @@ func TestExpiredScaleUp(t *testing.T) { MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 2 * time.Minute})) -
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute)) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute)) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) @@ -1028,65 +1028,31 @@ func TestUpdateScaleUp(t *testing.T) { nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 10 * time.Second}), ) - // Test cases for `RegisterOrUpdateScaleUp` - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 100, now) + // Test cases for `RegisterScaleUp` + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 100, now) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 100) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second)) // expect no change of times on negative delta - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -20, later) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -20, later) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 80) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second)) // update times on positive delta - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 30, later) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 30, later) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 110) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, later) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, later.Add(10*time.Second)) // if we get below 0 scalup is deleted - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -200, now) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -200, now) assert.Nil(t, clusterstate.scaleUpRequests["ng1"]) // If new scalup is registered with negative delta nothing should happen - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -200, now) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -200, now) assert.Nil(t, clusterstate.scaleUpRequests["ng1"]) - - // Test cases for RegisterScaleUp - err := clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 100, now) - assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 100) - assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now) - assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second)) - assert.NoError(t, err) - - // expect error on negative delta - err = clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -20, later) - assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 100) - assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now) - assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second)) - assert.ErrorContains(t, err, errNegativeNodeIncrease) - - // update times on positive delta - err = clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 30, later) - assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 130) - assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, later) - assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, later.Add(10*time.Second)) - assert.NoError(t, err) - - // expect error if delta is 0 or below - err = 
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -200, now) - assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 130) - assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, later) - assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, later.Add(10*time.Second)) - assert.ErrorContains(t, err, errNegativeNodeIncrease) - - // expect error if new scalup is registered with negative delta - err = clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng2"), -200, now) - assert.ErrorContains(t, err, errNegativeNodeIncrease) - assert.Nil(t, clusterstate.scaleUpRequests["ng2"]) - } func TestScaleUpFailures(t *testing.T) { diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go index b48a3eb9ed12..ab4b200866aa 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go @@ -1189,7 +1189,7 @@ func TestStartDeletion(t *testing.T) { } scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList() - scaleStateNotifier.RegisterForNotifications(csr) + scaleStateNotifier.Register(csr) // Create Actuator, run StartDeletion, and verify the error. ndt := deletiontracker.NewNodeDeletionTracker(0) @@ -1429,7 +1429,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) { } csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute})) scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList() - scaleStateNotifier.RegisterForNotifications(csr) + scaleStateNotifier.Register(csr) ndt := deletiontracker.NewNodeDeletionTracker(0) ndb := NewNodeDeletionBatcher(&ctx, scaleStateNotifier, ndt, deleteInterval) evictor := Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom} diff --git a/cluster-autoscaler/core/scaleup/orchestrator/executor.go b/cluster-autoscaler/core/scaleup/orchestrator/executor.go index eadf3ae93692..2475370e4917 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/executor.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/executor.go @@ -154,12 +154,10 @@ func (e *scaleUpExecutor) executeScaleUp( e.scaleStateNotifier.RegisterFailedScaleUp(info.Group, string(aerr.Type()), gpuResourceName, gpuType, now) return aerr } - if err := e.scaleStateNotifier.RegisterScaleUp( - info.Group, - increase, - time.Now()); err != nil { - return errors.ToAutoscalerError(errors.InternalError, err) + if increase < 0 { + return errors.NewAutoscalerError(errors.InternalError, fmt.Sprintf("increase in number of nodes cannot be negative, got: %v", increase)) } + e.scaleStateNotifier.RegisterScaleUp(info.Group, increase, time.Now()) metrics.RegisterScaleUp(increase, gpuResourceName, gpuType) e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup", "Scale-up: group %s size set to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize) diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go index 9f11e2acded3..886c4e8dd01b 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go @@ -973,7 +973,7 
@@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) processors := NewTestProcessors(&context) - processors.ScaleStateNotifier.RegisterForNotifications(clusterState) + processors.ScaleStateNotifier.Register(clusterState) orchestrator := New() orchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) expander := NewMockRepotingStrategy(t, config.ExpansionOptionToChoose) diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 94800a56fcee..ec195e607201 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -168,7 +168,7 @@ func NewStaticAutoscaler( taintConfig := taints.NewTaintConfig(opts) processors.ScaleDownCandidatesNotifier.Register(clusterStateRegistry) - processors.ScaleStateNotifier.RegisterForNotifications(clusterStateRegistry) + processors.ScaleStateNotifier.Register(clusterStateRegistry) // TODO: Populate the ScaleDownActuator/Planner fields in AutoscalingContext // during the struct creation rather than here. @@ -615,23 +615,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr metrics.UpdateDurationFromStart(metrics.FindUnneeded, unneededStart) - scaleDownInCooldown := a.processorCallbacks.disableScaleDownForLoop - - if !a.ScaleDownDelayTypeLocal { - scaleDownInCooldown = scaleDownInCooldown || - a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) || - a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) || - a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime) - } else { - scaleDownInCooldown = scaleDownInCooldown || len(scaleDownCandidates) == 0 - } - + scaleDownInCooldown := a.isScaleDownInCooldown(currentTime, scaleDownCandidates) klog.V(4).Infof("Scale down status: lastScaleUpTime=%s lastScaleDownDeleteTime=%v "+ "lastScaleDownFailTime=%s scaleDownForbidden=%v scaleDownInCooldown=%v", a.lastScaleUpTime, a.lastScaleDownDeleteTime, a.lastScaleDownFailTime, a.processorCallbacks.disableScaleDownForLoop, scaleDownInCooldown) metrics.UpdateScaleDownInCooldown(scaleDownInCooldown) - // We want to delete unneeded Node Groups only if here is no current delete // in progress. _, drained := scaleDownActuationStatus.DeletionsInProgress() @@ -701,6 +690,18 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr return nil } +func (a *StaticAutoscaler) isScaleDownInCooldown(currentTime time.Time, scaleDownCandidates []*apiv1.Node) bool { + scaleDownInCooldown := a.processorCallbacks.disableScaleDownForLoop || len(scaleDownCandidates) == 0 + + if a.ScaleDownDelayTypeLocal { + return scaleDownInCooldown + } + return scaleDownInCooldown || + a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) || + a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) || + a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime) +} + // Sets the target size of node groups to the current number of nodes in them // if the difference was constant for a prolonged time. Returns true if managed // to fix something. 
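The static_autoscaler.go refactor above also widens one check: the "no scale-down candidates" short-circuit, previously applied only when `ScaleDownDelayTypeLocal` was set, now applies in both modes. Below is a minimal sketch of the same decision restated as a pure function, useful for reasoning about the branches; the function and parameter names are illustrative stand-ins for the `StaticAutoscaler` fields shown in the hunk, not code from the repository.

```go
package example

import "time"

// scaleDownInCooldown restates StaticAutoscaler.isScaleDownInCooldown as a free
// function for illustration; parameter names stand in for the autoscaler fields.
func scaleDownInCooldown(now, lastScaleUp, lastScaleDownFail, lastScaleDownDelete time.Time,
	delayAfterAdd, delayAfterFailure, delayAfterDelete time.Duration,
	scaleDownForbidden, delayTypeLocal bool, candidates int) bool {
	cooldown := scaleDownForbidden || candidates == 0
	if delayTypeLocal {
		// Per-node-group delays are enforced by ScaleDownCandidatesDelayProcessor instead,
		// so only the global checks apply here.
		return cooldown
	}
	return cooldown ||
		lastScaleUp.Add(delayAfterAdd).After(now) ||
		lastScaleDownFail.Add(delayAfterFailure).After(now) ||
		lastScaleDownDelete.Add(delayAfterDelete).After(now)
}
```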
diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index af7b48fb54df..c3f938cffbfd 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -43,7 +43,6 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/estimator" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" - "k8s.io/autoscaler/cluster-autoscaler/processors/nodes" "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" @@ -421,14 +420,14 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) { } processors := NewTestProcessors(&context) sddProcessor := scaledowncandidates.NewScaleDownCandidatesDelayProcessor() - processors.ScaleStateNotifier.RegisterForNotifications(sddProcessor) + processors.ScaleStateNotifier.Register(sddProcessor) scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{} - processors.ScaleDownNodeProcessor = scaledowncandidates.NewCombinedScaleDownCandidatesProcessor([]nodes.ScaleDownNodeProcessor{ - scaledowncandidates.NewScaleDownCandidatesSortingProcessor(scaleDownCandidatesComparers), - sddProcessor, - }) + cp := scaledowncandidates.NewCombinedScaleDownCandidatesProcessor() + cp.Register(scaledowncandidates.NewScaleDownCandidatesSortingProcessor(scaleDownCandidatesComparers)) + cp.Register(sddProcessor) + processors.ScaleDownNodeProcessor = cp clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - processors.ScaleStateNotifier.RegisterForNotifications(clusterState) + processors.ScaleStateNotifier.Register(clusterState) sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) suOrchestrator := orchestrator.New() @@ -455,140 +454,111 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) { p2.Annotations[drain.PodSafeToEvictKey] = "true" p2.Spec.NodeName = "n2" - // Case 1: - // ng1 scaled up recently - // both ng1 and ng2 have under-utilized nodes - // expectation: under-utilized node in ng2 should be scaled down - t.Run("ng1 scaled up recently - both ng1 and ng2 have under-utilized nodes", func(t *testing.T) { - - // make CA think ng1 scaled up recently - processors.ScaleStateNotifier.RegisterScaleUp(ng1, 1, time.Now().Add(-3*time.Minute)) - - provider.AddNode("ng1", n1) - provider.AddNode("ng2", n2) - ng1.SetTargetSize(1) - ng2.SetTargetSize(1) - - // Mark unneeded nodes. 
- readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() - daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() - podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() - - err = autoscaler.RunOnce(time.Now()) - assert.NoError(t, err) - mock.AssertExpectationsForObjects(t, allPodListerMock, - podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) - - // Scale down ng2 - readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(3) - daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() - podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Twice() - onScaleDownMock.On("ScaleDown", "ng2", "n2").Return(nil).Once() - - err = autoscaler.RunOnce(time.Now().Add(config.DefaultScaleDownUnneededTime)) - waitForDeleteToFinish(t, deleteFinished) - assert.NoError(t, err) - mock.AssertExpectationsForObjects(t, allPodListerMock, - podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) - - // reset scale up in ng1 so that it doesn't block scale down in the next test - // scale down is always recorded relative to time.Now(), no matter - // what currentTime time is passed to RunOnce() - processors.ScaleStateNotifier.RegisterScaleUp(ng1, 1, time.Time{}) + testCases := []struct { + description string + beforeTest func(processors *ca_processors.AutoscalingProcessors) + expectedScaleDownNG string + expectedScaleDownNode string + afterTest func(processors *ca_processors.AutoscalingProcessors) + }{ + // Case 1: + // ng1 scaled up recently + // both ng1 and ng2 have under-utilized nodes + // expectation: under-utilized node in ng2 should be scaled down + { + description: "ng1 scaled up recently - both ng1 and ng2 have under-utilized nodes", + beforeTest: func(processors *ca_processors.AutoscalingProcessors) { + // make CA think ng1 scaled up recently + processors.ScaleStateNotifier.RegisterScaleUp(ng1, 1, time.Now().Add(-3*time.Minute)) + }, + expectedScaleDownNG: "ng2", + expectedScaleDownNode: "n2", + afterTest: func(processors *ca_processors.AutoscalingProcessors) { + // reset scale up in ng1 so that it doesn't block scale down in the next test + // scale down is always recorded relative to time.Now(), no matter + // what currentTime time is passed to RunOnce() + processors.ScaleStateNotifier.RegisterScaleUp(ng1, 1, time.Time{}) + }, + }, - }) + // Case 2: + // ng2 scaled down recently + // both ng1 and ng2 have under-utilized nodes + // expectation: under-utilized node in ng1 should be scaled down + { + description: "ng2 scaled down recently - both ng1 and ng2 have under-utilized nodes", + beforeTest: func(processors *ca_processors.AutoscalingProcessors) { + // make CA think ng2 scaled down recently + processors.ScaleStateNotifier.RegisterScaleDown(ng2, "n3", time.Now().Add(-3*time.Minute), time.Now()) + }, + expectedScaleDownNG: "ng1", + expectedScaleDownNode: "n1", + afterTest: func(processors *ca_processors.AutoscalingProcessors) { + // reset scale down in ng1 and ng2 so that it doesn't block scale down in the next test + // scale down is always recorded relative to time.Now(), no matter + // what currentTime time is passed to RunOnce() + 
processors.ScaleStateNotifier.RegisterScaleDown(ng2, "n3", time.Time{}, time.Time{}) + processors.ScaleStateNotifier.RegisterScaleDown(ng1, "n1", time.Time{}, time.Time{}) + }, + }, - // Case 2: - // ng2 scaled down recently - // both ng1 and ng2 have under-utilized nodes - // expectation: under-utilized node in ng1 should be scaled down - t.Run("ng2 scaled down recently - both ng1 and ng2 have under-utilized nodes", func(t *testing.T) { - - // make CA think ng2 scaled down recently - processors.ScaleStateNotifier.RegisterScaleDown(ng2, "n3", time.Now().Add(-3*time.Minute), time.Now()) - - provider.AddNode("ng1", n1) - provider.AddNode("ng2", n2) - ng1.SetTargetSize(1) - ng2.SetTargetSize(1) - - // Mark unneeded nodes. - readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() - daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() - podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() - - err = autoscaler.RunOnce(time.Now()) - assert.NoError(t, err) - mock.AssertExpectationsForObjects(t, allPodListerMock, - podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) - - // Scale down ng1 - readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(3) - daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() - podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Twice() - onScaleDownMock.On("ScaleDown", "ng1", "n1").Return(nil).Once() - - err = autoscaler.RunOnce(time.Now().Add(config.DefaultScaleDownUnneededTime)) - waitForDeleteToFinish(t, deleteFinished) - assert.NoError(t, err) - mock.AssertExpectationsForObjects(t, allPodListerMock, - podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) - - // reset scale down in ng1 and ng2 so that it doesn't block scale down in the next test - // scale down is always recorded relative to time.Now(), no matter - // what currentTime time is passed to RunOnce() - processors.ScaleStateNotifier.RegisterScaleDown(ng2, "n3", time.Time{}, time.Time{}) - processors.ScaleStateNotifier.RegisterScaleDown(ng1, "n1", time.Time{}, time.Time{}) - }) + // Case 3: + // ng1 had a scale down failure + // both ng1 and ng2 have under-utilized nodes + // expectation: under-utilized node in ng2 should be scaled down + { + description: "ng1 had scale-down failure - both ng1 and ng2 have under-utilized nodes", + beforeTest: func(processors *ca_processors.AutoscalingProcessors) { + // Make CA think scale down failed in ng1 + processors.ScaleStateNotifier.RegisterFailedScaleDown(ng1, "scale down failed", time.Now().Add(-3*time.Minute)) + }, + expectedScaleDownNG: "ng2", + expectedScaleDownNode: "n2", + afterTest: func(processors *ca_processors.AutoscalingProcessors) { - // Case 3: - // ng1 had a scale down failure - // both ng1 and ng2 have under-utilized nodes - // expectation: under-utilized node in ng2 should be scaled down - t.Run("ng1 had scale-down failure - both ng1 and ng2 have under-utilized nodes", func(t *testing.T) { - - // Make CA think scale down failed in ng1 - processors.ScaleStateNotifier.RegisterFailedScaleDown(ng1, "scale down failed", time.Now().Add(-3*time.Minute)) - - provider.AddNode("ng1", n1) - provider.AddNode("ng2", n2) - 
ng1.SetTargetSize(1) - ng2.SetTargetSize(1) - - // Mark unneeded nodes. - readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() - daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() - podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() - - err = autoscaler.RunOnce(time.Now()) - assert.NoError(t, err) - mock.AssertExpectationsForObjects(t, allPodListerMock, - podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) - - // Scale down ng2 - readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(3) - daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() - podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Twice() - onScaleDownMock.On("ScaleDown", "ng2", "n2").Return(nil).Once() - - err = autoscaler.RunOnce(time.Now().Add(config.DefaultScaleDownUnneededTime)) - waitForDeleteToFinish(t, deleteFinished) - assert.NoError(t, err) - mock.AssertExpectationsForObjects(t, allPodListerMock, - podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) - }) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + + tc.beforeTest(processors) + + provider.AddNode("ng1", n1) + provider.AddNode("ng2", n2) + ng1.SetTargetSize(1) + ng2.SetTargetSize(1) + + // Mark unneeded nodes. + readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) + allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) + allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() + podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() + + err = autoscaler.RunOnce(time.Now()) + assert.NoError(t, err) + mock.AssertExpectationsForObjects(t, allPodListerMock, + podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) + + // Scale down nodegroup + readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) + allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) + allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(3) + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() + podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Twice() + onScaleDownMock.On("ScaleDown", tc.expectedScaleDownNG, tc.expectedScaleDownNode).Return(nil).Once() + + err = autoscaler.RunOnce(time.Now().Add(config.DefaultScaleDownUnneededTime)) + waitForDeleteToFinish(t, deleteFinished) + assert.NoError(t, err) + mock.AssertExpectationsForObjects(t, allPodListerMock, + podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) + + tc.afterTest(processors) + }) + } } func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 36d9a371abb1..ed384ede665f 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -48,7 +48,6 @@ import ( ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" - 
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes" "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates" "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/emptycandidates" "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/previouscandidates" @@ -493,17 +492,16 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter opts.Processors.ScaleDownCandidatesNotifier.Register(sdCandidatesSorting) } - processors := []nodes.ScaleDownNodeProcessor{ - scaledowncandidates.NewScaleDownCandidatesSortingProcessor(scaleDownCandidatesComparers), - } + cp := scaledowncandidates.NewCombinedScaleDownCandidatesProcessor() + cp.Register(scaledowncandidates.NewScaleDownCandidatesSortingProcessor(scaleDownCandidatesComparers)) if autoscalingOptions.ScaleDownDelayTypeLocal { - sddProcessor := scaledowncandidates.NewScaleDownCandidatesDelayProcessor() - processors = append(processors, sddProcessor) - opts.Processors.ScaleStateNotifier.RegisterForNotifications(sddProcessor) + sdp := scaledowncandidates.NewScaleDownCandidatesDelayProcessor() + cp.Register(sdp) + opts.Processors.ScaleStateNotifier.Register(sdp) } - opts.Processors.ScaleDownNodeProcessor = scaledowncandidates.NewCombinedScaleDownCandidatesProcessor(processors) + opts.Processors.ScaleDownNodeProcessor = cp var nodeInfoComparator nodegroupset.NodeInfoComparator if len(autoscalingOptions.BalancingLabels) > 0 { diff --git a/cluster-autoscaler/observers/nodegroupchange/scale_state_observer.go b/cluster-autoscaler/observers/nodegroupchange/scale_state_observer.go index 61b2270bf40c..d6a5172cf2a6 100644 --- a/cluster-autoscaler/observers/nodegroupchange/scale_state_observer.go +++ b/cluster-autoscaler/observers/nodegroupchange/scale_state_observer.go @@ -21,7 +21,6 @@ import ( "time" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" - "k8s.io/klog/v2" ) // NodeGroupChangeObserver is an observer of: @@ -31,7 +30,7 @@ import ( // * scale-down failure(s) for a nodegroup type NodeGroupChangeObserver interface { // RegisterScaleUp records scale up for a nodegroup. - RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) error + RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) // RegisterScaleDowns records scale down for a nodegroup. RegisterScaleDown(nodeGroup cloudprovider.NodeGroup, nodeName string, currentTime time.Time, expectedDeleteTime time.Time) // RegisterFailedScaleUp records failed scale-up for a nodegroup. @@ -49,23 +48,18 @@ type NodeGroupChangeObserversList struct { } // Register adds new observer to the list. -func (l *NodeGroupChangeObserversList) RegisterForNotifications(o NodeGroupChangeObserver) { +func (l *NodeGroupChangeObserversList) Register(o NodeGroupChangeObserver) { l.observers = append(l.observers, o) } // RegisterScaleUp calls RegisterScaleUp for each observer. func (l *NodeGroupChangeObserversList) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, - delta int, currentTime time.Time) error { + delta int, currentTime time.Time) { l.mutex.Lock() defer l.mutex.Unlock() for _, observer := range l.observers { - err := observer.RegisterScaleUp(nodeGroup, delta, currentTime) - if err != nil { - klog.Error(err) - return err - } + observer.RegisterScaleUp(nodeGroup, delta, currentTime) } - return nil } // RegisterScaleDown calls RegisterScaleDown for each observer. 
diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go index 7fa25e5ca0f3..43957b14889b 100644 --- a/cluster-autoscaler/processors/processors.go +++ b/cluster-autoscaler/processors/processors.go @@ -68,7 +68,7 @@ type AutoscalingProcessors struct { ActionableClusterProcessor actionablecluster.ActionableClusterProcessor // ScaleDownCandidatesNotifier is used to Update and Register new scale down candidates observer. ScaleDownCandidatesNotifier *scaledowncandidates.ObserversList - // ScaleStateNotifier is used to register + // ScaleStateNotifier is used to notify // * scale-ups per nodegroup // * scale-downs per nodegroup // * scale-up failures per nodegroup diff --git a/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor.go b/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor.go index 795ed7bf3862..a3c554a219fd 100644 --- a/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor.go +++ b/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor.go @@ -63,7 +63,7 @@ func (p *ScaleDownCandidatesDelayProcessor) GetScaleDownCandidates(ctx *context. recent := func(m map[string]time.Time, d time.Duration, msg string) bool { if !m[nodeGroup.Id()].IsZero() && m[nodeGroup.Id()].Add(d).After(currentTime) { klog.V(4).Infof("Skipping scale down on node group %s because it %s recently at %v", - msg, nodeGroup.Id(), m[nodeGroup.Id()]) + nodeGroup.Id(), msg, m[nodeGroup.Id()]) return true } @@ -93,10 +93,8 @@ func (p *ScaleDownCandidatesDelayProcessor) CleanUp() { // RegisterScaleUp records when the last scale up happened for a nodegroup. func (p *ScaleDownCandidatesDelayProcessor) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, - _ int, currentTime time.Time) error { + _ int, currentTime time.Time) { p.scaleUps[nodeGroup.Id()] = currentTime - - return nil } // RegisterScaleDown records when the last scale down happened for a nodegroup. 
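Two smaller cleanups ride along here: the klog call now passes the node group id and the event description in the order the format string expects, and the `NodeGroupChangeObserver` interface in scale_state_observer.go above no longer returns an error from `RegisterScaleUp`, so the fan-out needs no error plumbing (negative deltas are rejected earlier, in the scale-up executor). A minimal, hypothetical observer that satisfies the updated interface is sketched below; the method signatures mirror the declarations and call sites visible in this diff, and the tracker type and wiring function are made up for illustration.

```go
package example

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
)

// lastEventTracker is a made-up observer that only remembers the last scale-up
// time per node group; it exists purely to show the shape of the interface.
type lastEventTracker struct {
	lastScaleUp map[string]time.Time
}

func (t *lastEventTracker) RegisterScaleUp(ng cloudprovider.NodeGroup, delta int, currentTime time.Time) {
	t.lastScaleUp[ng.Id()] = currentTime
}

func (t *lastEventTracker) RegisterScaleDown(ng cloudprovider.NodeGroup, nodeName string, currentTime time.Time, expectedDeleteTime time.Time) {
}

func (t *lastEventTracker) RegisterFailedScaleUp(ng cloudprovider.NodeGroup, reason string, gpuResourceName string, gpuType string, currentTime time.Time) {
}

func (t *lastEventTracker) RegisterFailedScaleDown(ng cloudprovider.NodeGroup, reason string, currentTime time.Time) {
}

func wireObserver(ng cloudprovider.NodeGroup) {
	list := nodegroupchange.NewNodeGroupChangeObserversList()
	list.Register(&lastEventTracker{lastScaleUp: map[string]time.Time{}})
	// Fan-out is now fire-and-forget: no error to check or log.
	list.RegisterScaleUp(ng, 3, time.Now())
}
```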
diff --git a/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor_test.go b/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor_test.go index 529f7d2c5a57..cdc1eac303f1 100644 --- a/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor_test.go +++ b/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/context" @@ -75,10 +76,13 @@ func TestGetScaleDownCandidates(t *testing.T) { candidates: []*v1.Node{n1, n2, n3}, expected: []*v1.Node{n1, n3}, setupProcessor: func(p *ScaleDownCandidatesDelayProcessor) *ScaleDownCandidatesDelayProcessor { + // fake nodegroups for calling `RegisterScaleDown` + ng2 := test.NewTestNodeGroup("ng-2", 0, 0, 0, false, false, "", nil, nil) + ng3 := test.NewTestNodeGroup("ng-3", 0, 0, 0, false, false, "", nil, nil) // in cool down - p.scaleDowns["ng-2"] = time.Now().Add(-time.Minute * 5) + p.RegisterScaleDown(ng2, "n2", time.Now().Add(-time.Minute*5), time.Time{}) // not in cool down anymore - p.scaleDowns["ng-3"] = time.Now().Add(-time.Minute * 11) + p.RegisterScaleDown(ng3, "n3", time.Now().Add(-time.Minute*11), time.Time{}) return p }, @@ -89,10 +93,14 @@ func TestGetScaleDownCandidates(t *testing.T) { candidates: []*v1.Node{n1, n2, n3}, expected: []*v1.Node{n1, n3}, setupProcessor: func(p *ScaleDownCandidatesDelayProcessor) *ScaleDownCandidatesDelayProcessor { + // fake nodegroups for calling `RegisterScaleUp` + ng2 := test.NewTestNodeGroup("ng-2", 0, 0, 0, false, false, "", nil, nil) + ng3 := test.NewTestNodeGroup("ng-3", 0, 0, 0, false, false, "", nil, nil) + // in cool down - p.scaleUps["ng-2"] = time.Now().Add(-time.Minute * 5) + p.RegisterScaleUp(ng2, 0, time.Now().Add(-time.Minute*5)) // not in cool down anymore - p.scaleUps["ng-3"] = time.Now().Add(-time.Minute * 11) + p.RegisterScaleUp(ng3, 0, time.Now().Add(-time.Minute*11)) return p }, }, @@ -102,10 +110,14 @@ func TestGetScaleDownCandidates(t *testing.T) { candidates: []*v1.Node{n1, n2, n3}, expected: []*v1.Node{n1, n3}, setupProcessor: func(p *ScaleDownCandidatesDelayProcessor) *ScaleDownCandidatesDelayProcessor { + // fake nodegroups for calling `RegisterScaleUp` + ng2 := test.NewTestNodeGroup("ng-2", 0, 0, 0, false, false, "", nil, nil) + ng3 := test.NewTestNodeGroup("ng-3", 0, 0, 0, false, false, "", nil, nil) + // in cool down - p.scaleDownFailures["ng-2"] = time.Now().Add(-time.Minute * 5) + p.RegisterFailedScaleDown(ng2, "", time.Now().Add(-time.Minute*5)) // not in cool down anymore - p.scaleDownFailures["ng-3"] = time.Now().Add(-time.Minute * 11) + p.RegisterFailedScaleDown(ng3, "", time.Now().Add(-time.Minute*11)) return p }, }, diff --git a/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_processor.go b/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_processor.go index 90e9e2d8ff1a..854b6831624e 100644 --- a/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_processor.go +++ b/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_processor.go @@ -29,11 +29,14 @@ type combinedScaleDownCandidatesProcessor struct { // 
NewCombinedScaleDownCandidatesProcessor returns a default implementation of the scale down candidates // processor, which wraps and sequentially runs other sub-processors. -func NewCombinedScaleDownCandidatesProcessor(processors []nodes.ScaleDownNodeProcessor) *combinedScaleDownCandidatesProcessor { - return &combinedScaleDownCandidatesProcessor{ - processors: processors, - } +func NewCombinedScaleDownCandidatesProcessor() *combinedScaleDownCandidatesProcessor { + return &combinedScaleDownCandidatesProcessor{} + +} +// Register registers a new ScaleDownNodeProcessor +func (p *combinedScaleDownCandidatesProcessor) Register(np nodes.ScaleDownNodeProcessor) { + p.processors = append(p.processors, np) } // GetPodDestinationCandidates returns nodes that potentially could act as destinations for pods
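Replacing the slice-taking constructor with a `Register` method lets callers compose sub-processors conditionally, which is exactly what the main.go hunk does. A condensed sketch of that wiring follows; the helper name and parameters are hypothetical stand-ins for the values buildAutoscaler derives from AutoscalingOptions.

```go
package example

import (
	"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
	"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
	"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
)

// buildScaleDownCandidatesProcessor mirrors the registration pattern from the main.go hunk above.
func buildScaleDownCandidatesProcessor(comparers []scaledowncandidates.CandidatesComparer,
	scaleDownDelayTypeLocal bool, notifier *nodegroupchange.NodeGroupChangeObserversList) nodes.ScaleDownNodeProcessor {
	cp := scaledowncandidates.NewCombinedScaleDownCandidatesProcessor()
	cp.Register(scaledowncandidates.NewScaleDownCandidatesSortingProcessor(comparers))
	if scaleDownDelayTypeLocal {
		// The delay processor is both a candidates filter and a scale-state observer,
		// so it is registered in both places.
		sdp := scaledowncandidates.NewScaleDownCandidatesDelayProcessor()
		cp.Register(sdp)
		notifier.Register(sdp)
	}
	return cp
}
```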