From 8a32f1eb3836918288f3864b4b2b58410a39cd29 Mon Sep 17 00:00:00 2001 From: Jack Francis Date: Tue, 17 Oct 2023 13:00:20 -0700 Subject: [PATCH] azure: enable vmss deallocate Signed-off-by: Jack Francis --- charts/cluster-autoscaler/Chart.yaml | 2 +- charts/cluster-autoscaler/README.md | 1 + .../templates/deployment.yaml | 5 + .../cluster-autoscaler/templates/secret.yaml | 1 + charts/cluster-autoscaler/values.yaml | 4 + .../cloudprovider/azure/azure_config.go | 4 + .../cloudprovider/azure/azure_error.go | 204 +++++++ .../cloudprovider/azure/azure_logger.go | 109 ++++ .../cloudprovider/azure/azure_manager.go | 2 + .../cloudprovider/azure/azure_manager_test.go | 1 + .../cloudprovider/azure/azure_scale_set.go | 546 +++++++++++++++++- .../azure/azure_scale_set_test.go | 9 +- .../examples/cluster-autoscaler-aks.yaml | 5 + 13 files changed, 857 insertions(+), 36 deletions(-) create mode 100644 cluster-autoscaler/cloudprovider/azure/azure_error.go create mode 100644 cluster-autoscaler/cloudprovider/azure/azure_logger.go diff --git a/charts/cluster-autoscaler/Chart.yaml b/charts/cluster-autoscaler/Chart.yaml index f4c4704de284..2438a226ff14 100644 --- a/charts/cluster-autoscaler/Chart.yaml +++ b/charts/cluster-autoscaler/Chart.yaml @@ -11,4 +11,4 @@ name: cluster-autoscaler sources: - https://github.com/kubernetes/autoscaler/tree/master/cluster-autoscaler type: application -version: 9.34.0 +version: 9.35.0 diff --git a/charts/cluster-autoscaler/README.md b/charts/cluster-autoscaler/README.md index 2a3bba3fad9e..8ac256071458 100644 --- a/charts/cluster-autoscaler/README.md +++ b/charts/cluster-autoscaler/README.md @@ -383,6 +383,7 @@ vpa: | azureClientID | string | `""` | Service Principal ClientID with contributor permission to Cluster and Node ResourceGroup. Required if `cloudProvider=azure` | | azureClientSecret | string | `""` | Service Principal ClientSecret with contributor permission to Cluster and Node ResourceGroup. Required if `cloudProvider=azure` | | azureResourceGroup | string | `""` | Azure resource group that the cluster is located. Required if `cloudProvider=azure` | +| azureScaleDownPolicy | string | `"Delete"` | Azure ScaleDownPolicy, either "Delete" (default) or "Deallocate" Only relevant if `cloudProvider=azure` | | azureSubscriptionID | string | `""` | Azure subscription where the resources are located. Required if `cloudProvider=azure` | | azureTenantID | string | `""` | Azure tenant where the resources are located. Required if `cloudProvider=azure` | | azureUseManagedIdentityExtension | bool | `false` | Whether to use Azure's managed identity extension for credentials. If using MSI, ensure subscription ID, resource group, and azure AKS cluster name are set. You can only use one authentication method at a time, either azureUseWorkloadIdentityExtension or azureUseManagedIdentityExtension should be set. | diff --git a/charts/cluster-autoscaler/templates/deployment.yaml b/charts/cluster-autoscaler/templates/deployment.yaml index 113d92971d2a..39c26aa50311 100644 --- a/charts/cluster-autoscaler/templates/deployment.yaml +++ b/charts/cluster-autoscaler/templates/deployment.yaml @@ -198,6 +198,11 @@ spec: secretKeyRef: key: NodeResourceGroup name: {{ default (include "cluster-autoscaler.fullname" .) .Values.secretKeyRefNameOverride }} + - name: AZURE_SCALE_DOWN_POLICY + valueFrom: + secretKeyRef: + key: ScaleDownPolicy + name: {{ default (include "cluster-autoscaler.fullname" .) .Values.secretKeyRefNameOverride }} {{- end }} {{- else if eq .Values.cloudProvider "exoscale" }} - name: EXOSCALE_API_KEY diff --git a/charts/cluster-autoscaler/templates/secret.yaml b/charts/cluster-autoscaler/templates/secret.yaml index 4890230090c1..f4d6e5dab668 100644 --- a/charts/cluster-autoscaler/templates/secret.yaml +++ b/charts/cluster-autoscaler/templates/secret.yaml @@ -17,6 +17,7 @@ data: SubscriptionID: "{{ .Values.azureSubscriptionID | b64enc }}" TenantID: "{{ .Values.azureTenantID | b64enc }}" VMType: "{{ .Values.azureVMType | b64enc }}" + ScaleDownPolicy: "{{ .Values.azureScaleDownPolicy | b64enc }}" {{- else if $isAws }} AwsAccessKeyId: "{{ .Values.awsAccessKeyID | b64enc }}" AwsSecretAccessKey: "{{ .Values.awsSecretAccessKey | b64enc }}" diff --git a/charts/cluster-autoscaler/values.yaml b/charts/cluster-autoscaler/values.yaml index cc1cd6633a4b..cb937db6e9f1 100644 --- a/charts/cluster-autoscaler/values.yaml +++ b/charts/cluster-autoscaler/values.yaml @@ -96,6 +96,10 @@ azureUseWorkloadIdentityExtension: false # azureVMType -- Azure VM type. azureVMType: "AKS" +# azureScaleDownPolicy -- Azure ScaleDownPolicy, either "Delete" (default) or "Deallocate" +# Only relevant if `cloudProvider=azure` +azureScaleDownPolicy: "Delete" + # cloudConfigPath -- Configuration file for cloud provider. cloudConfigPath: "" diff --git a/cluster-autoscaler/cloudprovider/azure/azure_config.go b/cluster-autoscaler/cloudprovider/azure/azure_config.go index 9fe559d07e14..3e36393f4d9c 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_config.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_config.go @@ -135,6 +135,9 @@ type Config struct { // EnableVmssFlex defines whether to enable Vmss Flex support or not EnableVmssFlex bool `json:"enableVmssFlex,omitempty" yaml:"enableVmssFlex,omitempty"` + + // ScaleDownPolicy is the VMSS scale down policy, either "Delete" or "Deallocate" + ScaleDownPolicy string `json:"scaleDownPolicy" yaml:"scaleDownPolicy"` } // BuildAzureConfig returns a Config object for the Azure clients @@ -169,6 +172,7 @@ func BuildAzureConfig(configReader io.Reader) (*Config, error) { cfg.AADClientCertPath = os.Getenv("ARM_CLIENT_CERT_PATH") cfg.AADClientCertPassword = os.Getenv("ARM_CLIENT_CERT_PASSWORD") cfg.Deployment = os.Getenv("ARM_DEPLOYMENT") + cfg.ScaleDownPolicy = os.Getenv("AZURE_SCALE_DOWN_POLICY") subscriptionID, err := getSubscriptionIdFromInstanceMetadata() if err != nil { diff --git a/cluster-autoscaler/cloudprovider/azure/azure_error.go b/cluster-autoscaler/cloudprovider/azure/azure_error.go new file mode 100644 index 000000000000..7499b67444f0 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/azure/azure_error.go @@ -0,0 +1,204 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/Azure/go-autorest/autorest/azure" + "sigs.k8s.io/cloud-provider-azure/pkg/retry" +) + +// Unknown is for errors that have nil RawError body +const Unknown CloudProviderErrorReason = "Unknown" + +// Errors on the sync path +const ( + // QuotaExceeded falls under OperationNotAllowed error code but we make it more specific here + QuotaExceeded CloudProviderErrorReason = "QuotaExceeded" + // OperationNotAllowed is an umbrella for a lot of errors returned by Azure + OperationNotAllowed string = "OperationNotAllowed" +) + +// AutoscalerErrorType describes a high-level category of a given error +type AutoscalerErrorType string + +// AutoscalerErrorReason is a more detailed reason for the failed operation +type AutoscalerErrorReason string + +// CloudProviderErrorReason providers more details on errors of type CloudProviderError +type CloudProviderErrorReason AutoscalerErrorReason + +// AutoscalerError contains information about Autoscaler errors +type AutoscalerError interface { + // Error implements golang error interface + Error() string + + // Type returns the type of AutoscalerError + Type() AutoscalerErrorType + + // Reason returns the reason of the AutoscalerError + Reason() AutoscalerErrorReason + + // AddPrefix adds a prefix to error message. + // Returns the error it's called for convenient inline use. + // Example: + // if err := DoSomething(myObject); err != nil { + // return err.AddPrefix("can't do something with %v: ", myObject) + // } + AddPrefix(msg string, args ...interface{}) AutoscalerError +} + +type autoscalerErrorImpl struct { + errorType AutoscalerErrorType + errorReason AutoscalerErrorReason + msg string +} + +const ( + // CloudProviderError is an error related to underlying infrastructure + CloudProviderError AutoscalerErrorType = "CloudProviderError" + // ApiCallError is an error related to communication with k8s API server + ApiCallError AutoscalerErrorType = "ApiCallError" + // Timeout is an error related to nodes not joining the cluster in maxNodeProvisionTime + Timeout AutoscalerErrorType = "Timeout" + // InternalError is an error inside Cluster Autoscaler + InternalError AutoscalerErrorType = "InternalError" + // TransientError is an error that causes us to skip a single loop, but + // does not require any additional action. + TransientError AutoscalerErrorType = "TransientError" + // ConfigurationError is an error related to bad configuration provided + // by a user. + ConfigurationError AutoscalerErrorType = "ConfigurationError" + // NodeGroupDoesNotExistError signifies that a NodeGroup + // does not exist. + NodeGroupDoesNotExistError AutoscalerErrorType = "nodeGroupDoesNotExistError" +) + +const ( + // NodeRegistration signifies an error with node registering + NodeRegistration AutoscalerErrorReason = "NodeRegistration" +) + +// NewAutoscalerError returns new autoscaler error with a message constructed from format string +func NewAutoscalerError(errorType AutoscalerErrorType, msg string, args ...interface{}) AutoscalerError { + return autoscalerErrorImpl{ + errorType: errorType, + msg: fmt.Sprintf(msg, args...), + } +} + +// NewAutoscalerErrorWithReason returns new autoscaler error with a reason and a message constructed from format string +func NewAutoscalerErrorWithReason(errorType AutoscalerErrorType, reason AutoscalerErrorReason, msg string, args ...interface{}) AutoscalerError { + return autoscalerErrorImpl{ + errorType: errorType, + errorReason: reason, + msg: fmt.Sprintf(msg, args...), + } +} + +// NewAutoscalerCloudProviderError returns new autoscaler error with a cloudprovider error type and a message constructed from format string +func NewAutoscalerCloudProviderError(errorReason CloudProviderErrorReason, msg string, args ...interface{}) AutoscalerError { + return autoscalerErrorImpl{ + errorType: CloudProviderError, + errorReason: AutoscalerErrorReason(errorReason), + msg: fmt.Sprintf(msg, args...), + } +} + +// ToAutoscalerError converts an error to AutoscalerError with given type, +// unless it already is an AutoscalerError (in which case it's not modified). +func ToAutoscalerError(defaultType AutoscalerErrorType, err error) AutoscalerError { + if err == nil { + return nil + } + if e, ok := err.(AutoscalerError); ok { + return e + } + return NewAutoscalerError(defaultType, "%v", err) +} + +// Error implements golang error interface +func (e autoscalerErrorImpl) Error() string { + return e.msg +} + +// Type returns the type of AutoscalerError +func (e autoscalerErrorImpl) Type() AutoscalerErrorType { + return e.errorType +} + +func (e autoscalerErrorImpl) Reason() AutoscalerErrorReason { + return e.errorReason +} + +// AddPrefix adds a prefix to error message. +// Returns the error it's called for convenient inline use. +// Example: +// if err := DoSomething(myObject); err != nil { +// +// return err.AddPrefix("can't do something with %v: ", myObject) +// +// } +func (e autoscalerErrorImpl) AddPrefix(msg string, args ...interface{}) AutoscalerError { + e.msg = fmt.Sprintf(msg, args...) + e.msg + return e +} + +// ServiceRawError wraps the RawError returned by the k8s/cloudprovider +// Azure clients. The error body should satisfy the autorest.ServiceError type +type ServiceRawError struct { + ServiceError *azure.ServiceError `json:"error,omitempty"` +} + +func azureToAutoscalerError(rerr *retry.Error) AutoscalerError { + if rerr == nil { + return nil + } + if rerr.RawError == nil { + return NewAutoscalerCloudProviderError(Unknown, rerr.Error().Error()) + } + + re := ServiceRawError{} + err := json.Unmarshal([]byte(rerr.RawError.Error()), &re) + if err != nil { + return NewAutoscalerCloudProviderError(Unknown, rerr.Error().Error()) + } + se := re.ServiceError + if se == nil { + return NewAutoscalerCloudProviderError(Unknown, rerr.Error().Error()) + } + var errCode CloudProviderErrorReason + if se.Code == "" { + errCode = Unknown + } else if se.Code == OperationNotAllowed { + errCode = getOperationNotAllowedReason(se) + } else { + errCode = CloudProviderErrorReason(se.Code) + } + return NewAutoscalerCloudProviderError(errCode, se.Message) +} + +// getOperationNotAllowedReason renames the error code for quotas to a more human-readable error +func getOperationNotAllowedReason(se *azure.ServiceError) CloudProviderErrorReason { + if strings.Contains(se.Message, "Quota increase") { + return QuotaExceeded + } + return CloudProviderErrorReason(OperationNotAllowed) +} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_logger.go b/cluster-autoscaler/cloudprovider/azure/azure_logger.go new file mode 100644 index 000000000000..f074042a123a --- /dev/null +++ b/cluster-autoscaler/cloudprovider/azure/azure_logger.go @@ -0,0 +1,109 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "runtime" + "time" + + "crypto/rand" + + "github.com/sirupsen/logrus" +) + +// Logger is a type that can log QOSEventInfo +type Logger struct { + QOSLogger *logrus.Entry + EnableQOSLogging bool +} + +// QOSEventInfo is a type used to store qosEvents' info when logging is delayed +type QOSEventInfo struct { + Start time.Time + End time.Time + Properties map[string]interface{} +} + +var ( + logger *Logger +) + +const ( + // SourceFieldName source + SourceFieldName = "source" + // ClusterAutoscalerQosLog source field name for the QosLogger + ClusterAutoscalerQosLog = "ClusterAutoscalerQosLog" + epochFieldName = "env_epoch" + fileNameFieldName = "fileName" + lineNumberFieldName = "lineNumber" + methodNameFieldName = "methodName" + durationInMillisecondsFieldName = "durationInMilliseconds" + resultFieldName = "result" + errorDetailsFieldName = "errorDetails" + errorTypeFieldName = "errorType" + errorReasonFieldName = "errorReason" + startTimeFieldName = "startTime" + endTimeFieldName = "endTime" + upperCaseAlphanumeric = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890" +) + +// NewLogger makes a new Logger that can log qos events +func NewLogger(enableQOSLogging bool) { + entryLogger := logrus.New() + entryLogger.Formatter = &logrus.JSONFormatter{} + log := logrus.NewEntry(entryLogger) + epoch, _ := getEpochRandomString() + log.WithField(epochFieldName, epoch) + log = withCallerInfo(log) + logger = &Logger{ + log.WithField(SourceFieldName, ClusterAutoscalerQosLog), + enableQOSLogging, + } +} + +func withCallerInfo(logger *logrus.Entry) *logrus.Entry { + _, file, line, _ := runtime.Caller(3) + fields := make(map[string]interface{}) + fields[fileNameFieldName] = file + fields[lineNumberFieldName] = line + return logger.WithFields(fields) +} + +// getAutoscalerErrorInfo formats the passed in AutoscalerError to be logged in QosEvent +func getAutoscalerErrorInfo(autoscalerErrors AutoscalerError) (map[string]interface{}, string) { + autoscalerErrorsMap := map[string]interface{}{errorDetailsFieldName: nil, errorTypeFieldName: nil, errorReasonFieldName: nil} + result := "Succeeded" + if autoscalerErrors != nil { + autoscalerErrorsMap = map[string]interface{}{errorDetailsFieldName: autoscalerErrors.Error(), errorTypeFieldName: autoscalerErrors.Type(), errorReasonFieldName: autoscalerErrors.Reason()} + result = "Failed" + } + return autoscalerErrorsMap, result +} + +// getEpochRandomString generates a random string with the provided length using the given alphabet +func getEpochRandomString() (string, error) { + randomBytes := make([]byte, 5) + _, err := rand.Read(randomBytes) + if err != nil { + return "", err + } + for index, randomByte := range randomBytes { + foldedOffset := randomByte % byte(len(upperCaseAlphanumeric)) + randomBytes[index] = upperCaseAlphanumeric[foldedOffset] + } + return string(randomBytes), nil +} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_manager.go b/cluster-autoscaler/cloudprovider/azure/azure_manager.go index f2fc79b1ca9c..237df6329fbe 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_manager.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_manager.go @@ -33,6 +33,8 @@ import ( ) const ( + azurePrefix = "azure://" + vmTypeVMSS = "vmss" vmTypeStandard = "standard" diff --git a/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go b/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go index 674a06cc8a72..c2f4b5721c4c 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go @@ -682,6 +682,7 @@ func TestGetFilteredAutoscalingGroupsVmss(t *testing.T) { curSize: 3, sizeRefreshPeriod: manager.azureCache.refreshInterval, instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod, + scaleDownPolicy: ScaleDownPolicyDelete, }} assert.True(t, assert.ObjectsAreEqualValues(expectedAsgs, asgs), "expected %#v, but found: %#v", expectedAsgs, asgs) } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go index f0b2e83f07e8..ae6c1b8086c0 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go @@ -33,11 +33,13 @@ import ( "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-08-01/compute" "github.com/Azure/go-autorest/autorest/azure" + "github.com/Azure/go-autorest/autorest/to" ) var ( defaultVmssInstancesRefreshPeriod = 5 * time.Minute vmssContextTimeout = 3 * time.Minute + asyncContextTimeout = 30 * time.Minute vmssSizeMutex sync.Mutex ) @@ -50,6 +52,17 @@ const ( provisioningStateUpdating string = "Updating" ) +// ScaleDownPolicy is a string representation of the ScaleDownPolicy +type ScaleDownPolicy string + +const ( + // ScaleDownPolicyDelete means that on scale down, nodes will be deleted + ScaleDownPolicyDelete ScaleDownPolicy = "Delete" + // ScaleDownPolicyDeallocate means that on scale down, nodes will be deallocated and on scale-up they will + // be attempted to be started first + ScaleDownPolicyDeallocate ScaleDownPolicy = "Deallocate" +) + // ScaleSet implements NodeGroup interface. type ScaleSet struct { azureRef @@ -72,6 +85,8 @@ type ScaleSet struct { instanceMutex sync.Mutex instanceCache []cloudprovider.Instance lastInstanceRefresh time.Time + + scaleDownPolicy ScaleDownPolicy } // NewScaleSet creates a new NewScaleSet. @@ -95,6 +110,12 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager, curSize int64) ( scaleSet.instancesRefreshPeriod = defaultVmssInstancesRefreshPeriod } + // Default to "Delete" VMSS scale down policy + scaleSet.scaleDownPolicy = ScaleDownPolicyDelete + if az.config.ScaleDownPolicy != "" { + scaleSet.scaleDownPolicy = ScaleDownPolicy(az.config.ScaleDownPolicy) + } + return scaleSet, nil } @@ -183,7 +204,27 @@ func (scaleSet *ScaleSet) getCurSize() (int64, error) { // GetScaleSetSize gets Scale Set size. func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) { - return scaleSet.getCurSize() + // First, get the size of the ScaleSet reported by API + // -1 indiciates the ScaleSet hasn't been initialized + size, err := scaleSet.getCurSize() + if size == -1 || err != nil { + klog.V(3).Infof("getScaleSetSize: either size is -1 (actual: %d) or error exists (actual err:%v)", size, err) + return size, err + } + // If the policy for this ScaleSet is Deallocate, the TargetSize is the capacity reported by VMSS minus the nodes + // in deallocated and deallocating states + if scaleSet.scaleDownPolicy == ScaleDownPolicyDeallocate { + totalDeallocationInstances, err := scaleSet.countDeallocatedInstances() + if err != nil { + klog.Errorf("getScaleSetSize: error countDeallocatedInstances for scaleSet %s,"+ + "err: %v", scaleSet.Name, err) + return -1, err + } + size -= int64(totalDeallocationInstances) + klog.V(3).Infof("Found: %d instances in deallocated state, returning target size: %d for scaleSet %s", + totalDeallocationInstances, size, scaleSet.Name) + } + return size, nil } func (scaleSet *ScaleSet) waitForDeleteInstances(future *azure.Future, requiredIds *compute.VirtualMachineScaleSetVMInstanceRequiredIDs) { @@ -229,20 +270,17 @@ func (scaleSet *ScaleSet) updateVMSSCapacity(future *azure.Future) { klog.Errorf("virtualMachineScaleSetsClient.WaitForCreateOrUpdateResult - updateVMSSCapacity for scale set %q failed: %v", scaleSet.Name, err) } -// SetScaleSetSize sets ScaleSet size. -func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error { +func (scaleSet *ScaleSet) createOrUpdateInstances(vmssInfo *compute.VirtualMachineScaleSet, newSize int64) error { + if vmssInfo == nil { + return fmt.Errorf("vmssInfo cannot be nil while increating scaleSet capacity") + } + scaleSet.sizeMutex.Lock() defer scaleSet.sizeMutex.Unlock() - vmssInfo, err := scaleSet.getVMSSFromCache() - if err != nil { - klog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err) - return err - } - // Update the new capacity to cache. vmssSizeMutex.Lock() - vmssInfo.Sku.Capacity = &size + vmssInfo.Sku.Capacity = &newSize vmssSizeMutex.Unlock() // Compose a new VMSS for updating. @@ -251,23 +289,215 @@ func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error { Sku: vmssInfo.Sku, Location: vmssInfo.Location, } + + if vmssInfo.ExtendedLocation != nil { + op.ExtendedLocation = &compute.ExtendedLocation{ + Name: vmssInfo.ExtendedLocation.Name, + Type: vmssInfo.ExtendedLocation.Type, + } + + klog.V(3).Infof("Passing ExtendedLocation information if it is not nil, with Edge Zone name:(%s)", *op.ExtendedLocation.Name) + } + ctx, cancel := getContextWithTimeout(vmssContextTimeout) defer cancel() klog.V(3).Infof("Waiting for virtualMachineScaleSetsClient.CreateOrUpdateAsync(%s)", scaleSet.Name) - future, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdateAsync(ctx, scaleSet.manager.config.ResourceGroup, scaleSet.Name, op) + future, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdateAsync(ctx, + scaleSet.manager.config.ResourceGroup, scaleSet.Name, op) if rerr != nil { - klog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %v", scaleSet.Name, rerr) - return rerr.Error() + klog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %+v", scaleSet.Name, rerr) + return azureToAutoscalerError(rerr) } // Proactively set the VMSS size so autoscaler makes better decisions. - scaleSet.curSize = size + scaleSet.curSize = newSize scaleSet.lastSizeRefresh = time.Now() - go scaleSet.updateVMSSCapacity(future) + go scaleSet.waitForCreateOrUpdateInstances(future) + return nil +} + +// waitForCreateOrUpdate waits for the outcome of VMSS capacity update initiated via CreateOrUpdateAsync. +func (scaleSet *ScaleSet) waitForCreateOrUpdateInstances(future *azure.Future) { + var err error + + defer func() { + // Invalidate instanceCache on success and failure. Failure might have created a few instances, but it is very rare. + scaleSet.invalidateInstanceCache() + if err != nil { + klog.Errorf("Failed to update the capacity for vmss %s with error %v, invalidate the cache so as to get "+ + "the real size from API", scaleSet.Name, err) + // Invalidate the VMSS size cache in order to fetch the size from the API. + scaleSet.invalidateLastSizeRefreshWithLock() + scaleSet.manager.invalidateCache() + } + }() + + ctx, cancel := getContextWithTimeout(asyncContextTimeout) + defer cancel() + + klog.V(3).Infof("Calling virtualMachineScaleSetsClient.WaitForCreateOrUpdateResult(%s)", scaleSet.Name) + httpResponse, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.WaitForCreateOrUpdateResult(ctx, + future, scaleSet.manager.config.ResourceGroup) + if httpResponse != nil && httpResponse.Body != nil { + defer httpResponse.Body.Close() + } + isSuccess, err := isSuccessHTTPResponse(httpResponse, err) + + if isSuccess { + klog.V(3).Infof("waitForCreateOrUpdateInstances(%s) success", scaleSet.Name) + return + } + + klog.Errorf("waitForCreateOrUpdateInstances(%s) failed, err: %v", scaleSet.Name, err) +} + +// SetScaleSetSize sets ScaleSet size. +func (scaleSet *ScaleSet) SetScaleSetSize(size int64, delta int) error { + vmssInfo, err := scaleSet.getVMSSFromCache() + if err != nil { + klog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err) + return err + } + + requiredInstances := delta + // If the policy is deallocate, then attempt to satisfy the request by starting existing instances + if scaleSet.scaleDownPolicy == ScaleDownPolicyDeallocate { + deallocatedInstances, err := scaleSet.getDeallocatedInstances() + if err != nil { + klog.Errorf("setScaleSetSize: error getDeallocatedInstances for scaleSet %s, "+ + "err : %v", scaleSet.Name, err) + } else { + klog.V(3).Infof("Attempting to start: %d instances from deallocated state", requiredInstances) + + // Go through current instances and attempt to reallocate them + for _, instance := range deallocatedInstances { + // we're done + if requiredInstances <= 0 { + break + } + instancesToStart := []*azureRef{{Name: instance.Id}} + err := scaleSet.startInstances(instancesToStart) + if err != nil { + klog.Errorf("Failed to start instances %v in scale set %q: %v", instancesToStart, scaleSet.Name, err) + continue + } + klog.V(3).Infof("Successfully started instances %v in scale set %q", instancesToStart, scaleSet.Name) + requiredInstances-- + } + } + } + + // If after reallocating instances we still need more instances or we're just in Delete mode + // send a scale request + if requiredInstances > 0 { + klog.V(3).Infof("Remaining unsatisfied count is %d. Attempting to increase scale set %q "+ + "capacity", requiredInstances, scaleSet.Name) + err := scaleSet.createOrUpdateInstances(&vmssInfo, size) + if err != nil { + klog.Errorf("Failed to increase capacity for scale set %q to %d: %v", scaleSet.Name, requiredInstances, err) + return err + } + } + return nil +} + +// startInstances starts the given instances. All instances must be controlled by the same nodegroup. +func (scaleSet *ScaleSet) startInstances(instances []*azureRef) error { + if len(instances) == 0 { + return nil + } + + klog.V(3).Infof("Starting vmss instances %v", instances) + + commonNg, err := scaleSet.manager.GetNodeGroupForInstance(instances[0]) + if err != nil { + return err + } + + instancesToStart := []*azureRef{} + for _, instance := range instances { + err = scaleSet.verifyNodeGroup(instance, commonNg.Id()) + if err != nil { + return err + } + + if cpi, found, err := scaleSet.getInstanceByProviderID(instance.Name); found && err == nil && cpi.Status != nil && + cpi.Status.State == cloudprovider.InstanceRunning { + klog.V(3).Infof("Skipping deleting instance %s as its current state is running", instance.Name) + continue + } + instancesToStart = append(instancesToStart, instance) + } + + // nothing to delete + if len(instancesToStart) == 0 { + klog.V(3).Infof("No new instances eligible for starting, skipping") + return nil + } + + instanceIDs := []string{} + for _, instance := range instancesToStart { + instanceID, err := getLastSegment(instance.Name) + if err != nil { + klog.Errorf("getLastSegment failed with error: %v", err) + return err + } + instanceIDs = append(instanceIDs, instanceID) + } + + requiredIds := &compute.VirtualMachineScaleSetVMInstanceRequiredIDs{ + InstanceIds: &instanceIDs, + } + + ctx, cancel := getContextWithTimeout(vmssContextTimeout) + defer cancel() + resourceGroup := scaleSet.manager.config.ResourceGroup + + scaleSet.instanceMutex.Lock() + klog.V(3).Infof("Calling virtualMachineScaleSetsClient.StartInstancesAsync(%v) for %s", + requiredIds.InstanceIds, scaleSet.Name) + future, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.StartInstancesAsync(ctx, resourceGroup, + commonNg.Id(), *requiredIds) + scaleSet.instanceMutex.Unlock() + if rerr != nil { + klog.Errorf("virtualMachineScaleSetsClient.StartInstancesAsync for instances %v for %s failed: "+ + "%+v", requiredIds.InstanceIds, scaleSet.Name, rerr) + return rerr.Error() + } + + // Proactively set the status of the instances to be running in cache + for _, instance := range instancesToStart { + scaleSet.setInstanceStatusByProviderID(instance.Name, cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}) + } + + go scaleSet.waitForStartInstances(future, requiredIds) return nil } +func (scaleSet *ScaleSet) waitForStartInstances(future *azure.Future, requiredIds *compute.VirtualMachineScaleSetVMInstanceRequiredIDs) { + ctx, cancel := getContextWithCancel() + defer cancel() + + klog.V(3).Infof("Calling virtualMachineScaleSetsClient.WaitForStartInstancesResult(%v) for %s", requiredIds.InstanceIds, scaleSet.Name) + httpResponse, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.WaitForStartInstancesResult(ctx, future, + scaleSet.manager.config.ResourceGroup) + if httpResponse != nil && httpResponse.Body != nil { + defer httpResponse.Body.Close() + } + isSuccess, err := isSuccessHTTPResponse(httpResponse, err) + + if isSuccess { + klog.V(3).Infof("WaitForStartInstancesResult(%v) for %s success", requiredIds.InstanceIds, scaleSet.Name) + // No need to invalidateInstanceCache because the states were proactively set to Running. + return + } + + scaleSet.invalidateInstanceCache() + klog.Errorf("WaitForStartInstancesResult(%v) for %s failed with error: %v", + requiredIds.InstanceIds, scaleSet.Name, err) +} + // TargetSize returns the current TARGET size of the node group. It is possible that the // number is different from the number of nodes registered in Kubernetes. func (scaleSet *ScaleSet) TargetSize() (int, error) { @@ -294,7 +524,7 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error { return fmt.Errorf("size increase too large - desired:%d max:%d", int(size)+delta, scaleSet.MaxSize()) } - return scaleSet.SetScaleSetSize(size + int64(delta)) + return scaleSet.SetScaleSetSize(size+int64(delta), delta) } // GetScaleSetVms returns list of nodes for the given scale set. @@ -400,7 +630,7 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregistered return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", instance.Name, commonAsg) } - if cpi, found := scaleSet.getInstanceByProviderID(instance.Name); found && cpi.Status != nil && cpi.Status.State == cloudprovider.InstanceDeleting { + if cpi, found, err := scaleSet.getInstanceByProviderID(instance.Name); found && err == nil && cpi.Status != nil && cpi.Status.State == cloudprovider.InstanceDeleting { klog.V(3).Infof("Skipping deleting instance %s as its current state is deleting", instance.Name) continue } @@ -462,7 +692,7 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregistered // DeleteNodes deletes the nodes from the group. func (scaleSet *ScaleSet) DeleteNodes(nodes []*apiv1.Node) error { - klog.V(8).Infof("Delete nodes requested: %q\n", nodes) + klog.V(4).Infof("Delete nodes using scaleDownPolicy=%s requested: %q\n", scaleSet.scaleDownPolicy, nodes) size, err := scaleSet.GetScaleSetSize() if err != nil { return err @@ -493,6 +723,9 @@ func (scaleSet *ScaleSet) DeleteNodes(nodes []*apiv1.Node) error { refs = append(refs, ref) } + if scaleSet.scaleDownPolicy == ScaleDownPolicyDeallocate { + return scaleSet.deallocateInstances(refs) + } return scaleSet.DeleteInstances(refs, hasUnregisteredNodes) } @@ -578,18 +811,26 @@ func (scaleSet *ScaleSet) Nodes() ([]cloudprovider.Instance, error) { } func (scaleSet *ScaleSet) buildScaleSetCache(lastRefresh time.Time) error { + klog.V(3).Infof("buildScaleSetCache: resetting instance Cache for scaleSet %s", + scaleSet.Name) vms, rerr := scaleSet.GetScaleSetVms() if rerr != nil { if isAzureRequestsThrottled(rerr) { - // Log a warning and update the instance refresh time so that it would retry after cache expiration - klog.Warningf("GetScaleSetVms() is throttled with message %v, would return the cached instances", rerr) + // Log a warning and update the instance refresh time so that it would retry later. + // Ensure to retry no sooner than rerr.RetryAfter + klog.Warningf("buildScaleSetCache: GetScaleSetVms() is throttled with message %v, would return the cached instances", rerr) + nextRefresh := lastRefresh.Add(scaleSet.instancesRefreshPeriod) + if nextRefresh.Before(rerr.RetryAfter) { + delay := rerr.RetryAfter.Sub(nextRefresh) + lastRefresh = lastRefresh.Add(delay) + } scaleSet.lastInstanceRefresh = lastRefresh return nil } return rerr.Error() } - scaleSet.instanceCache = buildInstanceCache(vms) + scaleSet.instanceCache = scaleSet.buildInstanceCache(vms) scaleSet.lastInstanceRefresh = lastRefresh return nil @@ -607,7 +848,7 @@ func (scaleSet *ScaleSet) buildScaleSetCacheForFlex(lastRefresh time.Time) error return rerr.Error() } - scaleSet.instanceCache = buildInstanceCache(vms) + scaleSet.instanceCache = scaleSet.buildInstanceCache(vms) scaleSet.lastInstanceRefresh = lastRefresh return nil @@ -615,7 +856,7 @@ func (scaleSet *ScaleSet) buildScaleSetCacheForFlex(lastRefresh time.Time) error // Note that the GetScaleSetVms() results is not used directly because for the List endpoint, // their resource ID format is not consistent with Get endpoint -func buildInstanceCache(vmList interface{}) []cloudprovider.Instance { +func (scaleSet *ScaleSet) buildInstanceCache(vmList interface{}) []cloudprovider.Instance { instances := []cloudprovider.Instance{} switch vms := vmList.(type) { @@ -625,7 +866,7 @@ func buildInstanceCache(vmList interface{}) []cloudprovider.Instance { if vm.InstanceView != nil && vm.InstanceView.Statuses != nil { powerState = vmPowerStateFromStatuses(*vm.InstanceView.Statuses) } - addInstanceToCache(&instances, vm.ID, vm.ProvisioningState, powerState) + scaleSet.addInstanceToCache(&instances, vm.ID, vm.ProvisioningState, powerState) } case []compute.VirtualMachine: for _, vm := range vms { @@ -633,14 +874,14 @@ func buildInstanceCache(vmList interface{}) []cloudprovider.Instance { if vm.InstanceView != nil && vm.InstanceView.Statuses != nil { powerState = vmPowerStateFromStatuses(*vm.InstanceView.Statuses) } - addInstanceToCache(&instances, vm.ID, vm.ProvisioningState, powerState) + scaleSet.addInstanceToCache(&instances, vm.ID, vm.ProvisioningState, powerState) } } return instances } -func addInstanceToCache(instances *[]cloudprovider.Instance, id *string, provisioningState *string, powerState string) { +func (scaleSet *ScaleSet) addInstanceToCache(instances *[]cloudprovider.Instance, id *string, provisioningState *string, powerState string) { // The resource ID is empty string, which indicates the instance may be in deleting state. if len(*id) == 0 { return @@ -655,19 +896,27 @@ func addInstanceToCache(instances *[]cloudprovider.Instance, id *string, provisi *instances = append(*instances, cloudprovider.Instance{ Id: "azure://" + resourceID, - Status: instanceStatusFromProvisioningStateAndPowerState(resourceID, provisioningState, powerState), + Status: scaleSet.instanceStatusFromProvisioningStateAndPowerState(resourceID, provisioningState, powerState), }) } -func (scaleSet *ScaleSet) getInstanceByProviderID(providerID string) (cloudprovider.Instance, bool) { +func (scaleSet *ScaleSet) getInstanceByProviderID(providerID string) (cloudprovider.Instance, bool, error) { scaleSet.instanceMutex.Lock() defer scaleSet.instanceMutex.Unlock() + + err := scaleSet.validateInstanceCacheWithoutLock() + if err != nil { + klog.Errorf("getInstanceByProviderID: error validating instanceCache for providerID %s for scaleSet %s, "+ + "err: %v", providerID, scaleSet.Name, err) + return cloudprovider.Instance{}, false, err + } + for _, instance := range scaleSet.instanceCache { if instance.Id == providerID { - return instance, true + return instance, true, nil } } - return cloudprovider.Instance{}, false + return cloudprovider.Instance{}, false, nil } func (scaleSet *ScaleSet) setInstanceStatusByProviderID(providerID string, status cloudprovider.InstanceStatus) { @@ -683,8 +932,17 @@ func (scaleSet *ScaleSet) setInstanceStatusByProviderID(providerID string, statu } // instanceStatusFromProvisioningStateAndPowerState converts the VM provisioning state and power state to cloudprovider.InstanceStatus -func instanceStatusFromProvisioningStateAndPowerState(resourceId string, provisioningState *string, powerState string) *cloudprovider.InstanceStatus { - if provisioningState == nil { +func (scaleSet *ScaleSet) instanceStatusFromProvisioningStateAndPowerState(resourceId string, provisioningState *string, powerState string) *cloudprovider.InstanceStatus { + // Prefer the proactive cache view of the instance state if we aren't in a terminal state + // This is because the power state may be taking longer to update and we don't want + // an unfortunate VM update (TTL 5 min) to reset that state to running. + if provisioningState == nil || *provisioningState == string(compute.GalleryProvisioningStateUpdating) { + providerID := azurePrefix + resourceId + for _, instance := range scaleSet.instanceCache { + if instance.Id == providerID { + return instance.Status + } + } return nil } @@ -717,6 +975,26 @@ func instanceStatusFromProvisioningStateAndPowerState(resourceId string, provisi status.State = cloudprovider.InstanceRunning } + // TODO + // now check power state + /*if vm.InstanceView != nil && vm.InstanceView.Statuses != nil { + statuses := *vm.InstanceView.Statuses + + for _, s := range statuses { + state := to.String(s.Code) + // set the state to deallocated/deallocating based on their running state if provisioning is succeeded. + // This is to avoid the weird states with Failed VMs which can fail all API calls. + // This information is used to build instanceCache in CA. + if *vm.ProvisioningState == string(compute.GalleryProvisioningStateSucceeded) { + if powerStateDeallocated(state) { + status.State = cloudprovider.InstanceDeallocated + } else if powerStateDeallocating(state) { + status.State = cloudprovider.InstanceDeallocating + } + } + } + }*/ + return status } @@ -741,3 +1019,207 @@ func (scaleSet *ScaleSet) getOrchestrationMode() (compute.OrchestrationMode, err } return vmss.OrchestrationMode, nil } + +// deallocateInstances deallocates the given instances. All instances must be controlled by the same nodegroup. +func (scaleSet *ScaleSet) deallocateInstances(instances []*azureRef) error { + if len(instances) == 0 { + return nil + } + + klog.V(3).Infof("Deallocating vmss instances %v", instances) + + commonNg, err := scaleSet.manager.GetNodeGroupForInstance(instances[0]) + if err != nil { + return err + } + + instancesToDeallocate := []*azureRef{} + for _, instance := range instances { + err = scaleSet.verifyNodeGroup(instance, commonNg.Id()) + if err != nil { + return err + } + + // Instances in the "InstanceDeallocated" state are the only ones currently being ignored. However, if the + // deallocation process fails for instances in the "InstanceDeallocating" state, we are currently invalidating + // the cache by calling "waitForDeallocateInstances()" without implementing proper error handling for these cases. + // Consequently, we do not intend to skip these instances. This approach is simply a conservative measure to + // ensure that all instances are accounted. + if cpi, found, err := scaleSet.getInstanceByProviderID(instance.Name); found && err == nil && cpi.Status != nil && + cpi.Status.ErrorInfo != nil && cpi.Status.ErrorInfo.ErrorCode == "InstanceDeallocated" { + klog.V(3).Infof("Skipping deleting instance %s as its current state is deallocated", instance.Name) + continue + } + instancesToDeallocate = append(instancesToDeallocate, instance) + } + + // nothing to delete + if len(instancesToDeallocate) == 0 { + klog.V(3).Infof("No new instances eligible for deallocation, skipping") + return nil + } + + instanceIDs := []string{} + for _, instance := range instancesToDeallocate { + instanceID, err := getLastSegment(instance.Name) + if err != nil { + klog.Errorf("getLastSegment failed with error: %v", err) + return err + } + instanceIDs = append(instanceIDs, instanceID) + } + + requiredIds := &compute.VirtualMachineScaleSetVMInstanceRequiredIDs{ + InstanceIds: &instanceIDs, + } + + ctx, cancel := getContextWithTimeout(vmssContextTimeout) + defer cancel() + resourceGroup := scaleSet.manager.config.ResourceGroup + + scaleSet.instanceMutex.Lock() + klog.V(3).Infof("Calling virtualMachineScaleSetsClient.DeallocateInstancesAsync(%v) "+ + "for %s", requiredIds.InstanceIds, scaleSet.Name) + future, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.DeallocateInstancesAsync(ctx, resourceGroup, + commonNg.Id(), *requiredIds) + scaleSet.instanceMutex.Unlock() + if rerr != nil { + klog.Errorf("virtualMachineScaleSetsClient.DeallocateInstancesAsync for instances %v for %s "+ + "failed: %+v", requiredIds.InstanceIds, scaleSet.Name, rerr) + return azureToAutoscalerError(rerr) + } + + // Proactively set the status of the instances to be running in cache as deallocating. Status will change to deallocated on success + for _, instance := range instancesToDeallocate { + state := cloudprovider.InstanceStatus{ + ErrorInfo: &cloudprovider.InstanceErrorInfo{ + ErrorCode: "InstanceDeallocating", + }, + } + scaleSet.setInstanceStatusByProviderID(instance.Name, state) + } + + go scaleSet.waitForDeallocateInstances(future, instancesToDeallocate, requiredIds) + return nil +} + +func (scaleSet *ScaleSet) verifyNodeGroup(instance *azureRef, commonNgID string) error { + ng, err := scaleSet.manager.GetNodeGroupForInstance(instance) + if err != nil { + return err + } + + if !strings.EqualFold(ng.Id(), commonNgID) { + return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", + instance.Name, commonNgID) + } + return nil +} + +func (scaleSet *ScaleSet) waitForDeallocateInstances(future *azure.Future, instancesToDeallocate []*azureRef, + requiredIds *compute.VirtualMachineScaleSetVMInstanceRequiredIDs) { + ctx, cancel := getContextWithTimeout(asyncContextTimeout) + defer cancel() + + klog.V(3).Infof("Calling virtualMachineScaleSetsClient.WaitForDeallocateInstancesResult(%v) for %s", + requiredIds.InstanceIds, scaleSet.Name) + httpResponse, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.WaitForDeallocateInstancesResult(ctx, + future, scaleSet.manager.config.ResourceGroup) + if httpResponse != nil && httpResponse.Body != nil { + defer httpResponse.Body.Close() + } + isSuccess, err := isSuccessHTTPResponse(httpResponse, err) + + if isSuccess { + klog.V(3).Infof("WaitForDeallocateInstancesResult(%v) for %s success", + requiredIds.InstanceIds, scaleSet.Name) + // Set the status of the instances to deallocated only if WaitForDeallocate Call Succeeds + for _, instance := range instancesToDeallocate { + state := cloudprovider.InstanceStatus{ + ErrorInfo: &cloudprovider.InstanceErrorInfo{ + ErrorCode: "InstanceDeallocated", + }, + } + scaleSet.setInstanceStatusByProviderID(instance.Name, state) + } + return + } + + scaleSet.invalidateInstanceCache() + klog.Errorf("WaitForDeallocateInstancesResult(%v) for %s failed with error: %v", requiredIds.InstanceIds, scaleSet.Name, err) +} + +// validateInstanceCacheWithoutLock is used a helper function for validateInstanceCache, get and set methods. +func (scaleSet *ScaleSet) validateInstanceCacheWithoutLock() error { + if scaleSet.lastInstanceRefresh.Add(scaleSet.instancesRefreshPeriod).After(time.Now()) { + klog.V(3).Infof("validateInstanceCacheWithoutLock: no need to reset instance Cache for scaleSet %s", + scaleSet.Name) + return nil + } + + splay := rand.New(rand.NewSource(time.Now().UnixNano())).Intn(scaleSet.instancesRefreshJitter + 1) + lastRefresh := time.Now().Add(-time.Second * time.Duration(splay)) + return scaleSet.buildScaleSetCache(lastRefresh) +} + +// getDeallocatedInstances returns list of deallocated instances. +func (scaleSet *ScaleSet) getDeallocatedInstances() ([]cloudprovider.Instance, error) { + scaleSet.instanceMutex.Lock() + defer scaleSet.instanceMutex.Unlock() + + err := scaleSet.validateInstanceCacheWithoutLock() + if err != nil { + klog.Errorf("getDeallocatedInstances: error validating instanceCache for deallocated instances for scaleSet %s, "+ + "err: %v", scaleSet.Name, err) + return []cloudprovider.Instance{}, err + } + + instances := []cloudprovider.Instance{} + for _, instance := range scaleSet.instanceCache { + if instance.Status != nil && instance.Status.ErrorInfo != nil && instance.Status.ErrorInfo.ErrorCode == "InstanceDeallocated" { + instances = append(instances, instance) + } + } + return instances, nil +} + +func (scaleSet *ScaleSet) countDeallocatedInstances() (int, error) { + scaleSet.instanceMutex.Lock() + defer scaleSet.instanceMutex.Unlock() + + err := scaleSet.validateInstanceCacheWithoutLock() + if err != nil { + klog.Errorf("countDeallocatedInstances: error validating instanceCache for scaleSet %s, "+ + "err: %v", scaleSet.Name, err) + return 0, err + } + + var numInstances int + for _, instance := range scaleSet.instanceCache { + if instance.Status != nil && instance.Status.ErrorInfo != nil && instance.Status.ErrorInfo.ErrorCode == "InstanceDeallocated" { + numInstances++ + } + } + return numInstances, nil +} + +func (scaleSet *ScaleSet) getSKU() string { + vmssInfo, err := scaleSet.getVMSSFromCache() + if err != nil { + klog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err) + return "" + } + return to.String(vmssInfo.Sku.Name) +} + +func getCrpOperation(future *azure.Future) string { + if future != nil { + resp := future.Response() + defer resp.Body.Close() + header := resp.Header["X-Ms-Request-Id"] + if len(header) > 0 { + return header[0] + } + } + return "" +} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go index cd7c7d56b772..70ae538d224e 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go @@ -490,11 +490,13 @@ func TestDeleteNodes(t *testing.T) { assert.Equal(t, 1, targetSize) // Ensure that the status for the instances is Deleting - instance0, found := scaleSet.getInstanceByProviderID(nodesToDelete[0].Spec.ProviderID) + instance0, found, err := scaleSet.getInstanceByProviderID(nodesToDelete[0].Spec.ProviderID) + assert.NoError(t, err) assert.True(t, found, true) assert.Equal(t, instance0.Status.State, cloudprovider.InstanceDeleting) - instance2, found := scaleSet.getInstanceByProviderID(nodesToDelete[1].Spec.ProviderID) + instance2, found, err := scaleSet.getInstanceByProviderID(nodesToDelete[1].Spec.ProviderID) + assert.NoError(t, err) assert.True(t, found, true) assert.Equal(t, instance2.Status.State, cloudprovider.InstanceDeleting) @@ -575,7 +577,8 @@ func TestDeleteNodeUnregistered(t *testing.T) { assert.Equal(t, 2, targetSize) // Ensure that the status for the instances is Deleting - instance0, found := scaleSet.getInstanceByProviderID(nodesToDelete[0].Spec.ProviderID) + instance0, found, err := scaleSet.getInstanceByProviderID(nodesToDelete[0].Spec.ProviderID) + assert.NoError(t, err) assert.True(t, found, true) assert.Equal(t, instance0.Status.State, cloudprovider.InstanceDeleting) } diff --git a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-aks.yaml b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-aks.yaml index c9c40f6d6df7..4abbc623592d 100644 --- a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-aks.yaml +++ b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-aks.yaml @@ -200,4 +200,9 @@ spec: secretKeyRef: key: VMType name: cluster-autoscaler-azure + - name: AZURE_SCALE_DOWN_POLICY + valueFrom: + secretKeyRef: + key: ScaleDownPolicy + name: cluster-autoscaler-azure restartPolicy: Always