refactor: address PR review comments
Signed-off-by: vadasambar <[email protected]>
vadasambar committed Jan 11, 2024
1 parent 0b77571 commit 6aa4922
Showing 13 changed files with 177 additions and 254 deletions.
21 changes: 2 additions & 19 deletions cluster-autoscaler/clusterstate/clusterstate.go
@@ -183,30 +183,13 @@ func (csr *ClusterStateRegistry) Stop() {
close(csr.interrupt)
}

// RegisterOrUpdateScaleUp registers scale-up for given node group or changes requested node increase
// count.
// If delta is positive then number of new nodes requested is increased; Time and expectedAddTime
// are reset.
// If delta is negative the number of new nodes requested is decreased; Time and expectedAddTime are
// left intact.
func (csr *ClusterStateRegistry) RegisterOrUpdateScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) {
// RegisterScaleUp registers scale-up for given node group
func (csr *ClusterStateRegistry) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) {
csr.Lock()
defer csr.Unlock()
csr.registerOrUpdateScaleUpNoLock(nodeGroup, delta, currentTime)
}

// RegisterScaleUp registers scale-up for given node group or changes requested node increase
// count.
func (csr *ClusterStateRegistry) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) error {
if delta < 0 {
return fmt.Errorf("%s; got '%v'", errNegativeNodeIncrease, delta)
}
csr.Lock()
defer csr.Unlock()
csr.registerOrUpdateScaleUpNoLock(nodeGroup, delta, currentTime)
return nil
}

// MaxNodeProvisionTime returns MaxNodeProvisionTime value that should be used for the given NodeGroup.
// TODO(BigDarkClown): remove this method entirely, it is a redundant wrapper
func (csr *ClusterStateRegistry) MaxNodeProvisionTime(nodeGroup cloudprovider.NodeGroup) (time.Duration, error) {
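For reference, the behaviour described by the removed doc comment — a positive delta bumps the requested count and resets the request times, a negative delta only shrinks the count, and a count that drops to zero or below removes the request — can be sketched in isolation. This is a toy model, not the real ClusterStateRegistry (which also handles locking, backoff and readiness tracking); the field names mirror the ones the tests below assert on:

```go
package main

import (
	"fmt"
	"time"
)

// scaleUpRequest mirrors the fields the tests assert on: the requested
// node increase, the request time and the expected add time.
type scaleUpRequest struct {
	Increase        int
	Time            time.Time
	ExpectedAddTime time.Time
}

// registry is a toy stand-in for ClusterStateRegistry that tracks one
// scale-up request per node group name.
type registry struct {
	maxNodeProvisionTime time.Duration
	scaleUpRequests      map[string]*scaleUpRequest
}

// RegisterScaleUp applies the delta semantics from the removed doc comment:
// a positive delta bumps the count and resets the times, a negative delta
// only shrinks the count, and a non-positive count deletes the request.
func (r *registry) RegisterScaleUp(nodeGroup string, delta int, now time.Time) {
	req, found := r.scaleUpRequests[nodeGroup]
	if !found {
		if delta <= 0 {
			return // nothing to create, nothing to update
		}
		r.scaleUpRequests[nodeGroup] = &scaleUpRequest{
			Increase:        delta,
			Time:            now,
			ExpectedAddTime: now.Add(r.maxNodeProvisionTime),
		}
		return
	}
	req.Increase += delta
	if req.Increase <= 0 {
		delete(r.scaleUpRequests, nodeGroup) // request fully cancelled
		return
	}
	if delta > 0 {
		// Only a positive delta refreshes the timestamps.
		req.Time = now
		req.ExpectedAddTime = now.Add(r.maxNodeProvisionTime)
	}
}

func main() {
	now := time.Now()
	r := &registry{maxNodeProvisionTime: 10 * time.Second, scaleUpRequests: map[string]*scaleUpRequest{}}
	r.RegisterScaleUp("ng1", 100, now)
	r.RegisterScaleUp("ng1", -20, now.Add(time.Minute)) // count drops to 80, times stay at now
	fmt.Printf("%+v\n", *r.scaleUpRequests["ng1"])
	r.RegisterScaleUp("ng1", -200, now) // drops below zero, request removed
	fmt.Println(r.scaleUpRequests["ng1"] == nil)
}
```

The assertions in TestUpdateScaleUp below walk through exactly these transitions.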
52 changes: 9 additions & 43 deletions cluster-autoscaler/clusterstate/clusterstate_test.go
@@ -76,7 +76,7 @@ func TestOKWithScaleUp(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: time.Minute}))
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now())
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now())
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@@ -165,7 +165,7 @@ func TestHasNodeGroupStartedScaleUp(t *testing.T) {
assert.False(t, clusterstate.HasNodeGroupStartedScaleUp("ng1"))

provider.AddNodeGroup("ng1", 0, 5, tc.initialSize+tc.delta)
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), tc.delta, now.Add(-3*time.Second))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), tc.delta, now.Add(-3*time.Second))
err = clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsNodeGroupScalingUp("ng1"))
@@ -470,7 +470,7 @@ func TestExpiredScaleUp(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 2 * time.Minute}))
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute))
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute))
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@@ -1028,65 +1028,31 @@ func TestUpdateScaleUp(t *testing.T) {
nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 10 * time.Second}),
)

// Test cases for `RegisterOrUpdateScaleUp`
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 100, now)
// Test cases for `RegisterScaleUp`
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 100, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 100)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second))

// expect no change of times on negative delta
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -20, later)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -20, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 80)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second))

// update times on positive delta
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 30, later)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 30, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 110)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, later.Add(10*time.Second))

// if we get below 0, the scale-up is deleted
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -200, now)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -200, now)
assert.Nil(t, clusterstate.scaleUpRequests["ng1"])

// If a new scale-up is registered with negative delta, nothing should happen
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -200, now)
clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -200, now)
assert.Nil(t, clusterstate.scaleUpRequests["ng1"])

// Test cases for RegisterScaleUp
err := clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 100, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 100)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second))
assert.NoError(t, err)

// expect error on negative delta
err = clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -20, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 100)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second))
assert.ErrorContains(t, err, errNegativeNodeIncrease)

// update times on positive delta
err = clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 30, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 130)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, later.Add(10*time.Second))
assert.NoError(t, err)

// expect error if delta is 0 or below
err = clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -200, now)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 130)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, later)
assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, later.Add(10*time.Second))
assert.ErrorContains(t, err, errNegativeNodeIncrease)

// expect error if a new scale-up is registered with negative delta
err = clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng2"), -200, now)
assert.ErrorContains(t, err, errNegativeNodeIncrease)
assert.Nil(t, clusterstate.scaleUpRequests["ng2"])

}

func TestScaleUpFailures(t *testing.T) {
4 changes: 2 additions & 2 deletions cluster-autoscaler/core/scaledown/actuation/actuator_test.go
@@ -1189,7 +1189,7 @@ func TestStartDeletion(t *testing.T) {
}

scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
scaleStateNotifier.RegisterForNotifications(csr)
scaleStateNotifier.Register(csr)

// Create Actuator, run StartDeletion, and verify the error.
ndt := deletiontracker.NewNodeDeletionTracker(0)
@@ -1429,7 +1429,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
}
csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}))
scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList()
scaleStateNotifier.RegisterForNotifications(csr)
scaleStateNotifier.Register(csr)
ndt := deletiontracker.NewNodeDeletionTracker(0)
ndb := NewNodeDeletionBatcher(&ctx, scaleStateNotifier, ndt, deleteInterval)
evictor := Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}
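The tests above now call `Register` on the scale-state notifier instead of the old `RegisterForNotifications`. A minimal sketch of that observer-list shape — assuming only the fan-out behaviour visible in this diff, with a trimmed-down observer interface standing in for the real one — could look like:

```go
package main

import (
	"fmt"
	"time"
)

// scaleStateObserver is a trimmed-down stand-in for the observer interface;
// the real one receives cloud-provider node groups and more event kinds.
type scaleStateObserver interface {
	RegisterScaleUp(nodeGroup string, delta int, currentTime time.Time)
}

// observersList sketches the shape of NodeGroupChangeObserversList:
// Register adds a subscriber, RegisterScaleUp fans the event out to all of them.
type observersList struct {
	observers []scaleStateObserver
}

func (l *observersList) Register(o scaleStateObserver) {
	l.observers = append(l.observers, o)
}

func (l *observersList) RegisterScaleUp(nodeGroup string, delta int, now time.Time) {
	for _, o := range l.observers {
		o.RegisterScaleUp(nodeGroup, delta, now)
	}
}

// loggingObserver plays the role that ClusterStateRegistry plays in the tests.
type loggingObserver struct{}

func (loggingObserver) RegisterScaleUp(nodeGroup string, delta int, now time.Time) {
	fmt.Printf("recorded scale-up of %d for %s at %s\n", delta, nodeGroup, now.Format(time.RFC3339))
}

func main() {
	list := &observersList{}
	list.Register(loggingObserver{}) // replaces the old RegisterForNotifications call
	list.RegisterScaleUp("ng1", 4, time.Now())
}
```

In the autoscaler, ClusterStateRegistry is one such observer; registering it once lets scale-state events reach it without the callers touching the registry directly.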
8 changes: 3 additions & 5 deletions cluster-autoscaler/core/scaleup/orchestrator/executor.go
@@ -154,12 +154,10 @@ func (e *scaleUpExecutor) executeScaleUp(
e.scaleStateNotifier.RegisterFailedScaleUp(info.Group, string(aerr.Type()), gpuResourceName, gpuType, now)
return aerr
}
if err := e.scaleStateNotifier.RegisterScaleUp(
info.Group,
increase,
time.Now()); err != nil {
return errors.ToAutoscalerError(errors.InternalError, err)
if increase < 0 {
return errors.NewAutoscalerError(errors.InternalError, fmt.Sprintf("increase in number of nodes cannot be negative, got: %v", increase))
}
e.scaleStateNotifier.RegisterScaleUp(info.Group, increase, time.Now())
metrics.RegisterScaleUp(increase, gpuResourceName, gpuType)
e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
"Scale-up: group %s size set to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize)
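Since the notifier's `RegisterScaleUp` no longer returns an error, `executeScaleUp` rejects a negative increase itself before broadcasting, as the hunk above shows. A stripped-down sketch of that fail-fast shape — using a plain wrapped error where the real code builds an `errors.AutoscalerError`, and a hypothetical `notifier` interface — might be:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNegativeIncrease is a plain sentinel used for this sketch; the real code
// returns an errors.AutoscalerError of type InternalError instead.
var errNegativeIncrease = errors.New("increase in number of nodes cannot be negative")

// notifier is the minimal slice of the scale-state notifier used here.
type notifier interface {
	RegisterScaleUp(nodeGroup string, delta int, now time.Time)
}

// executeScaleUp validates the computed increase before broadcasting it,
// since the notification path no longer reports errors back to the caller.
func executeScaleUp(n notifier, nodeGroup string, increase int) error {
	if increase < 0 {
		return fmt.Errorf("%w, got: %v", errNegativeIncrease, increase)
	}
	n.RegisterScaleUp(nodeGroup, increase, time.Now())
	return nil
}

// noopNotifier is a do-nothing stand-in so the sketch runs on its own.
type noopNotifier struct{}

func (noopNotifier) RegisterScaleUp(string, int, time.Time) {}

func main() {
	fmt.Println(executeScaleUp(noopNotifier{}, "ng1", -2)) // fails fast, nothing is broadcast
	fmt.Println(executeScaleUp(noopNotifier{}, "ng1", 3))  // <nil>
}
```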
@@ -973,7 +973,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults))
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
processors := NewTestProcessors(&context)
processors.ScaleStateNotifier.RegisterForNotifications(clusterState)
processors.ScaleStateNotifier.Register(clusterState)
orchestrator := New()
orchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{})
expander := NewMockRepotingStrategy(t, config.ExpansionOptionToChoose)
27 changes: 14 additions & 13 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -168,7 +168,7 @@ func NewStaticAutoscaler(

taintConfig := taints.NewTaintConfig(opts)
processors.ScaleDownCandidatesNotifier.Register(clusterStateRegistry)
processors.ScaleStateNotifier.RegisterForNotifications(clusterStateRegistry)
processors.ScaleStateNotifier.Register(clusterStateRegistry)

// TODO: Populate the ScaleDownActuator/Planner fields in AutoscalingContext
// during the struct creation rather than here.
@@ -615,23 +615,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr

metrics.UpdateDurationFromStart(metrics.FindUnneeded, unneededStart)

scaleDownInCooldown := a.processorCallbacks.disableScaleDownForLoop

if !a.ScaleDownDelayTypeLocal {
scaleDownInCooldown = scaleDownInCooldown ||
a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) ||
a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) ||
a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime)
} else {
scaleDownInCooldown = scaleDownInCooldown || len(scaleDownCandidates) == 0
}

scaleDownInCooldown := a.isScaleDownInCooldown(currentTime, scaleDownCandidates)
klog.V(4).Infof("Scale down status: lastScaleUpTime=%s lastScaleDownDeleteTime=%v "+
"lastScaleDownFailTime=%s scaleDownForbidden=%v scaleDownInCooldown=%v",
a.lastScaleUpTime, a.lastScaleDownDeleteTime, a.lastScaleDownFailTime,
a.processorCallbacks.disableScaleDownForLoop, scaleDownInCooldown)
metrics.UpdateScaleDownInCooldown(scaleDownInCooldown)

// We want to delete unneeded Node Groups only if there is no current delete
// in progress.
_, drained := scaleDownActuationStatus.DeletionsInProgress()
@@ -701,6 +690,18 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
return nil
}

func (a *StaticAutoscaler) isScaleDownInCooldown(currentTime time.Time, scaleDownCandidates []*apiv1.Node) bool {
scaleDownInCooldown := a.processorCallbacks.disableScaleDownForLoop || len(scaleDownCandidates) == 0

if a.ScaleDownDelayTypeLocal {
return scaleDownInCooldown
}
return scaleDownInCooldown ||
a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) ||
a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) ||
a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime)
}

// Sets the target size of node groups to the current number of nodes in them
// if the difference was constant for a prolonged time. Returns true if managed
// to fix something.
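The new `isScaleDownInCooldown` helper extracted above reads a handful of flags, timestamps and delay options. Restated as a standalone function — with the relevant fields gathered into an illustrative `cooldownInputs` struct rather than the real StaticAutoscaler/options layout — the logic is:

```go
package main

import (
	"fmt"
	"time"
)

// cooldownInputs gathers the fields the extracted helper reads; the names are
// illustrative, not the real StaticAutoscaler / options layout.
type cooldownInputs struct {
	disableScaleDownForLoop    bool
	scaleDownCandidateCount    int
	scaleDownDelayTypeLocal    bool
	lastScaleUpTime            time.Time
	lastScaleDownFailTime      time.Time
	lastScaleDownDeleteTime    time.Time
	scaleDownDelayAfterAdd     time.Duration
	scaleDownDelayAfterFailure time.Duration
	scaleDownDelayAfterDelete  time.Duration
}

// isScaleDownInCooldown mirrors the refactored helper: scale down is in
// cooldown whenever it is disabled for this loop or there are no candidates;
// the global delay timers are only consulted when local (per node group)
// scale-down delays are turned off.
func isScaleDownInCooldown(in cooldownInputs, now time.Time) bool {
	cooldown := in.disableScaleDownForLoop || in.scaleDownCandidateCount == 0
	if in.scaleDownDelayTypeLocal {
		return cooldown
	}
	return cooldown ||
		in.lastScaleUpTime.Add(in.scaleDownDelayAfterAdd).After(now) ||
		in.lastScaleDownFailTime.Add(in.scaleDownDelayAfterFailure).After(now) ||
		in.lastScaleDownDeleteTime.Add(in.scaleDownDelayAfterDelete).After(now)
}

func main() {
	now := time.Now()
	in := cooldownInputs{
		scaleDownCandidateCount: 3,
		lastScaleUpTime:         now.Add(-5 * time.Minute),
		scaleDownDelayAfterAdd:  10 * time.Minute, // a recent scale-up is still inside the delay window
	}
	fmt.Println(isScaleDownInCooldown(in, now)) // true
}
```

Keeping the empty-candidates check in the common prefix means a loop with no scale-down candidates is always treated as cooldown, while the three global delay timers only apply when per-node-group (local) delays are disabled.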