Skip to content

Commit

Permalink
Merge pull request #6470 from gandhipr/prachigandhi/azure-getvmss-spot
Browse files Browse the repository at this point in the history
use getvmss api for spot instances in azure
  • Loading branch information
k8s-ci-robot authored Jul 19, 2024
2 parents 8dbe8ac + b69fd45 commit c8e4721
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 55 deletions.
15 changes: 15 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ const (
rateLimitWriteQPSEnvVar = "RATE_LIMIT_WRITE_QPS"
rateLimitWriteBucketsEnvVar = "RATE_LIMIT_WRITE_BUCKETS"

// VmssSizeRefreshPeriodDefault in seconds
VmssSizeRefreshPeriodDefault = 30

// auth methods
authMethodPrincipal = "principal"
authMethodCLI = "cli"
Expand Down Expand Up @@ -128,6 +131,9 @@ type Config struct {
// Jitter in seconds subtracted from the VMSS cache TTL before the first refresh
VmssVmsCacheJitter int `json:"vmssVmsCacheJitter" yaml:"vmssVmsCacheJitter"`

// GetVmssSizeRefreshPeriod (seconds) defines how frequently to call GET VMSS API to fetch VMSS info per nodegroup instance
GetVmssSizeRefreshPeriod int `json:"getVmssSizeRefreshPeriod" yaml:"getVmssSizeRefreshPeriod"`

// number of latest deployments that will not be deleted
MaxDeploymentsCount int64 `json:"maxDeploymentsCount" yaml:"maxDeploymentsCount"`

Expand Down Expand Up @@ -256,6 +262,15 @@ func BuildAzureConfig(configReader io.Reader) (*Config, error) {
cfg.EnableDynamicInstanceList = dynamicInstanceListDefault
}

if getVmssSizeRefreshPeriod := os.Getenv("AZURE_GET_VMSS_SIZE_REFRESH_PERIOD"); getVmssSizeRefreshPeriod != "" {
cfg.GetVmssSizeRefreshPeriod, err = strconv.Atoi(getVmssSizeRefreshPeriod)
if err != nil {
return nil, fmt.Errorf("failed to parse AZURE_GET_VMSS_SIZE_REFRESH_PERIOD %q: %v", getVmssSizeRefreshPeriod, err)
}
} else {
cfg.GetVmssSizeRefreshPeriod = VmssSizeRefreshPeriodDefault
}

if enableVmssFlex := os.Getenv("AZURE_ENABLE_VMSS_FLEX"); enableVmssFlex != "" {
cfg.EnableVmssFlex, err = strconv.ParseBool(enableVmssFlex)
if err != nil {
Expand Down
39 changes: 25 additions & 14 deletions cluster-autoscaler/cloudprovider/azure/azure_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) {
VmssCacheTTL: 100,
VmssVmsCacheTTL: 110,
VmssVmsCacheJitter: 90,
GetVmssSizeRefreshPeriod: 30,
MaxDeploymentsCount: 8,
CloudProviderBackoff: true,
CloudProviderBackoffRetries: 1,
Expand Down Expand Up @@ -480,6 +481,14 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) {
assert.Equal(t, expectedErr, err, "Return err does not match, expected: %v, actual: %v", expectedErr, err)
})

t.Run("invalid int for AZURE_GET_VMSS_SIZE_REFRESH_PERIOD", func(t *testing.T) {
t.Setenv("AZURE_GET_VMSS_SIZE_REFRESH_PERIOD", "invalidint")
manager, err := createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient)
expectedErr := fmt.Errorf("failed to parse AZURE_GET_VMSS_SIZE_REFRESH_PERIOD \"invalidint\": strconv.Atoi: parsing \"invalidint\": invalid syntax")
assert.Nil(t, manager)
assert.Equal(t, expectedErr, err, "Return err does not match, expected: %v, actual: %v", expectedErr, err)
})

t.Run("invalid int for AZURE_MAX_DEPLOYMENT_COUNT", func(t *testing.T) {
t.Setenv("AZURE_MAX_DEPLOYMENT_COUNT", "invalidint")
manager, err := createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient)
Expand Down Expand Up @@ -685,13 +694,14 @@ func TestGetFilteredAutoscalingGroupsVmss(t *testing.T) {
azureRef: azureRef{
Name: vmssName,
},
minSize: minVal,
maxSize: maxVal,
manager: manager,
enableForceDelete: manager.config.EnableForceDelete,
curSize: 3,
sizeRefreshPeriod: manager.azureCache.refreshInterval,
instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod,
minSize: minVal,
maxSize: maxVal,
manager: manager,
enableForceDelete: manager.config.EnableForceDelete,
curSize: 3,
sizeRefreshPeriod: manager.azureCache.refreshInterval,
instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod,
getVmssSizeRefreshPeriod: time.Duration(VmssSizeRefreshPeriodDefault) * time.Second,
}}
assert.True(t, assert.ObjectsAreEqualValues(expectedAsgs, asgs), "expected %#v, but found: %#v", expectedAsgs, asgs)
}
Expand Down Expand Up @@ -732,13 +742,14 @@ func TestGetFilteredAutoscalingGroupsVmssWithConfiguredSizes(t *testing.T) {
azureRef: azureRef{
Name: vmssName,
},
minSize: minVal,
maxSize: maxVal,
manager: manager,
enableForceDelete: manager.config.EnableForceDelete,
curSize: 3,
sizeRefreshPeriod: manager.azureCache.refreshInterval,
instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod,
minSize: minVal,
maxSize: maxVal,
manager: manager,
enableForceDelete: manager.config.EnableForceDelete,
curSize: 3,
sizeRefreshPeriod: manager.azureCache.refreshInterval,
instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod,
getVmssSizeRefreshPeriod: time.Duration(VmssSizeRefreshPeriodDefault) * time.Second,
}}
assert.True(t, assert.ObjectsAreEqualValues(expectedAsgs, asgs), "expected %#v, but found: %#v", expectedAsgs, asgs)
}
Expand Down
70 changes: 59 additions & 11 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,29 @@ type ScaleSet struct {
minSize int
maxSize int

enableForceDelete bool

sizeMutex sync.Mutex
curSize int64

enableForceDelete bool
enableDynamicInstanceList bool

lastSizeRefresh time.Time
// curSize tracks (and caches) the number of VMs in this ScaleSet.
// It is periodically updated from vmss.Sku.Capacity, with VMSS itself coming
// either from azure.Cache (which periodically does VMSS.List)
// or from direct VMSS.Get (used for Spot).
curSize int64
// lastSizeRefresh is the time curSize was last refreshed from vmss.Sku.Capacity.
// Together with sizeRefreshPeriod, it is used to determine if it is time to refresh curSize.
lastSizeRefresh time.Time
// sizeRefreshPeriod is how often curSize is refreshed from vmss.Sku.Capacity.
// (Set from azureCache.refreshInterval = VmssCacheTTL or [defaultMetadataCache]refreshInterval = 1min)
sizeRefreshPeriod time.Duration
// getVmssSizeRefreshPeriod is how often curSize should be refreshed in case VMSS.Get call is used (only spot instances).
// (Set from GetVmssSizeRefreshPeriod, if specified = get-vmss-size-refresh-period = 30s,
// or override from autoscalerProfile.GetVmssSizeRefreshPeriod)
getVmssSizeRefreshPeriod time.Duration

instancesRefreshPeriod time.Duration
instancesRefreshJitter int

sizeMutex sync.Mutex
instanceMutex sync.Mutex
instanceCache []cloudprovider.Instance
lastInstanceRefresh time.Time
Expand Down Expand Up @@ -98,6 +108,12 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager, curSize int64) (
scaleSet.instancesRefreshPeriod = defaultVmssInstancesRefreshPeriod
}

if az.config.GetVmssSizeRefreshPeriod != 0 {
scaleSet.getVmssSizeRefreshPeriod = time.Duration(az.config.GetVmssSizeRefreshPeriod) * time.Second
} else {
scaleSet.getVmssSizeRefreshPeriod = time.Duration(VmssSizeRefreshPeriodDefault) * time.Second
}

return scaleSet, nil
}

Expand Down Expand Up @@ -157,17 +173,43 @@ func (scaleSet *ScaleSet) getCurSize() (int64, error) {
scaleSet.sizeMutex.Lock()
defer scaleSet.sizeMutex.Unlock()

if scaleSet.lastSizeRefresh.Add(scaleSet.sizeRefreshPeriod).After(time.Now()) {
klog.V(3).Infof("VMSS: %s, returning in-memory size: %d", scaleSet.Name, scaleSet.curSize)
return scaleSet.curSize, nil
}

set, err := scaleSet.getVMSSFromCache()
if err != nil {
klog.Errorf("failed to get information for VMSS: %s, error: %v", scaleSet.Name, err)
return -1, err
}

effectiveSizeRefreshPeriod := scaleSet.sizeRefreshPeriod

// If the scale set is Spot, we want a fresher view of the Sku.Capacity field.
// This is because evictions can happen at any point in time,
// even before VMs are materialized as nodes. We should be able to
// react to those and have the autoscaler readjust the goal again to force restoration.
// Taking into account only if orchestrationMode == Uniform because flex mode can have
// a combination of spot and regular VMs.
if isSpot(&set) {
effectiveSizeRefreshPeriod = scaleSet.getVmssSizeRefreshPeriod
}

if scaleSet.lastSizeRefresh.Add(effectiveSizeRefreshPeriod).After(time.Now()) {
klog.V(3).Infof("VMSS: %s, returning in-memory size: %d", scaleSet.Name, scaleSet.curSize)
return scaleSet.curSize, nil
}

// If the scale set is on Spot, make a direct GET VMSS call so that the capacity read
// below comes from a fresh response rather than from the cached VMSS object.
if isSpot(&set) {
	ctx, cancel := getContextWithCancel()
	defer cancel()

	var rerr *retry.Error
	set, rerr = scaleSet.manager.azClient.virtualMachineScaleSetsClient.Get(ctx, scaleSet.manager.config.ResourceGroup,
		scaleSet.Name)
	if rerr != nil {
		klog.Errorf("failed to get information for VMSS: %s, error: %v", scaleSet.Name, rerr)
		// Return the GET failure itself. The outer err is nil at this point (the cache
		// lookup above succeeded), so returning it would yield (-1, nil) and hide the
		// failure from callers.
		return -1, rerr.Error()
	}
}

vmssSizeMutex.Lock()
curSize := *set.Sku.Capacity
vmssSizeMutex.Unlock()
Expand All @@ -184,6 +226,12 @@ func (scaleSet *ScaleSet) getCurSize() (int64, error) {
return scaleSet.curSize, nil
}

// isSpot reports whether the given scale set's VM profile has Spot priority.
// It returns false when vmss, its properties, or its VM profile is nil.
func isSpot(vmss *compute.VirtualMachineScaleSet) bool {
	if vmss == nil || vmss.VirtualMachineScaleSetProperties == nil {
		return false
	}

	profile := vmss.VirtualMachineScaleSetProperties.VirtualMachineProfile
	if profile == nil {
		return false
	}

	return profile.Priority == compute.Spot
}

// GetScaleSetSize gets Scale Set size.
func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) {
return scaleSet.getCurSize()
Expand Down
63 changes: 33 additions & 30 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,44 +168,47 @@ func TestTargetSize(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

orchestrationModes := [2]compute.OrchestrationMode{compute.Uniform, compute.Flexible}

expectedScaleSets := newTestVMSSList(3, "test-asg", "eastus", compute.Uniform)
expectedVMSSVMs := newTestVMSSVMList(3)
expectedVMs := newTestVMList(3)
spotScaleSet := newTestVMSSList(5, "spot-vmss", "eastus", compute.Uniform)[0]
spotScaleSet.VirtualMachineProfile = &compute.VirtualMachineScaleSetVMProfile{
Priority: compute.Spot,
}
expectedScaleSets = append(expectedScaleSets, spotScaleSet)

for _, orchMode := range orchestrationModes {
provider := newTestProvider(t)
mockVMSSClient := mockvmssclient.NewMockInterface(ctrl)
mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes()
provider.azureManager.azClient.virtualMachineScaleSetsClient = mockVMSSClient
mockVMClient := mockvmclient.NewMockInterface(ctrl)
mockVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return([]compute.VirtualMachine{}, nil).AnyTimes()
provider.azureManager.azClient.virtualMachinesClient = mockVMClient
provider := newTestProvider(t)
mockVMSSClient := mockvmssclient.NewMockInterface(ctrl)
mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes()
provider.azureManager.azClient.virtualMachineScaleSetsClient = mockVMSSClient
mockVMClient := mockvmclient.NewMockInterface(ctrl)
mockVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return([]compute.VirtualMachine{}, nil).AnyTimes()
provider.azureManager.azClient.virtualMachinesClient = mockVMClient

if orchMode == compute.Uniform {
err := provider.azureManager.forceRefresh()
assert.NoError(t, err)

mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl)
mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes()
provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient
// non-spot nodepool
registered := provider.azureManager.RegisterNodeGroup(
newTestScaleSet(provider.azureManager, "test-asg"))
assert.True(t, registered)
assert.Equal(t, len(provider.NodeGroups()), 1)

} else {
provider.azureManager.config.EnableVmssFlex = true
mockVMClient.EXPECT().ListVmssFlexVMsWithoutInstanceView(gomock.Any(), "test-asg").Return(expectedVMs, nil).AnyTimes()
}
targetSize, err := provider.NodeGroups()[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 3, targetSize)

err := provider.azureManager.forceRefresh()
assert.NoError(t, err)
// Register a spot nodepool
spotregistered := provider.azureManager.RegisterNodeGroup(
newTestScaleSet(provider.azureManager, "spot-vmss"))
assert.True(t, spotregistered)
assert.Equal(t, len(provider.NodeGroups()), 2)

registered := provider.azureManager.RegisterNodeGroup(
newTestScaleSet(provider.azureManager, "test-asg"))
assert.True(t, registered)
assert.Equal(t, len(provider.NodeGroups()), 1)
// mock getvmss call for spotnode pool returning different capacity
spotScaleSet.Sku.Capacity = to.Int64Ptr(1)
mockVMSSClient.EXPECT().Get(gomock.Any(), provider.azureManager.config.ResourceGroup, "spot-vmss").Return(spotScaleSet, nil).Times(1)

targetSize, err := provider.NodeGroups()[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 3, targetSize)
}
targetSize, err = provider.NodeGroups()[1].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 1, targetSize)
}

func TestIncreaseSize(t *testing.T) {
Expand Down

0 comments on commit c8e4721

Please sign in to comment.