diff --git a/cluster-autoscaler/cloudprovider/azure/azure_config.go b/cluster-autoscaler/cloudprovider/azure/azure_config.go index b2f880d29af3..096d95e2f4e0 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_config.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_config.go @@ -56,6 +56,9 @@ const ( rateLimitWriteQPSEnvVar = "RATE_LIMIT_WRITE_QPS" rateLimitWriteBucketsEnvVar = "RATE_LIMIT_WRITE_BUCKETS" + // VmssSizeRefreshPeriodDefault in seconds + VmssSizeRefreshPeriodDefault = 30 + // auth methods authMethodPrincipal = "principal" authMethodCLI = "cli" @@ -128,6 +131,9 @@ type Config struct { // Jitter in seconds subtracted from the VMSS cache TTL before the first refresh VmssVmsCacheJitter int `json:"vmssVmsCacheJitter" yaml:"vmssVmsCacheJitter"` + // GetVmssSizeRefreshPeriod (seconds) defines how frequently to call GET VMSS API to fetch VMSS info per nodegroup instance + GetVmssSizeRefreshPeriod int `json:"getVmssSizeRefreshPeriod" yaml:"getVmssSizeRefreshPeriod"` + // number of latest deployments that will not be deleted MaxDeploymentsCount int64 `json:"maxDeploymentsCount" yaml:"maxDeploymentsCount"` @@ -256,6 +262,15 @@ func BuildAzureConfig(configReader io.Reader) (*Config, error) { cfg.EnableDynamicInstanceList = dynamicInstanceListDefault } + if getVmssSizeRefreshPeriod := os.Getenv("AZURE_GET_VMSS_SIZE_REFRESH_PERIOD"); getVmssSizeRefreshPeriod != "" { + cfg.GetVmssSizeRefreshPeriod, err = strconv.Atoi(getVmssSizeRefreshPeriod) + if err != nil { + return nil, fmt.Errorf("failed to parse AZURE_GET_VMSS_SIZE_REFRESH_PERIOD %q: %v", getVmssSizeRefreshPeriod, err) + } + } else { + cfg.GetVmssSizeRefreshPeriod = VmssSizeRefreshPeriodDefault + } + if enableVmssFlex := os.Getenv("AZURE_ENABLE_VMSS_FLEX"); enableVmssFlex != "" { cfg.EnableVmssFlex, err = strconv.ParseBool(enableVmssFlex) if err != nil { diff --git a/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go b/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go index cd41786bd3d4..eb5eda27a8ac 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go @@ -366,6 +366,7 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { VmssCacheTTL: 100, VmssVmsCacheTTL: 110, VmssVmsCacheJitter: 90, + GetVmssSizeRefreshPeriod: 30, MaxDeploymentsCount: 8, CloudProviderBackoff: true, CloudProviderBackoffRetries: 1, @@ -480,6 +481,14 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { assert.Equal(t, expectedErr, err, "Return err does not match, expected: %v, actual: %v", expectedErr, err) }) + t.Run("invalid int for AZURE_GET_VMSS_SIZE_REFRESH_PERIOD", func(t *testing.T) { + t.Setenv("AZURE_GET_VMSS_SIZE_REFRESH_PERIOD", "invalidint") + manager, err := createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) + expectedErr := fmt.Errorf("failed to parse AZURE_GET_VMSS_SIZE_REFRESH_PERIOD \"invalidint\": strconv.Atoi: parsing \"invalidint\": invalid syntax") + assert.Nil(t, manager) + assert.Equal(t, expectedErr, err, "Return err does not match, expected: %v, actual: %v", expectedErr, err) + }) + t.Run("invalid int for AZURE_MAX_DEPLOYMENT_COUNT", func(t *testing.T) { t.Setenv("AZURE_MAX_DEPLOYMENT_COUNT", "invalidint") manager, err := createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) @@ -685,13 +694,14 @@ func TestGetFilteredAutoscalingGroupsVmss(t *testing.T) { azureRef: azureRef{ Name: vmssName, }, - minSize: minVal, - maxSize: maxVal, - manager: manager, - enableForceDelete: manager.config.EnableForceDelete, - curSize: 3, - sizeRefreshPeriod: manager.azureCache.refreshInterval, - instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod, + minSize: minVal, + maxSize: maxVal, + manager: manager, + enableForceDelete: manager.config.EnableForceDelete, + curSize: 3, + sizeRefreshPeriod: manager.azureCache.refreshInterval, + instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod, + getVmssSizeRefreshPeriod: time.Duration(VmssSizeRefreshPeriodDefault) * time.Second, }} assert.True(t, assert.ObjectsAreEqualValues(expectedAsgs, asgs), "expected %#v, but found: %#v", expectedAsgs, asgs) } @@ -732,13 +742,14 @@ func TestGetFilteredAutoscalingGroupsVmssWithConfiguredSizes(t *testing.T) { azureRef: azureRef{ Name: vmssName, }, - minSize: minVal, - maxSize: maxVal, - manager: manager, - enableForceDelete: manager.config.EnableForceDelete, - curSize: 3, - sizeRefreshPeriod: manager.azureCache.refreshInterval, - instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod, + minSize: minVal, + maxSize: maxVal, + manager: manager, + enableForceDelete: manager.config.EnableForceDelete, + curSize: 3, + sizeRefreshPeriod: manager.azureCache.refreshInterval, + instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod, + getVmssSizeRefreshPeriod: time.Duration(VmssSizeRefreshPeriodDefault) * time.Second, }} assert.True(t, assert.ObjectsAreEqualValues(expectedAsgs, asgs), "expected %#v, but found: %#v", expectedAsgs, asgs) } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go index e7c6079c2206..a506d54df869 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go @@ -58,19 +58,29 @@ type ScaleSet struct { minSize int maxSize int - enableForceDelete bool - - sizeMutex sync.Mutex - curSize int64 - + enableForceDelete bool enableDynamicInstanceList bool - lastSizeRefresh time.Time + // curSize tracks (and caches) the number of VMs in this ScaleSet. + // It is periodically updated from vmss.Sku.Capacity, with VMSS itself coming + // either from azure.Cache (which periodically does VMSS.List) + // or from direct VMSS.Get (used for Spot). + curSize int64 + // lastSizeRefresh is the time curSize was last refreshed from vmss.Sku.Capacity. + // Together with sizeRefreshPeriod, it is used to determine if it is time to refresh curSize. + lastSizeRefresh time.Time + // sizeRefreshPeriod is how often curSize is refreshed from vmss.Sku.Capacity. + // (Set from azureCache.refreshInterval = VmssCacheTTL or [defaultMetadataCache]refreshInterval = 1min) sizeRefreshPeriod time.Duration + // getVmssSizeRefreshPeriod is how often curSize should be refreshed in case VMSS.Get call is used (only spot instances). + // (Set from GetVmssSizeRefreshPeriod, if specified = get-vmss-size-refresh-period = 30s, + // or override from autoscalerProfile.GetVmssSizeRefreshPeriod) + getVmssSizeRefreshPeriod time.Duration instancesRefreshPeriod time.Duration instancesRefreshJitter int + sizeMutex sync.Mutex instanceMutex sync.Mutex instanceCache []cloudprovider.Instance lastInstanceRefresh time.Time @@ -98,6 +108,12 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager, curSize int64) ( scaleSet.instancesRefreshPeriod = defaultVmssInstancesRefreshPeriod } + if az.config.GetVmssSizeRefreshPeriod != 0 { + scaleSet.getVmssSizeRefreshPeriod = time.Duration(az.config.GetVmssSizeRefreshPeriod) * time.Second + } else { + scaleSet.getVmssSizeRefreshPeriod = time.Duration(VmssSizeRefreshPeriodDefault) * time.Second + } + return scaleSet, nil } @@ -157,17 +173,43 @@ func (scaleSet *ScaleSet) getCurSize() (int64, error) { scaleSet.sizeMutex.Lock() defer scaleSet.sizeMutex.Unlock() - if scaleSet.lastSizeRefresh.Add(scaleSet.sizeRefreshPeriod).After(time.Now()) { - klog.V(3).Infof("VMSS: %s, returning in-memory size: %d", scaleSet.Name, scaleSet.curSize) - return scaleSet.curSize, nil - } - set, err := scaleSet.getVMSSFromCache() if err != nil { klog.Errorf("failed to get information for VMSS: %s, error: %v", scaleSet.Name, err) return -1, err } + effectiveSizeRefreshPeriod := scaleSet.sizeRefreshPeriod + + // If the scale set is Spot, we want to have a more fresh view of the Sku.Capacity field. + // This is because evictions can happen at any given point in time, + // even before VMs are materialized as nodes. We should be able to + // react to those and have the autoscaler readjust the goal again to force restoration. + // Taking into account only if orchestrationMode == Uniform because flex mode can have + // combination of spot and regular vms + if isSpot(&set) { + effectiveSizeRefreshPeriod = scaleSet.getVmssSizeRefreshPeriod + } + + if scaleSet.lastSizeRefresh.Add(effectiveSizeRefreshPeriod).After(time.Now()) { + klog.V(3).Infof("VMSS: %s, returning in-memory size: %d", scaleSet.Name, scaleSet.curSize) + return scaleSet.curSize, nil + } + + // If the scale set is on Spot, make a GET VMSS call to fetch more updated fresh info + if isSpot(&set) { + ctx, cancel := getContextWithCancel() + defer cancel() + + var rerr *retry.Error + set, rerr = scaleSet.manager.azClient.virtualMachineScaleSetsClient.Get(ctx, scaleSet.manager.config.ResourceGroup, + scaleSet.Name) + if rerr != nil { + klog.Errorf("failed to get information for VMSS: %s, error: %v", scaleSet.Name, rerr) + return -1, err + } + } + vmssSizeMutex.Lock() curSize := *set.Sku.Capacity vmssSizeMutex.Unlock() @@ -184,6 +226,12 @@ func (scaleSet *ScaleSet) getCurSize() (int64, error) { return scaleSet.curSize, nil } +func isSpot(vmss *compute.VirtualMachineScaleSet) bool { + return vmss != nil && vmss.VirtualMachineScaleSetProperties != nil && + vmss.VirtualMachineScaleSetProperties.VirtualMachineProfile != nil && + vmss.VirtualMachineScaleSetProperties.VirtualMachineProfile.Priority == compute.Spot +} + // GetScaleSetSize gets Scale Set size. func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) { return scaleSet.getCurSize() diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go index 3ac03343d903..c9ba730852f9 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go @@ -168,44 +168,47 @@ func TestTargetSize(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - orchestrationModes := [2]compute.OrchestrationMode{compute.Uniform, compute.Flexible} - expectedScaleSets := newTestVMSSList(3, "test-asg", "eastus", compute.Uniform) - expectedVMSSVMs := newTestVMSSVMList(3) - expectedVMs := newTestVMList(3) + spotScaleSet := newTestVMSSList(5, "spot-vmss", "eastus", compute.Uniform)[0] + spotScaleSet.VirtualMachineProfile = &compute.VirtualMachineScaleSetVMProfile{ + Priority: compute.Spot, + } + expectedScaleSets = append(expectedScaleSets, spotScaleSet) - for _, orchMode := range orchestrationModes { - provider := newTestProvider(t) - mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) - mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes() - provider.azureManager.azClient.virtualMachineScaleSetsClient = mockVMSSClient - mockVMClient := mockvmclient.NewMockInterface(ctrl) - mockVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return([]compute.VirtualMachine{}, nil).AnyTimes() - provider.azureManager.azClient.virtualMachinesClient = mockVMClient + provider := newTestProvider(t) + mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) + mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes() + provider.azureManager.azClient.virtualMachineScaleSetsClient = mockVMSSClient + mockVMClient := mockvmclient.NewMockInterface(ctrl) + mockVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return([]compute.VirtualMachine{}, nil).AnyTimes() + provider.azureManager.azClient.virtualMachinesClient = mockVMClient - if orchMode == compute.Uniform { + err := provider.azureManager.forceRefresh() + assert.NoError(t, err) - mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) - mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() - provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient + // non-spot nodepool + registered := provider.azureManager.RegisterNodeGroup( + newTestScaleSet(provider.azureManager, "test-asg")) + assert.True(t, registered) + assert.Equal(t, len(provider.NodeGroups()), 1) - } else { - provider.azureManager.config.EnableVmssFlex = true - mockVMClient.EXPECT().ListVmssFlexVMsWithoutInstanceView(gomock.Any(), "test-asg").Return(expectedVMs, nil).AnyTimes() - } + targetSize, err := provider.NodeGroups()[0].TargetSize() + assert.NoError(t, err) + assert.Equal(t, 3, targetSize) - err := provider.azureManager.forceRefresh() - assert.NoError(t, err) + // Register a spot nodepool + spotregistered := provider.azureManager.RegisterNodeGroup( + newTestScaleSet(provider.azureManager, "spot-vmss")) + assert.True(t, spotregistered) + assert.Equal(t, len(provider.NodeGroups()), 2) - registered := provider.azureManager.RegisterNodeGroup( - newTestScaleSet(provider.azureManager, "test-asg")) - assert.True(t, registered) - assert.Equal(t, len(provider.NodeGroups()), 1) + // mock getvmss call for spotnode pool returning different capacity + spotScaleSet.Sku.Capacity = to.Int64Ptr(1) + mockVMSSClient.EXPECT().Get(gomock.Any(), provider.azureManager.config.ResourceGroup, "spot-vmss").Return(spotScaleSet, nil).Times(1) - targetSize, err := provider.NodeGroups()[0].TargetSize() - assert.NoError(t, err) - assert.Equal(t, 3, targetSize) - } + targetSize, err = provider.NodeGroups()[1].TargetSize() + assert.NoError(t, err) + assert.Equal(t, 1, targetSize) } func TestIncreaseSize(t *testing.T) {