From e4f43a14091f59cbfbd6ebe35148c6f00971662a Mon Sep 17 00:00:00 2001
From: "dom.bozzuto"
Date: Fri, 14 Jun 2024 13:30:40 -0400
Subject: [PATCH] Prevent autoscaler from performing multiple deletes

Track, per scale set, the instance IDs that have a DeleteInstancesAsync
call in flight and skip them in later DeleteInstances calls. The marker
is cleared when WaitForDeleteInstancesResult returns, whether or not the
delete succeeded. DeleteInstances returns early when every requested
instance is already being deleted, so no API call is made with an empty
instance list.

---
 .../cloudprovider/azure/azure_manager_test.go |  1 +
 .../cloudprovider/azure/azure_scale_set.go    | 72 +++++++++++++++++--
 2 files changed, 70 insertions(+), 3 deletions(-)

diff --git a/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go b/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go
index 130d79bae08b..8a0274fdfad9 100644
--- a/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go
+++ b/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go
@@ -680,6 +680,7 @@ func TestGetFilteredAutoscalingGroupsVmss(t *testing.T) {
 		curSize:                3,
 		sizeRefreshPeriod:      manager.azureCache.refreshInterval,
 		instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod,
+		deletionsInProgress:    make(map[string]struct{}),
 	}}
 	assert.True(t, assert.ObjectsAreEqualValues(expectedAsgs, asgs), "expected %#v, but found: %#v", expectedAsgs, asgs)
 }
diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
index 3b7978c5868b..0ae58d53c9eb 100644
--- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
+++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
@@ -63,6 +63,11 @@ type ScaleSet struct {
 	instanceMutex       sync.Mutex
 	instanceCache       []cloudprovider.Instance
 	lastInstanceRefresh time.Time
+
+	// deletionMutex guards deletionsInProgress, the set of instance IDs whose
+	// delete call has been issued but has not finished yet.
+	deletionMutex       sync.Mutex
+	deletionsInProgress map[string]struct{}
 }
 
 // NewScaleSet creates a new NewScaleSet.
@@ -78,6 +83,7 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager, curSize int64) (
 		sizeRefreshPeriod:         az.azureCache.refreshInterval,
 		enableDynamicInstanceList: az.config.EnableDynamicInstanceList,
 		instancesRefreshJitter:    az.config.VmssVmsCacheJitter,
+		deletionsInProgress:       make(map[string]struct{}),
 	}
 
 	if az.config.VmssVmsCacheTTL != 0 {
@@ -183,6 +189,12 @@ func (scaleSet *ScaleSet) waitForDeleteInstances(future *azure.Future, requiredI
 
 	klog.V(3).Infof("Calling virtualMachineScaleSetsClient.WaitForDeleteInstancesResult(%v) for %s", requiredIds.InstanceIds, scaleSet.Name)
 	httpResponse, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.WaitForDeleteInstancesResult(ctx, future, scaleSet.manager.config.ResourceGroup)
+	// Clear the in-flight markers whether or not the delete succeeded, so a
+	// failed delete can be requested again.
+	for _, instanceID := range *requiredIds.InstanceIds {
+		scaleSet.trackDeletionEnd(instanceID)
+		klog.V(3).Infof("call to delete instance %s finished", instanceID)
+	}
 	isSuccess, err := isSuccessHTTPResponse(httpResponse, err)
 	if isSuccess {
 		klog.V(3).Infof("virtualMachineScaleSetsClient.WaitForDeleteInstancesResult(%v) for %s success", requiredIds.InstanceIds, scaleSet.Name)
@@ -395,6 +407,7 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregistered
 			klog.V(3).Infof("Skipping deleting instance %s as its current state is deleting", instance.Name)
 			continue
 		}
+
 		instancesToDelete = append(instancesToDelete, instance)
 	}
 
@@ -414,18 +427,34 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregistered
 		instanceIDs = append(instanceIDs, instanceID)
 	}
 
+	// Drop instances whose delete call is still in flight so the same VM is not
+	// submitted to the API twice.
+	var instanceIDsToDelete []string
+	for _, instanceID := range instanceIDs {
+		if scaleSet.isBeingDeleted(instanceID) {
+			klog.V(3).Infof("instance %s is already being deleted", instanceID)
+			continue
+		}
+		instanceIDsToDelete = append(instanceIDsToDelete, instanceID)
+	}
+
+	// Every requested instance already has a delete in flight; do not call the
+	// API with an empty instance list.
+	if len(instanceIDsToDelete) == 0 {
+		klog.V(3).Infof("All requested instances of %s are already being deleted, skipping", scaleSet.Name)
+		return nil
+	}
+
 	requiredIds := &compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
-		InstanceIds: &instanceIDs,
+		InstanceIds: &instanceIDsToDelete,
 	}
 
 	ctx, cancel := getContextWithTimeout(vmssContextTimeout)
 	defer cancel()
 	resourceGroup := scaleSet.manager.config.ResourceGroup
 
-	scaleSet.instanceMutex.Lock()
 	klog.V(3).Infof("Calling virtualMachineScaleSetsClient.DeleteInstancesAsync(%v)", requiredIds.InstanceIds)
 	future, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.DeleteInstancesAsync(ctx, resourceGroup, commonAsg.Id(), *requiredIds, false)
-	scaleSet.instanceMutex.Unlock()
 	if rerr != nil {
 		klog.Errorf("virtualMachineScaleSetsClient.DeleteInstancesAsync for instances %v failed: %v", requiredIds.InstanceIds, rerr)
 		return rerr.Error()
@@ -440,6 +469,11 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregistered
 		scaleSet.lastSizeRefresh = time.Now()
 		scaleSet.sizeMutex.Unlock()
 	}
+	// Record the deletes that were just issued; waitForDeleteInstances clears them.
+	for _, instanceID := range *requiredIds.InstanceIds {
+		scaleSet.trackDeletionStart(instanceID)
+		klog.V(3).Infof("call to delete instance %s started", instanceID)
+	}
 
 	// Proactively set the status of the instances to be deleted in cache
 	for _, instance := range instancesToDelete {
@@ -732,3 +766,35 @@ func (scaleSet *ScaleSet) getOrchestrationMode() (compute.OrchestrationMode, err
 	}
 	return vmss.OrchestrationMode, nil
 }
+
+// trackDeletionStart records that a delete call for instanceID is in flight.
+func (scaleSet *ScaleSet) trackDeletionStart(instanceID string) {
+	scaleSet.deletionMutex.Lock()
+	defer scaleSet.deletionMutex.Unlock()
+	// Writing to a nil map panics, so allocate lazily for ScaleSet values that
+	// were not built through NewScaleSet.
+	if scaleSet.deletionsInProgress == nil {
+		scaleSet.deletionsInProgress = make(map[string]struct{})
+	}
+
+	scaleSet.deletionsInProgress[instanceID] = struct{}{}
+}
+
+// trackDeletionEnd clears the in-flight marker for instanceID.
+func (scaleSet *ScaleSet) trackDeletionEnd(instanceID string) {
+	scaleSet.deletionMutex.Lock()
+	defer scaleSet.deletionMutex.Unlock()
+
+	// delete is a no-op on a nil map, so no allocation is needed here.
+	delete(scaleSet.deletionsInProgress, instanceID)
+}
+
+// isBeingDeleted reports whether a delete call for instanceID is in flight.
+func (scaleSet *ScaleSet) isBeingDeleted(instanceID string) bool {
+	scaleSet.deletionMutex.Lock()
+	defer scaleSet.deletionMutex.Unlock()
+
+	// Reading a nil map yields the zero value, so no allocation is needed here.
+	_, ok := scaleSet.deletionsInProgress[instanceID]
+	return ok
+}