Skip to content

Commit

Permalink
[local] Track previous deletions when cache is refreshed
Browse files Browse the repository at this point in the history
  • Loading branch information
domenicbozzuto committed Jun 25, 2024
1 parent bf8049c commit 53ed40c
Showing 1 changed file with 31 additions and 14 deletions.
45 changes: 31 additions & 14 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,14 +545,14 @@ func (scaleSet *ScaleSet) Nodes() ([]cloudprovider.Instance, error) {
klog.V(4).Infof("VMSS: orchestration Mode %s", orchestrationMode)

if orchestrationMode == compute.Uniform {
err := scaleSet.buildScaleSetCache(lastRefresh)
err := scaleSet.buildScaleSetCache(lastRefresh, scaleSet.instanceCache)
if err != nil {
return nil, err
}

} else if orchestrationMode == compute.Flexible {
if scaleSet.manager.config.EnableVmssFlex {
err := scaleSet.buildScaleSetCacheForFlex(lastRefresh)
err := scaleSet.buildScaleSetCacheForFlex(lastRefresh, scaleSet.instanceCache)
if err != nil {
return nil, err
}
Expand All @@ -568,7 +568,7 @@ func (scaleSet *ScaleSet) Nodes() ([]cloudprovider.Instance, error) {
return scaleSet.instanceCache, nil
}

func (scaleSet *ScaleSet) buildScaleSetCache(lastRefresh time.Time) error {
func (scaleSet *ScaleSet) buildScaleSetCache(lastRefresh time.Time, lastInstanceCacheState []cloudprovider.Instance) error {
vms, rerr := scaleSet.GetScaleSetVms()
if rerr != nil {
if isAzureRequestsThrottled(rerr) {
Expand All @@ -580,13 +580,13 @@ func (scaleSet *ScaleSet) buildScaleSetCache(lastRefresh time.Time) error {
return rerr.Error()
}

scaleSet.instanceCache = buildInstanceCache(vms)
scaleSet.instanceCache = buildInstanceCache(vms, lastInstanceCacheState)
scaleSet.lastInstanceRefresh = lastRefresh

return nil
}

func (scaleSet *ScaleSet) buildScaleSetCacheForFlex(lastRefresh time.Time) error {
func (scaleSet *ScaleSet) buildScaleSetCacheForFlex(lastRefresh time.Time, lastInstanceCacheState []cloudprovider.Instance) error {
vms, rerr := scaleSet.GetFlexibleScaleSetVms()
if rerr != nil {
if isAzureRequestsThrottled(rerr) {
Expand All @@ -598,40 +598,49 @@ func (scaleSet *ScaleSet) buildScaleSetCacheForFlex(lastRefresh time.Time) error
return rerr.Error()
}

scaleSet.instanceCache = buildInstanceCache(vms)
scaleSet.instanceCache = buildInstanceCache(vms, lastInstanceCacheState)
scaleSet.lastInstanceRefresh = lastRefresh

return nil
}

// buildInstanceCache converts a VM list into cloudprovider.Instance entries.
//
// vmList is handled when its dynamic type is []compute.VirtualMachineScaleSetVM or
// []compute.VirtualMachine; any other type yields an empty, non-nil slice.
// lastInstanceCacheState is the previous cache content: its entries whose status is
// cloudprovider.InstanceDeleting are collected and passed to addInstanceToCache for every VM.
//
// Note that the GetScaleSetVms() results are not used directly because, for the List endpoint,
// their resource ID format is not consistent with that of the Get endpoint.
func buildInstanceCache(vmList interface{}) []cloudprovider.Instance {
func buildInstanceCache(vmList interface{}, lastInstanceCacheState []cloudprovider.Instance) []cloudprovider.Instance {
instances := []cloudprovider.Instance{}

// Find the instances that are already being deleted, so that status is not
// lost when the cache is rebuilt
var instancesBeingDeleted []cloudprovider.Instance
for _, instance := range lastInstanceCacheState {
// Entries with a nil Status are skipped; only an explicit InstanceDeleting state is carried over.
if instance.Status != nil && instance.Status.State == cloudprovider.InstanceDeleting {
instancesBeingDeleted = append(instancesBeingDeleted, instance)
}
}

// Both supported VM list types are processed the same way.
switch vms := vmList.(type) {
case []compute.VirtualMachineScaleSetVM:
for _, vm := range vms {
// Default to the running power state when the VM has no instance view statuses.
powerState := vmPowerStateRunning
if vm.InstanceView != nil && vm.InstanceView.Statuses != nil {
powerState = vmPowerStateFromStatuses(*vm.InstanceView.Statuses)
}
addInstanceToCache(&instances, vm.ID, vm.ProvisioningState, powerState)
addInstanceToCache(&instances, instancesBeingDeleted, vm.ID, vm.ProvisioningState, powerState)
}
case []compute.VirtualMachine:
for _, vm := range vms {
// Default to the running power state when the VM has no instance view statuses.
powerState := vmPowerStateRunning
if vm.InstanceView != nil && vm.InstanceView.Statuses != nil {
powerState = vmPowerStateFromStatuses(*vm.InstanceView.Statuses)
}
addInstanceToCache(&instances, vm.ID, vm.ProvisioningState, powerState)
addInstanceToCache(&instances, instancesBeingDeleted, vm.ID, vm.ProvisioningState, powerState)
}
}

return instances
}

func addInstanceToCache(instances *[]cloudprovider.Instance, id *string, provisioningState *string, powerState string) {
func addInstanceToCache(instances *[]cloudprovider.Instance, instancesBeingDeleted []cloudprovider.Instance, id *string, provisioningState *string, powerState string) {
// An empty resource ID indicates that the instance may be in the deleting state.
if len(*id) == 0 {
return
Expand All @@ -644,10 +653,18 @@ func addInstanceToCache(instances *[]cloudprovider.Instance, id *string, provisi
return
}

*instances = append(*instances, cloudprovider.Instance{
Id: "azure://" + resourceID,
Status: instanceStatusFromProvisioningStateAndPowerState(resourceID, provisioningState, powerState),
})
// Ensure that instances that had the deleting state prior to the last cache refresh
// still maintain the deleting state
newId := "azure://" + resourceID
newStatus := instanceStatusFromProvisioningStateAndPowerState(resourceID, provisioningState, powerState)
for _, instance := range instancesBeingDeleted {
if newId == instance.Id {
newStatus = &cloudprovider.InstanceStatus{State: cloudprovider.InstanceDeleting}
klog.V(4).Infof("VM %s was previously identified as undergoing deletion, persisting InstanceDeleting state", resourceID)
break
}
}
*instances = append(*instances, cloudprovider.Instance{Id: newId, Status: newStatus})
}

func (scaleSet *ScaleSet) getInstanceByProviderID(providerID string) (cloudprovider.Instance, bool) {
Expand Down

0 comments on commit 53ed40c

Please sign in to comment.