Skip to content

Commit

Permalink
[local] Add support for doing refresh before IncreaseSize
Browse files Browse the repository at this point in the history
  • Loading branch information
domenicbozzuto committed Jun 25, 2024
1 parent bf8049c commit 15f6d76
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 7 deletions.
8 changes: 5 additions & 3 deletions cluster-autoscaler/cloudprovider/azure/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,12 @@ When using K8s versions older than v1.18, we recommend using at least **v.1.17.5
As for CA versions older than 1.18, we recommend using at least **v.1.17.2, v1.16.5, v1.15.6**.

In addition, cluster-autoscaler exposes a `AZURE_VMSS_CACHE_TTL` environment variable which controls the rate of `GetVMScaleSet` being made. By default, this is 15 seconds but setting this to a higher value such as 60 seconds can protect against API throttling. The caches used are proactively incremented and decremented with the scale up and down operations and this higher value doesn't have any noticeable impact on performance. **Note that the value is in seconds**
`AZURE_VMSS_CACHE_FORCE_REFRESH` can also be set to always refresh the cache before an upscale.

| Config Name | Default | Environment Variable | Cloud Config File |
| ----------- | ------- | -------------------- | ----------------- |
| VmssCacheTTL | 60 | AZURE_VMSS_CACHE_TTL | vmssCacheTTL |
| Config Name | Default | Environment Variable | Cloud Config File |
|-----------------------|--------|--------------------------------|------------------------|
| VmssCacheTTL | 60 | AZURE_VMSS_CACHE_TTL | vmssCacheTTL |
| VmssCacheForceRefresh | false | AZURE_VMSS_CACHE_FORCE_REFRESH | vmssCacheForceRefresh |

The `AZURE_VMSS_VMS_CACHE_TTL` environment variable affects the `GetScaleSetVms` (VMSS VM List) calls rate. The default value is 300 seconds.
A configurable jitter (`AZURE_VMSS_VMS_CACHE_JITTER` environment variable, default 0) expresses the maximum number of second that will be subtracted from that initial VMSS cache TTL after a new VMSS is discovered by the cluster-autoscaler: this can prevent a dogpile effect on clusters having many VMSS.
Expand Down
10 changes: 10 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ type Config struct {
// VMSS metadata cache TTL in seconds, only applies for vmss type
VmssCacheTTL int64 `json:"vmssCacheTTL" yaml:"vmssCacheTTL"`

// VMSS cache option to force refresh of the vmss capacity before an upscale
VmssCacheForceRefresh bool `json:"vmssCacheForceRefresh" yaml:"vmssCacheForceRefresh"`

// VMSS instances cache TTL in seconds, only applies for vmss type
VmssVmsCacheTTL int64 `json:"vmssVmsCacheTTL" yaml:"vmssVmsCacheTTL"`

Expand Down Expand Up @@ -215,6 +218,13 @@ func BuildAzureConfig(configReader io.Reader) (*Config, error) {
}
}

if vmssCacheForceRefresh := os.Getenv("AZURE_VMSS_CACHE_FORCE_REFRESH"); vmssCacheForceRefresh != "" {
cfg.VmssCacheForceRefresh, err = strconv.ParseBool(vmssCacheForceRefresh)
if err != nil {
return nil, fmt.Errorf("failed to parse AZURE_VMSS_CACHE_FORCE_REFRESH %q: %v", vmssCacheForceRefresh, err)
}
}

if vmssVmsCacheTTL := os.Getenv("AZURE_VMSS_VMS_CACHE_TTL"); vmssVmsCacheTTL != "" {
cfg.VmssVmsCacheTTL, err = strconv.ParseInt(vmssVmsCacheTTL, 10, 0)
if err != nil {
Expand Down
27 changes: 25 additions & 2 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,22 @@ func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error {
return err
}

// Update the new capacity to cache.
// Update the new capacity to cache, if the size increase is a net increase
vmssSizeMutex.Lock()
vmssInfo.Sku.Capacity = &size
rejectIncrease := false
lastCapacity := *vmssInfo.Sku.Capacity
if size < lastCapacity {
rejectIncrease = true
} else {
vmssInfo.Sku.Capacity = &size
}
vmssSizeMutex.Unlock()

if rejectIncrease {
klog.Warningf("VMSS %s: rejecting size decrease from %d to %d", scaleSet.Name, lastCapacity, size)
return fmt.Errorf("vmss %s: rejected size decrease from %d to %d", scaleSet.Name, lastCapacity, size)
}

// Compose a new VMSS for updating.
op := compute.VirtualMachineScaleSet{
Name: vmssInfo.Name,
Expand All @@ -254,6 +265,7 @@ func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error {
// Proactively set the VMSS size so autoscaler makes better decisions.
scaleSet.curSize = size
scaleSet.lastSizeRefresh = time.Now()
klog.V(3).Infof("VMSS %s: requested size increase from %d to %d", scaleSet.Name, lastCapacity, size)

go scaleSet.updateVMSSCapacity(future)
return nil
Expand All @@ -272,6 +284,15 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error {
return fmt.Errorf("size increase must be positive")
}

if scaleSet.manager.config.VmssCacheForceRefresh {
if err := scaleSet.manager.forceRefresh(); err != nil {
klog.Errorf("VMSS %s: force refresh of VMSSs failed: %v", scaleSet.Name, err)
return err
}
klog.V(4).Infof("VMSS: %s, forced refreshed before checking size", scaleSet.Name)
scaleSet.invalidateLastSizeRefreshWithLock()
}

size, err := scaleSet.GetScaleSetSize()
if err != nil {
return err
Expand Down Expand Up @@ -437,8 +458,10 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregistered
if !hasUnregisteredNodes {
scaleSet.sizeMutex.Lock()
scaleSet.curSize -= int64(len(instanceIDs))
curSize := scaleSet.curSize
scaleSet.lastSizeRefresh = time.Now()
scaleSet.sizeMutex.Unlock()
klog.V(3).Infof("VMSS %s: had unregistered nodes, current size decremented by %d to %d", scaleSet.Name, len(instanceIDs), curSize)
}

// Proactively set the status of the instances to be deleted in cache
Expand Down
80 changes: 78 additions & 2 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func TestIncreaseSize(t *testing.T) {

provider := newTestProvider(t)
expectedScaleSets := newTestVMSSList(3, "test-asg", "eastus", orchMode)
provider.azureManager.explicitlyConfigured["test-asg"] = true

mockVMSSClient := mockvmssclient.NewMockInterface(ctrl)
mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes()
Expand Down Expand Up @@ -263,7 +264,7 @@ func TestIncreaseSizeOnVMProvisioningFailed(t *testing.T) {
}

mockVMSSClient := mockvmssclient.NewMockInterface(ctrl)
mockVMSSClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return(expectedScaleSets, nil)
mockVMSSClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes()
mockVMSSClient.EXPECT().CreateOrUpdateAsync(gomock.Any(), manager.config.ResourceGroup, vmssName, gomock.Any()).Return(nil, nil)
mockVMSSClient.EXPECT().WaitForCreateOrUpdateResult(gomock.Any(), gomock.Any(), manager.config.ResourceGroup).Return(&http.Response{StatusCode: http.StatusOK}, nil).AnyTimes()
manager.azClient.virtualMachineScaleSetsClient = mockVMSSClient
Expand Down Expand Up @@ -322,7 +323,7 @@ func TestIncreaseSizeOnVMSSUpdating(t *testing.T) {
expectedVMSSVMs := newTestVMSSVMList(3)

mockVMSSClient := mockvmssclient.NewMockInterface(ctrl)
mockVMSSClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return(expectedScaleSets, nil)
mockVMSSClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes()
mockVMSSClient.EXPECT().CreateOrUpdateAsync(gomock.Any(), manager.config.ResourceGroup, vmssName, gomock.Any()).Return(nil, nil)
mockVMSSClient.EXPECT().WaitForCreateOrUpdateResult(gomock.Any(), gomock.Any(), manager.config.ResourceGroup).Return(&http.Response{StatusCode: http.StatusOK}, nil).AnyTimes()
manager.azClient.virtualMachineScaleSetsClient = mockVMSSClient
Expand All @@ -344,6 +345,81 @@ func TestIncreaseSizeOnVMSSUpdating(t *testing.T) {
assert.NoError(t, err)
}

func TestIncreaseSizeWithForceRefresh(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

orchestrationModes := [2]compute.OrchestrationMode{compute.Uniform, compute.Flexible}

expectedVMSSVMs := newTestVMSSVMList(3)
expectedVMs := newTestVMList(3)

for _, orchMode := range orchestrationModes {

provider := newTestProvider(t)
expectedScaleSets := newTestVMSSList(3, "test-asg", "eastus", orchMode)
provider.azureManager.explicitlyConfigured["test-asg"] = true
provider.azureManager.config.VmssCacheForceRefresh = true

mockVMSSClient := mockvmssclient.NewMockInterface(ctrl)
mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(expectedScaleSets, nil).Times(2)
mockVMSSClient.EXPECT().CreateOrUpdateAsync(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(nil, nil)
mockVMSSClient.EXPECT().WaitForCreateOrUpdateResult(gomock.Any(), gomock.Any(), provider.azureManager.config.ResourceGroup).Return(&http.Response{StatusCode: http.StatusOK}, nil).AnyTimes()
provider.azureManager.azClient.virtualMachineScaleSetsClient = mockVMSSClient

if orchMode == compute.Uniform {
mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl)
mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes()
provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient
} else {

provider.azureManager.config.EnableVmssFlex = true
mockVMClient := mockvmclient.NewMockInterface(ctrl)
mockVMClient.EXPECT().ListVmssFlexVMsWithoutInstanceView(gomock.Any(), "test-asg").Return(expectedVMs, nil).AnyTimes()
provider.azureManager.azClient.virtualMachinesClient = mockVMClient
}
err := provider.azureManager.forceRefresh()
assert.NoError(t, err)

ss := newTestScaleSet(provider.azureManager, "test-asg-doesnt-exist")
err = ss.IncreaseSize(100)
expectedErr := fmt.Errorf("could not find vmss: test-asg-doesnt-exist")
assert.Equal(t, expectedErr, err)

registered := provider.azureManager.RegisterNodeGroup(
newTestScaleSet(provider.azureManager, "test-asg"))
assert.True(t, registered)
assert.Equal(t, len(provider.NodeGroups()), 1)

// current target size is 2.
targetSize, err := provider.NodeGroups()[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 3, targetSize)

// Simulate missing instances
mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(newTestVMSSList(1, "test-asg", "eastus", orchMode), nil).AnyTimes()
if orchMode == compute.Uniform {
mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl)
mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(newTestVMSSVMList(1), nil).AnyTimes()
provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient
} else {
provider.azureManager.config.EnableVmssFlex = true
mockVMClient := mockvmclient.NewMockInterface(ctrl)
mockVMClient.EXPECT().ListVmssFlexVMsWithoutInstanceView(gomock.Any(), "test-asg").Return(newTestVMList(1), nil).AnyTimes()
provider.azureManager.azClient.virtualMachinesClient = mockVMClient
}

// Update cache on IncreaseSize and increase back to 3 nodes.
err = provider.NodeGroups()[0].IncreaseSize(2)
assert.NoError(t, err)

// new target size should be 3.
targetSize, err = provider.NodeGroups()[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 3, targetSize)
}
}

func TestBelongs(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down

0 comments on commit 15f6d76

Please sign in to comment.