From 2c62576cb440b0e19b8a18d04d9b5b66914f9b7f Mon Sep 17 00:00:00 2001 From: Jack Francis Date: Tue, 17 Oct 2023 13:00:20 -0700 Subject: [PATCH] azure: enable vmss deallocate Signed-off-by: Jack Francis --- charts/cluster-autoscaler/Chart.yaml | 2 +- charts/cluster-autoscaler/README.md | 1 + .../cluster-autoscaler/templates/secret.yaml | 1 + charts/cluster-autoscaler/values.yaml | 4 + .../cloudprovider/azure/azure_config.go | 3 + .../cloudprovider/azure/azure_error.go | 204 +++++++++++++++ .../cloudprovider/azure/azure_logger.go | 156 ++++++++++++ .../cloudprovider/azure/azure_manager.go | 2 + .../cloudprovider/azure/azure_manager_test.go | 1 + .../cloudprovider/azure/azure_scale_set.go | 241 +++++++++++++++++- 10 files changed, 602 insertions(+), 13 deletions(-) create mode 100644 cluster-autoscaler/cloudprovider/azure/azure_error.go create mode 100644 cluster-autoscaler/cloudprovider/azure/azure_logger.go diff --git a/charts/cluster-autoscaler/Chart.yaml b/charts/cluster-autoscaler/Chart.yaml index 394a0be6d268..f6dd2342319b 100644 --- a/charts/cluster-autoscaler/Chart.yaml +++ b/charts/cluster-autoscaler/Chart.yaml @@ -11,4 +11,4 @@ name: cluster-autoscaler sources: - https://github.com/kubernetes/autoscaler/tree/master/cluster-autoscaler type: application -version: 9.29.3 +version: 9.30.0 diff --git a/charts/cluster-autoscaler/README.md b/charts/cluster-autoscaler/README.md index dd6171b780e7..034acb6860ab 100644 --- a/charts/cluster-autoscaler/README.md +++ b/charts/cluster-autoscaler/README.md @@ -356,6 +356,7 @@ vpa: | azureClusterName | string | `""` | Azure AKS cluster name. Required if `cloudProvider=azure` | | azureNodeResourceGroup | string | `""` | Azure resource group where the cluster's nodes are located, typically set as `MC___`. Required if `cloudProvider=azure` | | azureResourceGroup | string | `""` | Azure resource group that the cluster is located. Required if `cloudProvider=azure` | +| azureScaleDownPolicy | string | `"Delete"` | Azure ScaleDownPolicy, either "Delete" (default) or "Deallocate" Only relevant if `cloudProvider=azure` | | azureSubscriptionID | string | `""` | Azure subscription where the resources are located. Required if `cloudProvider=azure` | | azureTenantID | string | `""` | Azure tenant where the resources are located. Required if `cloudProvider=azure` | | azureUseManagedIdentityExtension | bool | `false` | Whether to use Azure's managed identity extension for credentials. If using MSI, ensure subscription ID, resource group, and azure AKS cluster name are set. You can only use one authentication method at a time, either azureUseWorkloadIdentityExtension or azureUseManagedIdentityExtension should be set. | diff --git a/charts/cluster-autoscaler/templates/secret.yaml b/charts/cluster-autoscaler/templates/secret.yaml index 9c58d0feb1a6..f60ccd1d0585 100644 --- a/charts/cluster-autoscaler/templates/secret.yaml +++ b/charts/cluster-autoscaler/templates/secret.yaml @@ -14,6 +14,7 @@ data: VMType: "{{ .Values.azureVMType | b64enc }}" ClusterName: "{{ .Values.azureClusterName | b64enc }}" NodeResourceGroup: "{{ .Values.azureNodeResourceGroup | b64enc }}" + ScaleDownPolicy: "{{ .Values.azureScaleDownPolicy | b64enc }}" {{- else if eq .Values.cloudProvider "aws" }} AwsAccessKeyId: "{{ .Values.awsAccessKeyID | b64enc }}" AwsSecretAccessKey: "{{ .Values.awsSecretAccessKey | b64enc }}" diff --git a/charts/cluster-autoscaler/values.yaml b/charts/cluster-autoscaler/values.yaml index b7b39739d3ec..6e262c085ec3 100644 --- a/charts/cluster-autoscaler/values.yaml +++ b/charts/cluster-autoscaler/values.yaml @@ -104,6 +104,10 @@ azureUseWorkloadIdentityExtension: false # azureVMType -- Azure VM type. azureVMType: "AKS" +# azureScaleDownPolicy -- Azure ScaleDownPolicy, either "Delete" (default) or "Deallocate" +# Only relevant if `cloudProvider=azure` +azureScaleDownPolicy: "Delete" + # cloudConfigPath -- Configuration file for cloud provider. cloudConfigPath: "" diff --git a/cluster-autoscaler/cloudprovider/azure/azure_config.go b/cluster-autoscaler/cloudprovider/azure/azure_config.go index 810884dfc691..051c574c692e 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_config.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_config.go @@ -140,6 +140,9 @@ type Config struct { // EnableVmssFlex defines whether to enable Vmss Flex support or not EnableVmssFlex bool `json:"enableVmssFlex,omitempty" yaml:"enableVmssFlex,omitempty"` + + // ScaleDownPolicy is the VMSS scale down policy, either "Delete" or "Deallocate" + ScaleDownPolicy string `json:"scaleDownPolicy" yaml:"scaleDownPolicy"` } // BuildAzureConfig returns a Config object for the Azure clients diff --git a/cluster-autoscaler/cloudprovider/azure/azure_error.go b/cluster-autoscaler/cloudprovider/azure/azure_error.go new file mode 100644 index 000000000000..7499b67444f0 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/azure/azure_error.go @@ -0,0 +1,204 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/Azure/go-autorest/autorest/azure" + "sigs.k8s.io/cloud-provider-azure/pkg/retry" +) + +// Unknown is for errors that have nil RawError body +const Unknown CloudProviderErrorReason = "Unknown" + +// Errors on the sync path +const ( + // QuotaExceeded falls under OperationNotAllowed error code but we make it more specific here + QuotaExceeded CloudProviderErrorReason = "QuotaExceeded" + // OperationNotAllowed is an umbrella for a lot of errors returned by Azure + OperationNotAllowed string = "OperationNotAllowed" +) + +// AutoscalerErrorType describes a high-level category of a given error +type AutoscalerErrorType string + +// AutoscalerErrorReason is a more detailed reason for the failed operation +type AutoscalerErrorReason string + +// CloudProviderErrorReason providers more details on errors of type CloudProviderError +type CloudProviderErrorReason AutoscalerErrorReason + +// AutoscalerError contains information about Autoscaler errors +type AutoscalerError interface { + // Error implements golang error interface + Error() string + + // Type returns the type of AutoscalerError + Type() AutoscalerErrorType + + // Reason returns the reason of the AutoscalerError + Reason() AutoscalerErrorReason + + // AddPrefix adds a prefix to error message. + // Returns the error it's called for convenient inline use. + // Example: + // if err := DoSomething(myObject); err != nil { + // return err.AddPrefix("can't do something with %v: ", myObject) + // } + AddPrefix(msg string, args ...interface{}) AutoscalerError +} + +type autoscalerErrorImpl struct { + errorType AutoscalerErrorType + errorReason AutoscalerErrorReason + msg string +} + +const ( + // CloudProviderError is an error related to underlying infrastructure + CloudProviderError AutoscalerErrorType = "CloudProviderError" + // ApiCallError is an error related to communication with k8s API server + ApiCallError AutoscalerErrorType = "ApiCallError" + // Timeout is an error related to nodes not joining the cluster in maxNodeProvisionTime + Timeout AutoscalerErrorType = "Timeout" + // InternalError is an error inside Cluster Autoscaler + InternalError AutoscalerErrorType = "InternalError" + // TransientError is an error that causes us to skip a single loop, but + // does not require any additional action. + TransientError AutoscalerErrorType = "TransientError" + // ConfigurationError is an error related to bad configuration provided + // by a user. + ConfigurationError AutoscalerErrorType = "ConfigurationError" + // NodeGroupDoesNotExistError signifies that a NodeGroup + // does not exist. + NodeGroupDoesNotExistError AutoscalerErrorType = "nodeGroupDoesNotExistError" +) + +const ( + // NodeRegistration signifies an error with node registering + NodeRegistration AutoscalerErrorReason = "NodeRegistration" +) + +// NewAutoscalerError returns new autoscaler error with a message constructed from format string +func NewAutoscalerError(errorType AutoscalerErrorType, msg string, args ...interface{}) AutoscalerError { + return autoscalerErrorImpl{ + errorType: errorType, + msg: fmt.Sprintf(msg, args...), + } +} + +// NewAutoscalerErrorWithReason returns new autoscaler error with a reason and a message constructed from format string +func NewAutoscalerErrorWithReason(errorType AutoscalerErrorType, reason AutoscalerErrorReason, msg string, args ...interface{}) AutoscalerError { + return autoscalerErrorImpl{ + errorType: errorType, + errorReason: reason, + msg: fmt.Sprintf(msg, args...), + } +} + +// NewAutoscalerCloudProviderError returns new autoscaler error with a cloudprovider error type and a message constructed from format string +func NewAutoscalerCloudProviderError(errorReason CloudProviderErrorReason, msg string, args ...interface{}) AutoscalerError { + return autoscalerErrorImpl{ + errorType: CloudProviderError, + errorReason: AutoscalerErrorReason(errorReason), + msg: fmt.Sprintf(msg, args...), + } +} + +// ToAutoscalerError converts an error to AutoscalerError with given type, +// unless it already is an AutoscalerError (in which case it's not modified). +func ToAutoscalerError(defaultType AutoscalerErrorType, err error) AutoscalerError { + if err == nil { + return nil + } + if e, ok := err.(AutoscalerError); ok { + return e + } + return NewAutoscalerError(defaultType, "%v", err) +} + +// Error implements golang error interface +func (e autoscalerErrorImpl) Error() string { + return e.msg +} + +// Type returns the type of AutoscalerError +func (e autoscalerErrorImpl) Type() AutoscalerErrorType { + return e.errorType +} + +func (e autoscalerErrorImpl) Reason() AutoscalerErrorReason { + return e.errorReason +} + +// AddPrefix adds a prefix to error message. +// Returns the error it's called for convenient inline use. +// Example: +// if err := DoSomething(myObject); err != nil { +// +// return err.AddPrefix("can't do something with %v: ", myObject) +// +// } +func (e autoscalerErrorImpl) AddPrefix(msg string, args ...interface{}) AutoscalerError { + e.msg = fmt.Sprintf(msg, args...) + e.msg + return e +} + +// ServiceRawError wraps the RawError returned by the k8s/cloudprovider +// Azure clients. The error body should satisfy the autorest.ServiceError type +type ServiceRawError struct { + ServiceError *azure.ServiceError `json:"error,omitempty"` +} + +func azureToAutoscalerError(rerr *retry.Error) AutoscalerError { + if rerr == nil { + return nil + } + if rerr.RawError == nil { + return NewAutoscalerCloudProviderError(Unknown, rerr.Error().Error()) + } + + re := ServiceRawError{} + err := json.Unmarshal([]byte(rerr.RawError.Error()), &re) + if err != nil { + return NewAutoscalerCloudProviderError(Unknown, rerr.Error().Error()) + } + se := re.ServiceError + if se == nil { + return NewAutoscalerCloudProviderError(Unknown, rerr.Error().Error()) + } + var errCode CloudProviderErrorReason + if se.Code == "" { + errCode = Unknown + } else if se.Code == OperationNotAllowed { + errCode = getOperationNotAllowedReason(se) + } else { + errCode = CloudProviderErrorReason(se.Code) + } + return NewAutoscalerCloudProviderError(errCode, se.Message) +} + +// getOperationNotAllowedReason renames the error code for quotas to a more human-readable error +func getOperationNotAllowedReason(se *azure.ServiceError) CloudProviderErrorReason { + if strings.Contains(se.Message, "Quota increase") { + return QuotaExceeded + } + return CloudProviderErrorReason(OperationNotAllowed) +} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_logger.go b/cluster-autoscaler/cloudprovider/azure/azure_logger.go new file mode 100644 index 000000000000..a27e7b910a87 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/azure/azure_logger.go @@ -0,0 +1,156 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "math" + "runtime" + "time" + + "crypto/rand" + + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + + "github.com/sirupsen/logrus" +) + +// Logger is a type that can log QOSEventInfo +type Logger struct { + QOSLogger *logrus.Entry + EnableQOSLogging bool +} + +// QOSEventInfo is a type used to store qosEvents' info when logging is delayed +type QOSEventInfo struct { + Start time.Time + End time.Time + Properties map[string]interface{} +} + +var ( + logger *Logger +) + +const ( + // SourceFieldName source + SourceFieldName = "source" + // ClusterAutoscalerQosLog source field name for the QosLogger + ClusterAutoscalerQosLog = "ClusterAutoscalerQosLog" + epochFieldName = "env_epoch" + fileNameFieldName = "fileName" + lineNumberFieldName = "lineNumber" + methodNameFieldName = "methodName" + durationInMillisecondsFieldName = "durationInMilliseconds" + resultFieldName = "result" + errorDetailsFieldName = "errorDetails" + errorTypeFieldName = "errorType" + errorReasonFieldName = "errorReason" + startTimeFieldName = "startTime" + endTimeFieldName = "endTime" + upperCaseAlphanumeric = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890" +) + +// NewLogger makes a new Logger that can log qos events +func NewLogger(enableQOSLogging bool) { + entryLogger := logrus.New() + entryLogger.Formatter = &logrus.JSONFormatter{} + log := logrus.NewEntry(entryLogger) + epoch, _ := getEpochRandomString() + log.WithField(epochFieldName, epoch) + log = withCallerInfo(log) + logger = &Logger{ + log.WithField(SourceFieldName, ClusterAutoscalerQosLog), + enableQOSLogging, + } +} + +func withCallerInfo(logger *logrus.Entry) *logrus.Entry { + _, file, line, _ := runtime.Caller(3) + fields := make(map[string]interface{}) + fields[fileNameFieldName] = file + fields[lineNumberFieldName] = line + return logger.WithFields(fields) +} + +// QOSEvents creates QOS events for computeExpansionQOSEvents and podEquivalenceGroupsQOSEvent +func QOSEvents(computeExpansionQOSEvents []QOSEventInfo, podEquivalenceGroupsQOSEvent QOSEventInfo) { + QOSEvent(podEquivalenceGroupsQOSEvent.Start, podEquivalenceGroupsQOSEvent.End, nil, "buildPodEquivalenceGroups", podEquivalenceGroupsQOSEvent.Properties) + for _, computeExpansionEvent := range computeExpansionQOSEvents { + QOSEvent(computeExpansionEvent.Start, computeExpansionEvent.End, nil, "computeExpansionOption", computeExpansionEvent.Properties) + } +} + +// QOSEvent creates a QOSLogger log entry with fields from the parameter values +func QOSEvent(start time.Time, end time.Time, autoscalerErrors AutoscalerError, methodName string, properties map[string]interface{}) { + if !logger.EnableQOSLogging { + return + } + autoscalerErrorsMap, result := getAutoscalerErrorInfo(autoscalerErrors) + duration := end.Sub(start) + QOSEventInfoFields := map[string]interface{}{ + methodNameFieldName: methodName, + durationInMillisecondsFieldName: math.Floor((float64(duration)/float64(time.Millisecond))*1000) / 1000, + startTimeFieldName: start, + endTimeFieldName: end, + resultFieldName: result, + errorDetailsFieldName: autoscalerErrorsMap[errorDetailsFieldName], + errorTypeFieldName: autoscalerErrorsMap[errorTypeFieldName], + errorReasonFieldName: autoscalerErrorsMap[errorReasonFieldName], + propertiesFieldName: properties, + } + logger.QOSLogger.WithFields(QOSEventInfoFields).Info("Cluster Autoscaler QOS Event") +} + +// QOSCSEEvent creates a QOS logger entry for cse errors +func QOSCSEEvent(methodName string, properties map[string]interface{}, cseError *cloudprovider.InstanceErrorInfo) { + if !logger.EnableQOSLogging || cseError == nil { + return + } + qosEventInfoFields := map[string]interface{}{ + methodNameFieldName: methodName, + propertiesFieldName: properties, + errorDetailsFieldName: cseError.ErrorCode, + errorTypeFieldName: cseError.ErrorClass, + errorReasonFieldName: cseError.ErrorMessage, + } + logger.QOSLogger.WithFields(qosEventInfoFields).Info("Cluster Autoscaler QOS Event") +} + +// getAutoscalerErrorInfo formats the passed in AutoscalerError to be logged in QosEvent +func getAutoscalerErrorInfo(autoscalerErrors AutoscalerError) (map[string]interface{}, string) { + autoscalerErrorsMap := map[string]interface{}{errorDetailsFieldName: nil, errorTypeFieldName: nil, errorReasonFieldName: nil} + result := "Succeeded" + if autoscalerErrors != nil { + autoscalerErrorsMap = map[string]interface{}{errorDetailsFieldName: autoscalerErrors.Error(), errorTypeFieldName: autoscalerErrors.Type(), errorReasonFieldName: autoscalerErrors.Reason()} + result = "Failed" + } + return autoscalerErrorsMap, result +} + +// getEpochRandomString generates a random string with the provided length using the given alphabet +func getEpochRandomString() (string, error) { + randomBytes := make([]byte, 5) + _, err := rand.Read(randomBytes) + if err != nil { + return "", err + } + for index, randomByte := range randomBytes { + foldedOffset := randomByte % byte(len(upperCaseAlphanumeric)) + randomBytes[index] = upperCaseAlphanumeric[foldedOffset] + } + return string(randomBytes), nil +} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_manager.go b/cluster-autoscaler/cloudprovider/azure/azure_manager.go index dc04e1558281..fb66ad51d270 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_manager.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_manager.go @@ -33,6 +33,8 @@ import ( ) const ( + azurePrefix = "azure://" + vmTypeVMSS = "vmss" vmTypeStandard = "standard" vmTypeAKS = "aks" diff --git a/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go b/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go index 5840e834f7cc..d3b57762a9d8 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go @@ -684,6 +684,7 @@ func TestGetFilteredAutoscalingGroupsVmss(t *testing.T) { curSize: 3, sizeRefreshPeriod: manager.azureCache.refreshInterval, instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod, + scaleDownPolicy: ScaleDownPolicyDelete, }} assert.True(t, assert.ObjectsAreEqualValues(expectedAsgs, asgs), "expected %#v, but found: %#v", expectedAsgs, asgs) } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go index f0b2e83f07e8..086bedf9811e 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go @@ -33,11 +33,13 @@ import ( "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-08-01/compute" "github.com/Azure/go-autorest/autorest/azure" + "github.com/Azure/go-autorest/autorest/to" ) var ( defaultVmssInstancesRefreshPeriod = 5 * time.Minute vmssContextTimeout = 3 * time.Minute + asyncContextTimeout = 30 * time.Minute vmssSizeMutex sync.Mutex ) @@ -50,6 +52,17 @@ const ( provisioningStateUpdating string = "Updating" ) +// ScaleDownPolicy is a string representation of the ScaleDownPolicy +type ScaleDownPolicy string + +const ( + // ScaleDownPolicyDelete means that on scale down, nodes will be deleted + ScaleDownPolicyDelete ScaleDownPolicy = "Delete" + // ScaleDownPolicyDeallocate means that on scale down, nodes will be deallocated and on scale-up they will + // be attempted to be started first + ScaleDownPolicyDeallocate ScaleDownPolicy = "Deallocate" +) + // ScaleSet implements NodeGroup interface. type ScaleSet struct { azureRef @@ -72,6 +85,8 @@ type ScaleSet struct { instanceMutex sync.Mutex instanceCache []cloudprovider.Instance lastInstanceRefresh time.Time + + scaleDownPolicy ScaleDownPolicy } // NewScaleSet creates a new NewScaleSet. @@ -95,6 +110,12 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager, curSize int64) ( scaleSet.instancesRefreshPeriod = defaultVmssInstancesRefreshPeriod } + // Default to "Delete" VMSS scale down policy + scaleSet.scaleDownPolicy = ScaleDownPolicyDelete + if az.config.ScaleDownPolicy != "" { + scaleSet.scaleDownPolicy = ScaleDownPolicy(az.config.ScaleDownPolicy) + } + return scaleSet, nil } @@ -462,7 +483,7 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregistered // DeleteNodes deletes the nodes from the group. func (scaleSet *ScaleSet) DeleteNodes(nodes []*apiv1.Node) error { - klog.V(8).Infof("Delete nodes requested: %q\n", nodes) + klog.V(4).Infof("Delete nodes using scaleDownPolicy=%s requested: %q\n", scaleSet.scaleDownPolicy, nodes) size, err := scaleSet.GetScaleSetSize() if err != nil { return err @@ -493,6 +514,9 @@ func (scaleSet *ScaleSet) DeleteNodes(nodes []*apiv1.Node) error { refs = append(refs, ref) } + if scaleSet.scaleDownPolicy == ScaleDownPolicyDeallocate { + return scaleSet.deallocateInstances(refs) + } return scaleSet.DeleteInstances(refs, hasUnregisteredNodes) } @@ -578,18 +602,26 @@ func (scaleSet *ScaleSet) Nodes() ([]cloudprovider.Instance, error) { } func (scaleSet *ScaleSet) buildScaleSetCache(lastRefresh time.Time) error { + klog.V(3).Infof("updateInstanceCache: resetting instance Cache for scaleSet %s", + scaleSet.Name) vms, rerr := scaleSet.GetScaleSetVms() if rerr != nil { if isAzureRequestsThrottled(rerr) { - // Log a warning and update the instance refresh time so that it would retry after cache expiration - klog.Warningf("GetScaleSetVms() is throttled with message %v, would return the cached instances", rerr) + // Log a warning and update the instance refresh time so that it would retry later. + // Ensure to retry no sooner than rerr.RetryAfter + klog.Warningf("buildScaleSetCache: GetScaleSetVms() is throttled with message %v, would return the cached instances", rerr) + nextRefresh := lastRefresh.Add(scaleSet.instancesRefreshPeriod) + if nextRefresh.Before(rerr.RetryAfter) { + delay := rerr.RetryAfter.Sub(nextRefresh) + lastRefresh = lastRefresh.Add(delay) + } scaleSet.lastInstanceRefresh = lastRefresh return nil } return rerr.Error() } - scaleSet.instanceCache = buildInstanceCache(vms) + scaleSet.instanceCache = scaleSet.buildInstanceCache(vms) scaleSet.lastInstanceRefresh = lastRefresh return nil @@ -607,7 +639,7 @@ func (scaleSet *ScaleSet) buildScaleSetCacheForFlex(lastRefresh time.Time) error return rerr.Error() } - scaleSet.instanceCache = buildInstanceCache(vms) + scaleSet.instanceCache = scaleSet.buildInstanceCache(vms) scaleSet.lastInstanceRefresh = lastRefresh return nil @@ -615,7 +647,7 @@ func (scaleSet *ScaleSet) buildScaleSetCacheForFlex(lastRefresh time.Time) error // Note that the GetScaleSetVms() results is not used directly because for the List endpoint, // their resource ID format is not consistent with Get endpoint -func buildInstanceCache(vmList interface{}) []cloudprovider.Instance { +func (scaleSet *ScaleSet) buildInstanceCache(vmList interface{}) []cloudprovider.Instance { instances := []cloudprovider.Instance{} switch vms := vmList.(type) { @@ -625,7 +657,7 @@ func buildInstanceCache(vmList interface{}) []cloudprovider.Instance { if vm.InstanceView != nil && vm.InstanceView.Statuses != nil { powerState = vmPowerStateFromStatuses(*vm.InstanceView.Statuses) } - addInstanceToCache(&instances, vm.ID, vm.ProvisioningState, powerState) + scaleSet.addInstanceToCache(&instances, vm.ID, vm.ProvisioningState, powerState) } case []compute.VirtualMachine: for _, vm := range vms { @@ -633,14 +665,14 @@ func buildInstanceCache(vmList interface{}) []cloudprovider.Instance { if vm.InstanceView != nil && vm.InstanceView.Statuses != nil { powerState = vmPowerStateFromStatuses(*vm.InstanceView.Statuses) } - addInstanceToCache(&instances, vm.ID, vm.ProvisioningState, powerState) + scaleSet.addInstanceToCache(&instances, vm.ID, vm.ProvisioningState, powerState) } } return instances } -func addInstanceToCache(instances *[]cloudprovider.Instance, id *string, provisioningState *string, powerState string) { +func (scaleSet *ScaleSet) addInstanceToCache(instances *[]cloudprovider.Instance, id *string, provisioningState *string, powerState string) { // The resource ID is empty string, which indicates the instance may be in deleting state. if len(*id) == 0 { return @@ -655,7 +687,7 @@ func addInstanceToCache(instances *[]cloudprovider.Instance, id *string, provisi *instances = append(*instances, cloudprovider.Instance{ Id: "azure://" + resourceID, - Status: instanceStatusFromProvisioningStateAndPowerState(resourceID, provisioningState, powerState), + Status: scaleSet.instanceStatusFromProvisioningStateAndPowerState(resourceID, provisioningState, powerState), }) } @@ -683,8 +715,17 @@ func (scaleSet *ScaleSet) setInstanceStatusByProviderID(providerID string, statu } // instanceStatusFromProvisioningStateAndPowerState converts the VM provisioning state and power state to cloudprovider.InstanceStatus -func instanceStatusFromProvisioningStateAndPowerState(resourceId string, provisioningState *string, powerState string) *cloudprovider.InstanceStatus { - if provisioningState == nil { +func (scaleSet *ScaleSet) instanceStatusFromProvisioningStateAndPowerState(resourceId string, provisioningState *string, powerState string) *cloudprovider.InstanceStatus { + // Prefer the proactive cache view of the instance state if we aren't in a terminal state + // This is because the power state may be taking longer to update and we don't want + // an unfortunate VM update (TTL 5 min) to reset that state to running. + if provisioningState == nil || *provisioningState == string(compute.GalleryProvisioningStateUpdating) { + providerID := azurePrefix + resourceId + for _, instance := range scaleSet.instanceCache { + if instance.Id == providerID { + return instance.Status + } + } return nil } @@ -717,6 +758,26 @@ func instanceStatusFromProvisioningStateAndPowerState(resourceId string, provisi status.State = cloudprovider.InstanceRunning } + // TODO + // now check power state + /*if vm.InstanceView != nil && vm.InstanceView.Statuses != nil { + statuses := *vm.InstanceView.Statuses + + for _, s := range statuses { + state := to.String(s.Code) + // set the state to deallocated/deallocating based on their running state if provisioning is succeeded. + // This is to avoid the weird states with Failed VMs which can fail all API calls. + // This information is used to build instanceCache in CA. + if *vm.ProvisioningState == string(compute.GalleryProvisioningStateSucceeded) { + if powerStateDeallocated(state) { + status.State = cloudprovider.InstanceDeallocated + } else if powerStateDeallocating(state) { + status.State = cloudprovider.InstanceDeallocating + } + } + } + }*/ + return status } @@ -741,3 +802,159 @@ func (scaleSet *ScaleSet) getOrchestrationMode() (compute.OrchestrationMode, err } return vmss.OrchestrationMode, nil } + +// deallocateInstances deallocates the given instances. All instances must be controlled by the same nodegroup. +func (scaleSet *ScaleSet) deallocateInstances(instances []*azureRef) error { + if len(instances) == 0 { + return nil + } + + klog.V(3).Infof("Deallocating vmss instances %v", instances) + + commonNg, err := scaleSet.manager.GetNodeGroupForInstance(instances[0]) + if err != nil { + return err + } + + instancesToDeallocate := []*azureRef{} + for _, instance := range instances { + err = scaleSet.verifyNodeGroup(instance, commonNg.Id()) + if err != nil { + return err + } + + // Instances in the "InstanceDeallocated" state are the only ones currently being ignored. However, if the + // deallocation process fails for instances in the "InstanceDeallocating" state, we are currently invalidating + // the cache by calling "waitForDeallocateInstances()" without implementing proper error handling for these cases. + // Consequently, we do not intend to skip these instances. This approach is simply a conservative measure to + // ensure that all instances are accounted. + if cpi, found := scaleSet.getInstanceByProviderID(instance.Name); found && cpi.Status != nil && + cpi.Status.ErrorInfo.ErrorCode == "InstanceDeallocated" { + klog.V(3).Infof("Skipping deleting instance %s as its current state is deallocated", instance.Name) + continue + } + instancesToDeallocate = append(instancesToDeallocate, instance) + } + + // nothing to delete + if len(instancesToDeallocate) == 0 { + klog.V(3).Infof("No new instances eligible for deallocation, skipping") + return nil + } + + instanceIDs := []string{} + for _, instance := range instancesToDeallocate { + instanceID, err := getLastSegment(instance.Name) + if err != nil { + klog.Errorf("getLastSegment failed with error: %v", err) + return err + } + instanceIDs = append(instanceIDs, instanceID) + } + + requiredIds := &compute.VirtualMachineScaleSetVMInstanceRequiredIDs{ + InstanceIds: &instanceIDs, + } + + ctx, cancel := getContextWithTimeout(vmssContextTimeout) + defer cancel() + resourceGroup := scaleSet.manager.config.ResourceGroup + + scaleSet.instanceMutex.Lock() + klog.V(3).Infof("Calling virtualMachineScaleSetsClient.DeallocateInstancesAsync(%v) "+ + "for %s", requiredIds.InstanceIds, scaleSet.Name) + future, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.DeallocateInstancesAsync(ctx, resourceGroup, + commonNg.Id(), *requiredIds) + scaleSet.instanceMutex.Unlock() + if rerr != nil { + klog.Errorf("virtualMachineScaleSetsClient.DeallocateInstancesAsync for instances %v for %s "+ + "failed: %+v", requiredIds.InstanceIds, scaleSet.Name, rerr) + return azureToAutoscalerError(rerr) + } + + // Proactively set the status of the instances to be running in cache as deallocating. Status will change to deallocated on success + for _, instance := range instancesToDeallocate { + state := cloudprovider.InstanceStatus{ + ErrorInfo: &cloudprovider.InstanceErrorInfo{ + ErrorCode: "InstanceDeallocating", + }, + } + scaleSet.setInstanceStatusByProviderID(instance.Name, state) + } + + go scaleSet.waitForDeallocateInstances(future, instancesToDeallocate, requiredIds) + return nil +} + +func (scaleSet *ScaleSet) verifyNodeGroup(instance *azureRef, commonNgID string) error { + ng, err := scaleSet.manager.GetNodeGroupForInstance(instance) + if err != nil { + return err + } + + if !strings.EqualFold(ng.Id(), commonNgID) { + return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", + instance.Name, commonNgID) + } + return nil +} + +func (scaleSet *ScaleSet) waitForDeallocateInstances(future *azure.Future, instancesToDeallocate []*azureRef, + requiredIds *compute.VirtualMachineScaleSetVMInstanceRequiredIDs) { + ctx, cancel := getContextWithTimeout(asyncContextTimeout) + defer cancel() + + klog.V(3).Infof("Calling virtualMachineScaleSetsClient.WaitForDeallocateInstancesResult(%v) for %s", + requiredIds.InstanceIds, scaleSet.Name) + deallocateInstancesStart := time.Now() + httpResponse, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.WaitForDeallocateInstancesResult(ctx, + future, scaleSet.manager.config.ResourceGroup) + if httpResponse != nil && httpResponse.Body != nil { + defer httpResponse.Body.Close() + } + isSuccess, err := isSuccessHTTPResponse(httpResponse, err) + + properties := map[string]interface{}{"succeeded": isSuccess, "resourceGroupName": scaleSet.manager.config.ResourceGroup, + "resourceName": scaleSet.Name, "vmSKU": scaleSet.getSKU(), "crpOperationID": getCrpOperation(future)} + QOSEvent(deallocateInstancesStart, time.Now(), ToAutoscalerError(CloudProviderError, err), "waitForDeallocateInstances", + properties) + + if isSuccess { + klog.V(3).Infof("WaitForDeallocateInstancesResult(%v) for %s success", + requiredIds.InstanceIds, scaleSet.Name) + // Set the status of the instances to deallocated only if WaitForDeallocate Call Succeeds + for _, instance := range instancesToDeallocate { + state := cloudprovider.InstanceStatus{ + ErrorInfo: &cloudprovider.InstanceErrorInfo{ + ErrorCode: "InstanceDeallocated", + }, + } + scaleSet.setInstanceStatusByProviderID(instance.Name, state) + } + return + } + + scaleSet.invalidateInstanceCache() + klog.Errorf("WaitForDeallocateInstancesResult(%v) for %s failed with error: %v", requiredIds.InstanceIds, scaleSet.Name, err) +} + +func (scaleSet *ScaleSet) getSKU() string { + vmssInfo, err := scaleSet.getVMSSFromCache() + if err != nil { + klog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err) + return "" + } + return to.String(vmssInfo.Sku.Name) +} + +func getCrpOperation(future *azure.Future) string { + if future != nil { + resp := future.Response() + defer resp.Body.Close() + header := resp.Header["X-Ms-Request-Id"] + if len(header) > 0 { + return header[0] + } + } + return "" +}