diff --git a/charts/cluster-autoscaler/README.md b/charts/cluster-autoscaler/README.md index f7aba27dc3ac..4402a23e348f 100644 --- a/charts/cluster-autoscaler/README.md +++ b/charts/cluster-autoscaler/README.md @@ -383,6 +383,7 @@ vpa: | azureClientID | string | `""` | Service Principal ClientID with contributor permission to Cluster and Node ResourceGroup. Required if `cloudProvider=azure` | | azureClientSecret | string | `""` | Service Principal ClientSecret with contributor permission to Cluster and Node ResourceGroup. Required if `cloudProvider=azure` | | azureResourceGroup | string | `""` | Azure resource group that the cluster is located. Required if `cloudProvider=azure` | +| azureScaleDownPolicy | string | `"Delete"` | Azure ScaleDownPolicy, either "Delete" (default) or "Deallocate". Only relevant if `cloudProvider=azure` | | azureSubscriptionID | string | `""` | Azure subscription where the resources are located. Required if `cloudProvider=azure` | | azureTenantID | string | `""` | Azure tenant where the resources are located. Required if `cloudProvider=azure` | | azureUseManagedIdentityExtension | bool | `false` | Whether to use Azure's managed identity extension for credentials. If using MSI, ensure subscription ID, resource group, and azure AKS cluster name are set. You can only use one authentication method at a time, either azureUseWorkloadIdentityExtension or azureUseManagedIdentityExtension should be set. | diff --git a/charts/cluster-autoscaler/templates/deployment.yaml b/charts/cluster-autoscaler/templates/deployment.yaml index ccbe4353edc8..53e326e60953 100644 --- a/charts/cluster-autoscaler/templates/deployment.yaml +++ b/charts/cluster-autoscaler/templates/deployment.yaml @@ -166,6 +166,11 @@ spec: secretKeyRef: key: VMType name: {{ default (include "cluster-autoscaler.fullname" .) .Values.secretKeyRefNameOverride }} + - name: AZURE_SCALE_DOWN_POLICY + valueFrom: + secretKeyRef: + key: ScaleDownPolicy + name: {{ default (include "cluster-autoscaler.fullname" .) .Values.secretKeyRefNameOverride }} {{- if .Values.azureUseWorkloadIdentityExtension }} - name: ARM_USE_WORKLOAD_IDENTITY_EXTENSION value: "true" diff --git a/charts/cluster-autoscaler/templates/secret.yaml b/charts/cluster-autoscaler/templates/secret.yaml index 4890230090c1..f4d6e5dab668 100644 --- a/charts/cluster-autoscaler/templates/secret.yaml +++ b/charts/cluster-autoscaler/templates/secret.yaml @@ -17,6 +17,7 @@ data: SubscriptionID: "{{ .Values.azureSubscriptionID | b64enc }}" TenantID: "{{ .Values.azureTenantID | b64enc }}" VMType: "{{ .Values.azureVMType | b64enc }}" + ScaleDownPolicy: "{{ .Values.azureScaleDownPolicy | b64enc }}" {{- else if $isAws }} AwsAccessKeyId: "{{ .Values.awsAccessKeyID | b64enc }}" AwsSecretAccessKey: "{{ .Values.awsSecretAccessKey | b64enc }}" diff --git a/charts/cluster-autoscaler/values.yaml b/charts/cluster-autoscaler/values.yaml index 0affb0242d31..7e579900f0ae 100644 --- a/charts/cluster-autoscaler/values.yaml +++ b/charts/cluster-autoscaler/values.yaml @@ -96,6 +96,10 @@ azureUseWorkloadIdentityExtension: false # azureVMType -- Azure VM type. azureVMType: "vmss" +# azureScaleDownPolicy -- Azure ScaleDownPolicy, either "Delete" (default) or "Deallocate". +# Only relevant if `cloudProvider=azure` +azureScaleDownPolicy: "Delete" + # cloudConfigPath -- Configuration file for cloud provider.
cloudConfigPath: "" diff --git a/cluster-autoscaler/cloudprovider/azure/azure_config.go b/cluster-autoscaler/cloudprovider/azure/azure_config.go index 9fe559d07e14..3e36393f4d9c 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_config.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_config.go @@ -135,6 +135,9 @@ type Config struct { // EnableVmssFlex defines whether to enable Vmss Flex support or not EnableVmssFlex bool `json:"enableVmssFlex,omitempty" yaml:"enableVmssFlex,omitempty"` + + // ScaleDownPolicy is the VMSS scale down policy, either "Delete" or "Deallocate" + ScaleDownPolicy string `json:"scaleDownPolicy" yaml:"scaleDownPolicy"` } // BuildAzureConfig returns a Config object for the Azure clients @@ -169,6 +172,7 @@ func BuildAzureConfig(configReader io.Reader) (*Config, error) { cfg.AADClientCertPath = os.Getenv("ARM_CLIENT_CERT_PATH") cfg.AADClientCertPassword = os.Getenv("ARM_CLIENT_CERT_PASSWORD") cfg.Deployment = os.Getenv("ARM_DEPLOYMENT") + cfg.ScaleDownPolicy = os.Getenv("AZURE_SCALE_DOWN_POLICY") subscriptionID, err := getSubscriptionIdFromInstanceMetadata() if err != nil { diff --git a/cluster-autoscaler/cloudprovider/azure/azure_error.go b/cluster-autoscaler/cloudprovider/azure/azure_error.go new file mode 100644 index 000000000000..7499b67444f0 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/azure/azure_error.go @@ -0,0 +1,204 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/Azure/go-autorest/autorest/azure" + "sigs.k8s.io/cloud-provider-azure/pkg/retry" +) + +// Unknown is for errors that have nil RawError body +const Unknown CloudProviderErrorReason = "Unknown" + +// Errors on the sync path +const ( + // QuotaExceeded falls under OperationNotAllowed error code but we make it more specific here + QuotaExceeded CloudProviderErrorReason = "QuotaExceeded" + // OperationNotAllowed is an umbrella for a lot of errors returned by Azure + OperationNotAllowed string = "OperationNotAllowed" +) + +// AutoscalerErrorType describes a high-level category of a given error +type AutoscalerErrorType string + +// AutoscalerErrorReason is a more detailed reason for the failed operation +type AutoscalerErrorReason string + +// CloudProviderErrorReason providers more details on errors of type CloudProviderError +type CloudProviderErrorReason AutoscalerErrorReason + +// AutoscalerError contains information about Autoscaler errors +type AutoscalerError interface { + // Error implements golang error interface + Error() string + + // Type returns the type of AutoscalerError + Type() AutoscalerErrorType + + // Reason returns the reason of the AutoscalerError + Reason() AutoscalerErrorReason + + // AddPrefix adds a prefix to error message. + // Returns the error it's called for convenient inline use. 
+ // Example: + // if err := DoSomething(myObject); err != nil { + // return err.AddPrefix("can't do something with %v: ", myObject) + // } + AddPrefix(msg string, args ...interface{}) AutoscalerError +} + +type autoscalerErrorImpl struct { + errorType AutoscalerErrorType + errorReason AutoscalerErrorReason + msg string +} + +const ( + // CloudProviderError is an error related to underlying infrastructure + CloudProviderError AutoscalerErrorType = "CloudProviderError" + // ApiCallError is an error related to communication with k8s API server + ApiCallError AutoscalerErrorType = "ApiCallError" + // Timeout is an error related to nodes not joining the cluster in maxNodeProvisionTime + Timeout AutoscalerErrorType = "Timeout" + // InternalError is an error inside Cluster Autoscaler + InternalError AutoscalerErrorType = "InternalError" + // TransientError is an error that causes us to skip a single loop, but + // does not require any additional action. + TransientError AutoscalerErrorType = "TransientError" + // ConfigurationError is an error related to bad configuration provided + // by a user. + ConfigurationError AutoscalerErrorType = "ConfigurationError" + // NodeGroupDoesNotExistError signifies that a NodeGroup + // does not exist. + NodeGroupDoesNotExistError AutoscalerErrorType = "nodeGroupDoesNotExistError" +) + +const ( + // NodeRegistration signifies an error with node registering + NodeRegistration AutoscalerErrorReason = "NodeRegistration" +) + +// NewAutoscalerError returns new autoscaler error with a message constructed from format string +func NewAutoscalerError(errorType AutoscalerErrorType, msg string, args ...interface{}) AutoscalerError { + return autoscalerErrorImpl{ + errorType: errorType, + msg: fmt.Sprintf(msg, args...), + } +} + +// NewAutoscalerErrorWithReason returns new autoscaler error with a reason and a message constructed from format string +func NewAutoscalerErrorWithReason(errorType AutoscalerErrorType, reason AutoscalerErrorReason, msg string, args ...interface{}) AutoscalerError { + return autoscalerErrorImpl{ + errorType: errorType, + errorReason: reason, + msg: fmt.Sprintf(msg, args...), + } +} + +// NewAutoscalerCloudProviderError returns new autoscaler error with a cloudprovider error type and a message constructed from format string +func NewAutoscalerCloudProviderError(errorReason CloudProviderErrorReason, msg string, args ...interface{}) AutoscalerError { + return autoscalerErrorImpl{ + errorType: CloudProviderError, + errorReason: AutoscalerErrorReason(errorReason), + msg: fmt.Sprintf(msg, args...), + } +} + +// ToAutoscalerError converts an error to AutoscalerError with given type, +// unless it already is an AutoscalerError (in which case it's not modified). +func ToAutoscalerError(defaultType AutoscalerErrorType, err error) AutoscalerError { + if err == nil { + return nil + } + if e, ok := err.(AutoscalerError); ok { + return e + } + return NewAutoscalerError(defaultType, "%v", err) +} + +// Error implements golang error interface +func (e autoscalerErrorImpl) Error() string { + return e.msg +} + +// Type returns the type of AutoscalerError +func (e autoscalerErrorImpl) Type() AutoscalerErrorType { + return e.errorType +} + +func (e autoscalerErrorImpl) Reason() AutoscalerErrorReason { + return e.errorReason +} + +// AddPrefix adds a prefix to error message. +// Returns the error it's called for convenient inline use. 
+// Example: +// if err := DoSomething(myObject); err != nil { +// +// return err.AddPrefix("can't do something with %v: ", myObject) +// +// } +func (e autoscalerErrorImpl) AddPrefix(msg string, args ...interface{}) AutoscalerError { + e.msg = fmt.Sprintf(msg, args...) + e.msg + return e +} + +// ServiceRawError wraps the RawError returned by the k8s/cloudprovider +// Azure clients. The error body should satisfy the autorest.ServiceError type +type ServiceRawError struct { + ServiceError *azure.ServiceError `json:"error,omitempty"` +} + +func azureToAutoscalerError(rerr *retry.Error) AutoscalerError { + if rerr == nil { + return nil + } + if rerr.RawError == nil { + return NewAutoscalerCloudProviderError(Unknown, rerr.Error().Error()) + } + + re := ServiceRawError{} + err := json.Unmarshal([]byte(rerr.RawError.Error()), &re) + if err != nil { + return NewAutoscalerCloudProviderError(Unknown, rerr.Error().Error()) + } + se := re.ServiceError + if se == nil { + return NewAutoscalerCloudProviderError(Unknown, rerr.Error().Error()) + } + var errCode CloudProviderErrorReason + if se.Code == "" { + errCode = Unknown + } else if se.Code == OperationNotAllowed { + errCode = getOperationNotAllowedReason(se) + } else { + errCode = CloudProviderErrorReason(se.Code) + } + return NewAutoscalerCloudProviderError(errCode, se.Message) +} + +// getOperationNotAllowedReason renames the error code for quotas to a more human-readable error +func getOperationNotAllowedReason(se *azure.ServiceError) CloudProviderErrorReason { + if strings.Contains(se.Message, "Quota increase") { + return QuotaExceeded + } + return CloudProviderErrorReason(OperationNotAllowed) +} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_logger.go b/cluster-autoscaler/cloudprovider/azure/azure_logger.go new file mode 100644 index 000000000000..f074042a123a --- /dev/null +++ b/cluster-autoscaler/cloudprovider/azure/azure_logger.go @@ -0,0 +1,109 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package azure + +import ( + "runtime" + "time" + + "crypto/rand" + + "github.com/sirupsen/logrus" +) + +// Logger is a type that can log QOSEventInfo +type Logger struct { + QOSLogger *logrus.Entry + EnableQOSLogging bool +} + +// QOSEventInfo is a type used to store qosEvents' info when logging is delayed +type QOSEventInfo struct { + Start time.Time + End time.Time + Properties map[string]interface{} +} + +var ( + logger *Logger +) + +const ( + // SourceFieldName source + SourceFieldName = "source" + // ClusterAutoscalerQosLog source field name for the QosLogger + ClusterAutoscalerQosLog = "ClusterAutoscalerQosLog" + epochFieldName = "env_epoch" + fileNameFieldName = "fileName" + lineNumberFieldName = "lineNumber" + methodNameFieldName = "methodName" + durationInMillisecondsFieldName = "durationInMilliseconds" + resultFieldName = "result" + errorDetailsFieldName = "errorDetails" + errorTypeFieldName = "errorType" + errorReasonFieldName = "errorReason" + startTimeFieldName = "startTime" + endTimeFieldName = "endTime" + upperCaseAlphanumeric = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890" +) + +// NewLogger makes a new Logger that can log qos events +func NewLogger(enableQOSLogging bool) { + entryLogger := logrus.New() + entryLogger.Formatter = &logrus.JSONFormatter{} + log := logrus.NewEntry(entryLogger) + epoch, _ := getEpochRandomString() + log = log.WithField(epochFieldName, epoch) + log = withCallerInfo(log) + logger = &Logger{ + log.WithField(SourceFieldName, ClusterAutoscalerQosLog), + enableQOSLogging, + } +} + +func withCallerInfo(logger *logrus.Entry) *logrus.Entry { + _, file, line, _ := runtime.Caller(3) + fields := make(map[string]interface{}) + fields[fileNameFieldName] = file + fields[lineNumberFieldName] = line + return logger.WithFields(fields) +} + +// getAutoscalerErrorInfo formats the passed in AutoscalerError to be logged in QosEvent +func getAutoscalerErrorInfo(autoscalerErrors AutoscalerError) (map[string]interface{}, string) { + autoscalerErrorsMap := map[string]interface{}{errorDetailsFieldName: nil, errorTypeFieldName: nil, errorReasonFieldName: nil} + result := "Succeeded" + if autoscalerErrors != nil { + autoscalerErrorsMap = map[string]interface{}{errorDetailsFieldName: autoscalerErrors.Error(), errorTypeFieldName: autoscalerErrors.Type(), errorReasonFieldName: autoscalerErrors.Reason()} + result = "Failed" + } + return autoscalerErrorsMap, result +} + +// getEpochRandomString generates a 5-character random string from the upper-case alphanumeric alphabet +func getEpochRandomString() (string, error) { + randomBytes := make([]byte, 5) + _, err := rand.Read(randomBytes) + if err != nil { + return "", err + } + for index, randomByte := range randomBytes { + foldedOffset := randomByte % byte(len(upperCaseAlphanumeric)) + randomBytes[index] = upperCaseAlphanumeric[foldedOffset] + } + return string(randomBytes), nil +} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_manager.go b/cluster-autoscaler/cloudprovider/azure/azure_manager.go index f2fc79b1ca9c..237df6329fbe 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_manager.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_manager.go @@ -33,6 +33,8 @@ import ( ) const ( + azurePrefix = "azure://" + vmTypeVMSS = "vmss" vmTypeStandard = "standard" diff --git a/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go b/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go index 674a06cc8a72..c2f4b5721c4c 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go +++
b/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go @@ -682,6 +682,7 @@ func TestGetFilteredAutoscalingGroupsVmss(t *testing.T) { curSize: 3, sizeRefreshPeriod: manager.azureCache.refreshInterval, instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod, + scaleDownPolicy: ScaleDownPolicyDelete, }} assert.True(t, assert.ObjectsAreEqualValues(expectedAsgs, asgs), "expected %#v, but found: %#v", expectedAsgs, asgs) } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go index f0b2e83f07e8..3321da976515 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go @@ -33,11 +33,13 @@ import ( "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-08-01/compute" "github.com/Azure/go-autorest/autorest/azure" + "github.com/Azure/go-autorest/autorest/to" ) var ( defaultVmssInstancesRefreshPeriod = 5 * time.Minute vmssContextTimeout = 3 * time.Minute + asyncContextTimeout = 30 * time.Minute vmssSizeMutex sync.Mutex ) @@ -50,6 +52,17 @@ const ( provisioningStateUpdating string = "Updating" ) +// ScaleDownPolicy is a string representation of the ScaleDownPolicy +type ScaleDownPolicy string + +const ( + // ScaleDownPolicyDelete means that on scale down, nodes will be deleted + ScaleDownPolicyDelete ScaleDownPolicy = "Delete" + // ScaleDownPolicyDeallocate means that on scale down, nodes will be deallocated and on scale-up they will + // be attempted to be started first + ScaleDownPolicyDeallocate ScaleDownPolicy = "Deallocate" +) + // ScaleSet implements NodeGroup interface. type ScaleSet struct { azureRef @@ -72,6 +85,8 @@ type ScaleSet struct { instanceMutex sync.Mutex instanceCache []cloudprovider.Instance lastInstanceRefresh time.Time + + scaleDownPolicy ScaleDownPolicy } // NewScaleSet creates a new NewScaleSet. @@ -95,6 +110,12 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager, curSize int64) ( scaleSet.instancesRefreshPeriod = defaultVmssInstancesRefreshPeriod } + // Default to "Delete" VMSS scale down policy + scaleSet.scaleDownPolicy = ScaleDownPolicyDelete + if az.config.ScaleDownPolicy != "" { + scaleSet.scaleDownPolicy = ScaleDownPolicy(az.config.ScaleDownPolicy) + } + return scaleSet, nil } @@ -183,7 +204,27 @@ func (scaleSet *ScaleSet) getCurSize() (int64, error) { // GetScaleSetSize gets Scale Set size. 
func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) { - return scaleSet.getCurSize() + // First, get the size of the ScaleSet reported by API + // -1 indicates the ScaleSet hasn't been initialized + size, err := scaleSet.getCurSize() + if size == -1 || err != nil { + klog.V(3).Infof("getScaleSetSize: either size is -1 (actual: %d) or error exists (actual err:%v)", size, err) + return size, err + } + // If the policy for this ScaleSet is Deallocate, the TargetSize is the capacity reported by VMSS minus the nodes + // in deallocated and deallocating states + if scaleSet.scaleDownPolicy == ScaleDownPolicyDeallocate { + totalDeallocationInstances, err := scaleSet.countDeallocatedInstances() + if err != nil { + klog.Errorf("getScaleSetSize: error countDeallocatedInstances for scaleSet %s,"+ + "err: %v", scaleSet.Name, err) + return -1, err + } + size -= int64(totalDeallocationInstances) + klog.V(3).Infof("Found: %d instances in deallocated state, returning target size: %d for scaleSet %s", + totalDeallocationInstances, size, scaleSet.Name) + } + return size, nil } func (scaleSet *ScaleSet) waitForDeleteInstances(future *azure.Future, requiredIds *compute.VirtualMachineScaleSetVMInstanceRequiredIDs) { @@ -229,20 +270,17 @@ func (scaleSet *ScaleSet) updateVMSSCapacity(future *azure.Future) { klog.Errorf("virtualMachineScaleSetsClient.WaitForCreateOrUpdateResult - updateVMSSCapacity for scale set %q failed: %v", scaleSet.Name, err) } -// SetScaleSetSize sets ScaleSet size. -func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error { +func (scaleSet *ScaleSet) createOrUpdateInstances(vmssInfo *compute.VirtualMachineScaleSet, newSize int64) error { + if vmssInfo == nil { + return fmt.Errorf("vmssInfo cannot be nil while increasing scaleSet capacity") + } + scaleSet.sizeMutex.Lock() defer scaleSet.sizeMutex.Unlock() - vmssInfo, err := scaleSet.getVMSSFromCache() - if err != nil { - klog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err) - return err - } - // Update the new capacity to cache. vmssSizeMutex.Lock() - vmssInfo.Sku.Capacity = &size + vmssInfo.Sku.Capacity = &newSize vmssSizeMutex.Unlock() // Compose a new VMSS for updating. @@ -251,23 +289,215 @@ func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error { Sku: vmssInfo.Sku, Location: vmssInfo.Location, } + + if vmssInfo.ExtendedLocation != nil { + op.ExtendedLocation = &compute.ExtendedLocation{ + Name: vmssInfo.ExtendedLocation.Name, + Type: vmssInfo.ExtendedLocation.Type, + } + + klog.V(3).Infof("Passing ExtendedLocation information if it is not nil, with Edge Zone name:(%s)", *op.ExtendedLocation.Name) + } + ctx, cancel := getContextWithTimeout(vmssContextTimeout) defer cancel() klog.V(3).Infof("Waiting for virtualMachineScaleSetsClient.CreateOrUpdateAsync(%s)", scaleSet.Name) - future, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdateAsync(ctx, scaleSet.manager.config.ResourceGroup, scaleSet.Name, op) + future, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdateAsync(ctx, + scaleSet.manager.config.ResourceGroup, scaleSet.Name, op) if rerr != nil { - klog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %v", scaleSet.Name, rerr) - return rerr.Error() + klog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %+v", scaleSet.Name, rerr) + return azureToAutoscalerError(rerr) } // Proactively set the VMSS size so autoscaler makes better decisions.
- scaleSet.curSize = size + scaleSet.curSize = newSize scaleSet.lastSizeRefresh = time.Now() - go scaleSet.updateVMSSCapacity(future) + go scaleSet.waitForCreateOrUpdateInstances(future) + return nil +} + +// waitForCreateOrUpdate waits for the outcome of VMSS capacity update initiated via CreateOrUpdateAsync. +func (scaleSet *ScaleSet) waitForCreateOrUpdateInstances(future *azure.Future) { + var err error + + defer func() { + // Invalidate instanceCache on success and failure. Failure might have created a few instances, but it is very rare. + scaleSet.invalidateInstanceCache() + if err != nil { + klog.Errorf("Failed to update the capacity for vmss %s with error %v, invalidate the cache so as to get "+ + "the real size from API", scaleSet.Name, err) + // Invalidate the VMSS size cache in order to fetch the size from the API. + scaleSet.invalidateLastSizeRefreshWithLock() + scaleSet.manager.invalidateCache() + } + }() + + ctx, cancel := getContextWithTimeout(asyncContextTimeout) + defer cancel() + + klog.V(3).Infof("Calling virtualMachineScaleSetsClient.WaitForCreateOrUpdateResult(%s)", scaleSet.Name) + httpResponse, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.WaitForCreateOrUpdateResult(ctx, + future, scaleSet.manager.config.ResourceGroup) + if httpResponse != nil && httpResponse.Body != nil { + defer httpResponse.Body.Close() + } + isSuccess, err := isSuccessHTTPResponse(httpResponse, err) + + if isSuccess { + klog.V(3).Infof("waitForCreateOrUpdateInstances(%s) success", scaleSet.Name) + return + } + + klog.Errorf("waitForCreateOrUpdateInstances(%s) failed, err: %v", scaleSet.Name, err) +} + +// SetScaleSetSize sets ScaleSet size. +func (scaleSet *ScaleSet) SetScaleSetSize(size int64, delta int) error { + vmssInfo, err := scaleSet.getVMSSFromCache() + if err != nil { + klog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err) + return err + } + + requiredInstances := delta + // If the policy is deallocate, then attempt to satisfy the request by starting existing instances + if scaleSet.scaleDownPolicy == ScaleDownPolicyDeallocate { + deallocatedInstances, err := scaleSet.getDeallocatedInstances() + if err != nil { + klog.Errorf("SetScaleSetSize: error getDeallocatedInstances for scaleSet %s, "+ + "err : %v", scaleSet.Name, err) + } else { + klog.V(3).Infof("Attempting to start: %d instances from deallocated state", requiredInstances) + + // Go through current instances and attempt to reallocate them + for _, instance := range deallocatedInstances { + // we're done + if requiredInstances <= 0 { + break + } + instancesToStart := []*azureRef{{Name: instance.Id}} + err := scaleSet.startInstances(instancesToStart) + if err != nil { + klog.Errorf("Failed to start instances %v in scale set %q: %v", instancesToStart, scaleSet.Name, err) + continue + } + klog.V(3).Infof("Successfully started instances %v in scale set %q", instancesToStart, scaleSet.Name) + requiredInstances-- + } + } + } + + // If after reallocating instances we still need more instances or we're just in Delete mode + // send a scale request + if requiredInstances > 0 { + klog.V(3).Infof("Remaining unsatisfied count is %d. Attempting to increase scale set %q "+ + "capacity", requiredInstances, scaleSet.Name) + err := scaleSet.createOrUpdateInstances(&vmssInfo, size) + if err != nil { + klog.Errorf("Failed to increase capacity for scale set %q to %d: %v", scaleSet.Name, requiredInstances, err) + return err + } + } return nil } +// startInstances starts the given instances. 
All instances must be controlled by the same nodegroup. +func (scaleSet *ScaleSet) startInstances(instances []*azureRef) error { + if len(instances) == 0 { + return nil + } + + klog.V(3).Infof("Starting vmss instances %v", instances) + + commonNg, err := scaleSet.manager.GetNodeGroupForInstance(instances[0]) + if err != nil { + return err + } + + instancesToStart := []*azureRef{} + for _, instance := range instances { + err = scaleSet.verifyNodeGroup(instance, commonNg.Id()) + if err != nil { + return err + } + + if cpi, found, err := scaleSet.getInstanceByProviderID(instance.Name); found && err == nil && cpi.Status != nil && + cpi.Status.State == cloudprovider.InstanceRunning { + klog.V(3).Infof("Skipping starting instance %s as its current state is running", instance.Name) + continue + } + instancesToStart = append(instancesToStart, instance) + } + + // nothing to start + if len(instancesToStart) == 0 { + klog.V(3).Infof("No new instances eligible for starting, skipping") + return nil + } + + instanceIDs := []string{} + for _, instance := range instancesToStart { + instanceID, err := getLastSegment(instance.Name) + if err != nil { + klog.Errorf("getLastSegment failed with error: %v", err) + return err + } + instanceIDs = append(instanceIDs, instanceID) + } + + requiredIds := &compute.VirtualMachineScaleSetVMInstanceRequiredIDs{ + InstanceIds: &instanceIDs, + } + + ctx, cancel := getContextWithTimeout(vmssContextTimeout) + defer cancel() + resourceGroup := scaleSet.manager.config.ResourceGroup + + scaleSet.instanceMutex.Lock() + klog.V(3).Infof("Calling virtualMachineScaleSetsClient.StartInstancesAsync(%v) for %s", + requiredIds.InstanceIds, scaleSet.Name) + future, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.StartInstancesAsync(ctx, resourceGroup, + commonNg.Id(), *requiredIds) + scaleSet.instanceMutex.Unlock() + if rerr != nil { + klog.Errorf("virtualMachineScaleSetsClient.StartInstancesAsync for instances %v for %s failed: "+ + "%+v", requiredIds.InstanceIds, scaleSet.Name, rerr) + return rerr.Error() + } + + // Proactively set the status of the instances to be running in cache + for _, instance := range instancesToStart { + scaleSet.setInstanceStatusByProviderID(instance.Name, cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}) + } + + go scaleSet.waitForStartInstances(future, requiredIds) + return nil +} + +func (scaleSet *ScaleSet) waitForStartInstances(future *azure.Future, requiredIds *compute.VirtualMachineScaleSetVMInstanceRequiredIDs) { + ctx, cancel := getContextWithCancel() + defer cancel() + + klog.V(3).Infof("Calling virtualMachineScaleSetsClient.WaitForStartInstancesResult(%v) for %s", requiredIds.InstanceIds, scaleSet.Name) + httpResponse, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.WaitForStartInstancesResult(ctx, future, + scaleSet.manager.config.ResourceGroup) + if httpResponse != nil && httpResponse.Body != nil { + defer httpResponse.Body.Close() + } + isSuccess, err := isSuccessHTTPResponse(httpResponse, err) + + if isSuccess { + klog.V(3).Infof("WaitForStartInstancesResult(%v) for %s success", requiredIds.InstanceIds, scaleSet.Name) + // No need to invalidateInstanceCache because the states were proactively set to Running. + return + } + + scaleSet.invalidateInstanceCache() + klog.Errorf("WaitForStartInstancesResult(%v) for %s failed with error: %v", + requiredIds.InstanceIds, scaleSet.Name, err) +} + // TargetSize returns the current TARGET size of the node group.
It is possible that the // number is different from the number of nodes registered in Kubernetes. func (scaleSet *ScaleSet) TargetSize() (int, error) { @@ -282,6 +512,7 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error { } size, err := scaleSet.GetScaleSetSize() + klog.Infof("Scaleset %s has size %d, will increase by %d", scaleSet.Name, size, delta) if err != nil { return err } @@ -294,7 +525,7 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error { return fmt.Errorf("size increase too large - desired:%d max:%d", int(size)+delta, scaleSet.MaxSize()) } - return scaleSet.SetScaleSetSize(size + int64(delta)) + return scaleSet.SetScaleSetSize(size+int64(delta), delta) } // GetScaleSetVms returns list of nodes for the given scale set. @@ -400,7 +631,7 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregistered return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", instance.Name, commonAsg) } - if cpi, found := scaleSet.getInstanceByProviderID(instance.Name); found && cpi.Status != nil && cpi.Status.State == cloudprovider.InstanceDeleting { + if cpi, found, err := scaleSet.getInstanceByProviderID(instance.Name); found && err == nil && cpi.Status != nil && cpi.Status.State == cloudprovider.InstanceDeleting { klog.V(3).Infof("Skipping deleting instance %s as its current state is deleting", instance.Name) continue } @@ -462,7 +693,7 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregistered // DeleteNodes deletes the nodes from the group. func (scaleSet *ScaleSet) DeleteNodes(nodes []*apiv1.Node) error { - klog.V(8).Infof("Delete nodes requested: %q\n", nodes) + klog.V(4).Infof("Delete nodes using scaleDownPolicy=%s requested: %q\n", scaleSet.scaleDownPolicy, nodes) size, err := scaleSet.GetScaleSetSize() if err != nil { return err @@ -493,6 +724,9 @@ func (scaleSet *ScaleSet) DeleteNodes(nodes []*apiv1.Node) error { refs = append(refs, ref) } + if scaleSet.scaleDownPolicy == ScaleDownPolicyDeallocate { + return scaleSet.deallocateInstances(refs) + } return scaleSet.DeleteInstances(refs, hasUnregisteredNodes) } @@ -578,18 +812,26 @@ func (scaleSet *ScaleSet) Nodes() ([]cloudprovider.Instance, error) { } func (scaleSet *ScaleSet) buildScaleSetCache(lastRefresh time.Time) error { + klog.V(3).Infof("buildScaleSetCache: resetting instance Cache for scaleSet %s", + scaleSet.Name) vms, rerr := scaleSet.GetScaleSetVms() if rerr != nil { if isAzureRequestsThrottled(rerr) { - // Log a warning and update the instance refresh time so that it would retry after cache expiration - klog.Warningf("GetScaleSetVms() is throttled with message %v, would return the cached instances", rerr) + // Log a warning and update the instance refresh time so that it would retry later. 
+ // Ensure to retry no sooner than rerr.RetryAfter + klog.Warningf("buildScaleSetCache: GetScaleSetVms() is throttled with message %v, would return the cached instances", rerr) + nextRefresh := lastRefresh.Add(scaleSet.instancesRefreshPeriod) + if nextRefresh.Before(rerr.RetryAfter) { + delay := rerr.RetryAfter.Sub(nextRefresh) + lastRefresh = lastRefresh.Add(delay) + } scaleSet.lastInstanceRefresh = lastRefresh return nil } return rerr.Error() } - scaleSet.instanceCache = buildInstanceCache(vms) + scaleSet.instanceCache = scaleSet.buildInstanceCache(vms) scaleSet.lastInstanceRefresh = lastRefresh return nil @@ -607,7 +849,7 @@ func (scaleSet *ScaleSet) buildScaleSetCacheForFlex(lastRefresh time.Time) error return rerr.Error() } - scaleSet.instanceCache = buildInstanceCache(vms) + scaleSet.instanceCache = scaleSet.buildInstanceCache(vms) scaleSet.lastInstanceRefresh = lastRefresh return nil @@ -615,7 +857,7 @@ func (scaleSet *ScaleSet) buildScaleSetCacheForFlex(lastRefresh time.Time) error // Note that the GetScaleSetVms() results is not used directly because for the List endpoint, // their resource ID format is not consistent with Get endpoint -func buildInstanceCache(vmList interface{}) []cloudprovider.Instance { +func (scaleSet *ScaleSet) buildInstanceCache(vmList interface{}) []cloudprovider.Instance { instances := []cloudprovider.Instance{} switch vms := vmList.(type) { @@ -625,7 +867,7 @@ func buildInstanceCache(vmList interface{}) []cloudprovider.Instance { if vm.InstanceView != nil && vm.InstanceView.Statuses != nil { powerState = vmPowerStateFromStatuses(*vm.InstanceView.Statuses) } - addInstanceToCache(&instances, vm.ID, vm.ProvisioningState, powerState) + scaleSet.addInstanceToCache(&instances, vm.ID, vm.ProvisioningState, powerState) } case []compute.VirtualMachine: for _, vm := range vms { @@ -633,14 +875,14 @@ func buildInstanceCache(vmList interface{}) []cloudprovider.Instance { if vm.InstanceView != nil && vm.InstanceView.Statuses != nil { powerState = vmPowerStateFromStatuses(*vm.InstanceView.Statuses) } - addInstanceToCache(&instances, vm.ID, vm.ProvisioningState, powerState) + scaleSet.addInstanceToCache(&instances, vm.ID, vm.ProvisioningState, powerState) } } return instances } -func addInstanceToCache(instances *[]cloudprovider.Instance, id *string, provisioningState *string, powerState string) { +func (scaleSet *ScaleSet) addInstanceToCache(instances *[]cloudprovider.Instance, id *string, provisioningState *string, powerState string) { // The resource ID is empty string, which indicates the instance may be in deleting state. 
if len(*id) == 0 { return @@ -655,19 +897,27 @@ func addInstanceToCache(instances *[]cloudprovider.Instance, id *string, provisi *instances = append(*instances, cloudprovider.Instance{ Id: "azure://" + resourceID, - Status: instanceStatusFromProvisioningStateAndPowerState(resourceID, provisioningState, powerState), + Status: scaleSet.instanceStatusFromProvisioningStateAndPowerState(resourceID, provisioningState, powerState), }) } -func (scaleSet *ScaleSet) getInstanceByProviderID(providerID string) (cloudprovider.Instance, bool) { +func (scaleSet *ScaleSet) getInstanceByProviderID(providerID string) (cloudprovider.Instance, bool, error) { scaleSet.instanceMutex.Lock() defer scaleSet.instanceMutex.Unlock() + + err := scaleSet.validateInstanceCacheWithoutLock() + if err != nil { + klog.Errorf("getInstanceByProviderID: error validating instanceCache for providerID %s for scaleSet %s, "+ + "err: %v", providerID, scaleSet.Name, err) + return cloudprovider.Instance{}, false, err + } + for _, instance := range scaleSet.instanceCache { if instance.Id == providerID { - return instance, true + return instance, true, nil } } - return cloudprovider.Instance{}, false + return cloudprovider.Instance{}, false, nil } func (scaleSet *ScaleSet) setInstanceStatusByProviderID(providerID string, status cloudprovider.InstanceStatus) { @@ -683,8 +933,17 @@ func (scaleSet *ScaleSet) setInstanceStatusByProviderID(providerID string, statu } // instanceStatusFromProvisioningStateAndPowerState converts the VM provisioning state and power state to cloudprovider.InstanceStatus -func instanceStatusFromProvisioningStateAndPowerState(resourceId string, provisioningState *string, powerState string) *cloudprovider.InstanceStatus { - if provisioningState == nil { +func (scaleSet *ScaleSet) instanceStatusFromProvisioningStateAndPowerState(resourceId string, provisioningState *string, powerState string) *cloudprovider.InstanceStatus { + // Prefer the proactive cache view of the instance state if we aren't in a terminal state + // This is because the power state may be taking longer to update and we don't want + // an unfortunate VM update (TTL 5 min) to reset that state to running. + if provisioningState == nil || *provisioningState == string(compute.GalleryProvisioningStateUpdating) { + providerID := azurePrefix + resourceId + for _, instance := range scaleSet.instanceCache { + if instance.Id == providerID { + return instance.Status + } + } return nil } @@ -713,10 +972,37 @@ func instanceStatusFromProvisioningStateAndPowerState(resourceId string, provisi klog.V(5).Infof("VM %s reports a failed provisioning state but is running (%s)", resourceId, powerState) status.State = cloudprovider.InstanceRunning } + case provisioningStateSucceeded: + if powerState == vmPowerStateDeallocated { + status.State = cloudprovider.InstanceDeleting + status.ErrorInfo = &cloudprovider.InstanceErrorInfo{ + ErrorCode: "InstanceDeallocated", + } + } default: status.State = cloudprovider.InstanceRunning } + // TODO + // now check power state + /*if vm.InstanceView != nil && vm.InstanceView.Statuses != nil { + statuses := *vm.InstanceView.Statuses + + for _, s := range statuses { + state := to.String(s.Code) + // set the state to deallocated/deallocating based on their running state if provisioning is succeeded. + // This is to avoid the weird states with Failed VMs which can fail all API calls. + // This information is used to build instanceCache in CA. 
+ if *vm.ProvisioningState == string(compute.GalleryProvisioningStateSucceeded) { + if powerStateDeallocated(state) { + status.State = cloudprovider.InstanceDeallocated + } else if powerStateDeallocating(state) { + status.State = cloudprovider.InstanceDeallocating + } + } + } + }*/ + return status } @@ -741,3 +1027,207 @@ func (scaleSet *ScaleSet) getOrchestrationMode() (compute.OrchestrationMode, err } return vmss.OrchestrationMode, nil } + +// deallocateInstances deallocates the given instances. All instances must be controlled by the same nodegroup. +func (scaleSet *ScaleSet) deallocateInstances(instances []*azureRef) error { + if len(instances) == 0 { + return nil + } + + klog.V(3).Infof("Deallocating vmss instances %v", instances) + + commonNg, err := scaleSet.manager.GetNodeGroupForInstance(instances[0]) + if err != nil { + return err + } + + instancesToDeallocate := []*azureRef{} + for _, instance := range instances { + err = scaleSet.verifyNodeGroup(instance, commonNg.Id()) + if err != nil { + return err + } + + // Instances in the "InstanceDeallocated" state are the only ones currently being ignored. However, if the + // deallocation process fails for instances in the "InstanceDeallocating" state, we are currently invalidating + // the cache by calling "waitForDeallocateInstances()" without implementing proper error handling for these cases. + // Consequently, we do not intend to skip these instances. This approach is simply a conservative measure to + // ensure that all instances are accounted for. + if cpi, found, err := scaleSet.getInstanceByProviderID(instance.Name); found && err == nil && cpi.Status != nil && + cpi.Status.ErrorInfo != nil && cpi.Status.ErrorInfo.ErrorCode == "InstanceDeallocated" { + klog.V(3).Infof("Skipping deallocating instance %s as its current state is deallocated", instance.Name) + continue + } + instancesToDeallocate = append(instancesToDeallocate, instance) + } + + // nothing to deallocate + if len(instancesToDeallocate) == 0 { + klog.V(3).Infof("No new instances eligible for deallocation, skipping") + return nil + } + + instanceIDs := []string{} + for _, instance := range instancesToDeallocate { + instanceID, err := getLastSegment(instance.Name) + if err != nil { + klog.Errorf("getLastSegment failed with error: %v", err) + return err + } + instanceIDs = append(instanceIDs, instanceID) + } + + requiredIds := &compute.VirtualMachineScaleSetVMInstanceRequiredIDs{ + InstanceIds: &instanceIDs, + } + + ctx, cancel := getContextWithTimeout(vmssContextTimeout) + defer cancel() + resourceGroup := scaleSet.manager.config.ResourceGroup + + scaleSet.instanceMutex.Lock() + klog.V(3).Infof("Calling virtualMachineScaleSetsClient.DeallocateInstancesAsync(%v) "+ + "for %s", requiredIds.InstanceIds, scaleSet.Name) + future, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.DeallocateInstancesAsync(ctx, resourceGroup, + commonNg.Id(), *requiredIds) + scaleSet.instanceMutex.Unlock() + if rerr != nil { + klog.Errorf("virtualMachineScaleSetsClient.DeallocateInstancesAsync for instances %v for %s "+ + "failed: %+v", requiredIds.InstanceIds, scaleSet.Name, rerr) + return azureToAutoscalerError(rerr) + } + + // Proactively set the status of the instances in the cache to deallocating.
Status will change to deallocated on success + for _, instance := range instancesToDeallocate { + state := cloudprovider.InstanceStatus{ + ErrorInfo: &cloudprovider.InstanceErrorInfo{ + ErrorCode: "InstanceDeallocating", + }, + } + scaleSet.setInstanceStatusByProviderID(instance.Name, state) + } + + go scaleSet.waitForDeallocateInstances(future, instancesToDeallocate, requiredIds) + return nil +} + +func (scaleSet *ScaleSet) verifyNodeGroup(instance *azureRef, commonNgID string) error { + ng, err := scaleSet.manager.GetNodeGroupForInstance(instance) + if err != nil { + return err + } + + if !strings.EqualFold(ng.Id(), commonNgID) { + return fmt.Errorf("cannot delete instance (%s) which doesn't belong to the same Scale Set (%q)", + instance.Name, commonNgID) + } + return nil +} + +func (scaleSet *ScaleSet) waitForDeallocateInstances(future *azure.Future, instancesToDeallocate []*azureRef, + requiredIds *compute.VirtualMachineScaleSetVMInstanceRequiredIDs) { + ctx, cancel := getContextWithTimeout(asyncContextTimeout) + defer cancel() + + klog.V(3).Infof("Calling virtualMachineScaleSetsClient.WaitForDeallocateInstancesResult(%v) for %s", + requiredIds.InstanceIds, scaleSet.Name) + httpResponse, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.WaitForDeallocateInstancesResult(ctx, + future, scaleSet.manager.config.ResourceGroup) + if httpResponse != nil && httpResponse.Body != nil { + defer httpResponse.Body.Close() + } + isSuccess, err := isSuccessHTTPResponse(httpResponse, err) + + if isSuccess { + klog.V(3).Infof("WaitForDeallocateInstancesResult(%v) for %s success", + requiredIds.InstanceIds, scaleSet.Name) + // Set the status of the instances to deallocated only if the WaitForDeallocate call succeeds + for _, instance := range instancesToDeallocate { + state := cloudprovider.InstanceStatus{ + ErrorInfo: &cloudprovider.InstanceErrorInfo{ + ErrorCode: "InstanceDeallocated", + }, + } + scaleSet.setInstanceStatusByProviderID(instance.Name, state) + } + return + } + + scaleSet.invalidateInstanceCache() + klog.Errorf("WaitForDeallocateInstancesResult(%v) for %s failed with error: %v", requiredIds.InstanceIds, scaleSet.Name, err) +} + +// validateInstanceCacheWithoutLock is used as a helper function for validateInstanceCache, get and set methods. +func (scaleSet *ScaleSet) validateInstanceCacheWithoutLock() error { + if scaleSet.lastInstanceRefresh.Add(scaleSet.instancesRefreshPeriod).After(time.Now()) { + klog.V(3).Infof("validateInstanceCacheWithoutLock: no need to reset instance Cache for scaleSet %s", + scaleSet.Name) + return nil + } + + splay := rand.New(rand.NewSource(time.Now().UnixNano())).Intn(scaleSet.instancesRefreshJitter + 1) + lastRefresh := time.Now().Add(-time.Second * time.Duration(splay)) + return scaleSet.buildScaleSetCache(lastRefresh) +} + +// getDeallocatedInstances returns a list of deallocated instances.
+func (scaleSet *ScaleSet) getDeallocatedInstances() ([]cloudprovider.Instance, error) { + scaleSet.instanceMutex.Lock() + defer scaleSet.instanceMutex.Unlock() + + err := scaleSet.validateInstanceCacheWithoutLock() + if err != nil { + klog.Errorf("getDeallocatedInstances: error validating instanceCache for deallocated instances for scaleSet %s, "+ + "err: %v", scaleSet.Name, err) + return []cloudprovider.Instance{}, err + } + + instances := []cloudprovider.Instance{} + for _, instance := range scaleSet.instanceCache { + if instance.Status != nil && instance.Status.ErrorInfo != nil && instance.Status.ErrorInfo.ErrorCode == "InstanceDeallocated" { + instances = append(instances, instance) + } + } + return instances, nil +} + +func (scaleSet *ScaleSet) countDeallocatedInstances() (int, error) { + scaleSet.instanceMutex.Lock() + defer scaleSet.instanceMutex.Unlock() + + err := scaleSet.validateInstanceCacheWithoutLock() + if err != nil { + klog.Errorf("countDeallocatedInstances: error validating instanceCache for scaleSet %s, "+ + "err: %v", scaleSet.Name, err) + return 0, err + } + + var numInstances int + for _, instance := range scaleSet.instanceCache { + if instance.Status != nil && instance.Status.ErrorInfo != nil && instance.Status.ErrorInfo.ErrorCode == "InstanceDeallocated" { + numInstances++ + } + } + return numInstances, nil +} + +func (scaleSet *ScaleSet) getSKU() string { + vmssInfo, err := scaleSet.getVMSSFromCache() + if err != nil { + klog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err) + return "" + } + return to.String(vmssInfo.Sku.Name) +} + +func getCrpOperation(future *azure.Future) string { + if future != nil { + resp := future.Response() + defer resp.Body.Close() + header := resp.Header["X-Ms-Request-Id"] + if len(header) > 0 { + return header[0] + } + } + return "" +} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go index cd7c7d56b772..70ae538d224e 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go @@ -490,11 +490,13 @@ func TestDeleteNodes(t *testing.T) { assert.Equal(t, 1, targetSize) // Ensure that the status for the instances is Deleting - instance0, found := scaleSet.getInstanceByProviderID(nodesToDelete[0].Spec.ProviderID) + instance0, found, err := scaleSet.getInstanceByProviderID(nodesToDelete[0].Spec.ProviderID) + assert.NoError(t, err) assert.True(t, found, true) assert.Equal(t, instance0.Status.State, cloudprovider.InstanceDeleting) - instance2, found := scaleSet.getInstanceByProviderID(nodesToDelete[1].Spec.ProviderID) + instance2, found, err := scaleSet.getInstanceByProviderID(nodesToDelete[1].Spec.ProviderID) + assert.NoError(t, err) assert.True(t, found, true) assert.Equal(t, instance2.Status.State, cloudprovider.InstanceDeleting) @@ -575,7 +577,8 @@ func TestDeleteNodeUnregistered(t *testing.T) { assert.Equal(t, 2, targetSize) // Ensure that the status for the instances is Deleting - instance0, found := scaleSet.getInstanceByProviderID(nodesToDelete[0].Spec.ProviderID) + instance0, found, err := scaleSet.getInstanceByProviderID(nodesToDelete[0].Spec.ProviderID) + assert.NoError(t, err) assert.True(t, found, true) assert.Equal(t, instance0.Status.State, cloudprovider.InstanceDeleting) } diff --git a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-aks.yaml 
b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-aks.yaml index c9c40f6d6df7..4abbc623592d 100644 --- a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-aks.yaml +++ b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-aks.yaml @@ -200,4 +200,9 @@ spec: secretKeyRef: key: VMType name: cluster-autoscaler-azure + - name: AZURE_SCALE_DOWN_POLICY + valueFrom: + secretKeyRef: + key: ScaleDownPolicy + name: cluster-autoscaler-azure restartPolicy: Always diff --git a/cluster-autoscaler/cloudprovider/cloud_provider.go b/cluster-autoscaler/cloudprovider/cloud_provider.go index 5da8546b3fad..9eb0e9d4e7a3 100644 --- a/cluster-autoscaler/cloudprovider/cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/cloud_provider.go @@ -236,6 +236,23 @@ type NodeGroup interface { GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) } +// PolicyNodeGroup is a wrapper for a nodegroup that can have scaling policies +type PolicyNodeGroup interface { + // ScaleDownPolicy returns whether this nodegroup instances will be deleted or hibernated on scale down + ScaleDownPolicy() ScaleDownPolicy +} + +// ScaleDownPolicy is a string representation of the ScaleDownPolicy +type ScaleDownPolicy string + +const ( + // Delete means that on scale down, nodes will be deleted + Delete ScaleDownPolicy = "Delete" + // Hibernate means that on scale down, node infrastructure will be stopped + // and on scale-up they will be prioritized and restarted + Hibernate ScaleDownPolicy = "Hibernate" +) + // Instance represents a cloud-provider node. The node does not necessarily map to k8s node // i.e it does not have to be registered in k8s cluster despite being returned by NodeGroup.Nodes() // method. Also it is sane to have Instance object for nodes which are being created or deleted. 
@@ -344,3 +361,24 @@ func ContainsCustomResources(resources []string) bool { } return false } + +func IsNodeGroupHibernateEnabled(ng NodeGroup) bool { + if policyNg, ok := ng.(PolicyNodeGroup); ok && policyNg.ScaleDownPolicy() == Hibernate { + return true + } + return false +} + +// HasAnyHibernateEnabledNodeGroup returns true iff any nodegroup has a ScaleDown policy of Hibernate +func HasAnyHibernateEnabledNodeGroup(ngs []NodeGroup) bool { + for _, ng := range ngs { + policyNg, ok := ng.(PolicyNodeGroup) + if !ok { + continue + } + if policyNg.ScaleDownPolicy() == Hibernate { + return true + } + } + return false +} diff --git a/cluster-autoscaler/clusterstate/clusterstate.go b/cluster-autoscaler/clusterstate/clusterstate.go index 2240d3f22883..1aebf23b1f3a 100644 --- a/cluster-autoscaler/clusterstate/clusterstate.go +++ b/cluster-autoscaler/clusterstate/clusterstate.go @@ -403,9 +403,18 @@ func (csr *ClusterStateRegistry) IsClusterHealthy() bool { defer csr.Unlock() totalUnready := len(csr.totalReadiness.Unready) + // initialize nonHibernatingUnready to totalUnready + nonHibernatingUnready := totalUnready - if totalUnready > csr.config.OkTotalUnreadyCount && - float64(totalUnready) > csr.config.MaxTotalUnreadyPercentage/100.0*float64(len(csr.nodes)) { + // if any of our node groups is enabled for Hibernate, we recalculate nonHibernatingUnready + if cloudprovider.HasAnyHibernateEnabledNodeGroup(csr.cloudProvider.NodeGroups()) { + hibernatingNodes := len(csr.totalReadiness.Hibernating) + nonHibernatingUnready = totalUnready - hibernatingNodes + klog.V(5).Infof("IsClusterHealthy - totalUnready: %d, hibernatingNodes: %d", totalUnready, hibernatingNodes) + } + + if nonHibernatingUnready > csr.config.OkTotalUnreadyCount && + float64(nonHibernatingUnready) > csr.config.MaxTotalUnreadyPercentage/100.0*float64(len(csr.nodes)) { return false } @@ -413,7 +422,8 @@ func (csr *ClusterStateRegistry) IsClusterHealthy() bool { } // IsNodeGroupHealthy returns true if the node group health is within the acceptable limits -func (csr *ClusterStateRegistry) IsNodeGroupHealthy(nodeGroupName string) bool { +func (csr *ClusterStateRegistry) IsNodeGroupHealthy(nodeGroup cloudprovider.NodeGroup) bool { + nodeGroupName := nodeGroup.Id() acceptable, found := csr.acceptableRanges[nodeGroupName] if !found { klog.Warningf("Failed to find acceptable ranges for %v", nodeGroupName) @@ -435,6 +445,9 @@ func (csr *ClusterStateRegistry) IsNodeGroupHealthy(nodeGroupName string) bool { if len(readiness.Ready) < acceptable.MinNodes { unjustifiedUnready += acceptable.MinNodes - len(readiness.Ready) } + if cloudprovider.IsNodeGroupHibernateEnabled(nodeGroup) { + unjustifiedUnready -= len(readiness.Hibernating) + } // TODO: verify against max nodes as well. if unjustifiedUnready > csr.config.OkTotalUnreadyCount && float64(unjustifiedUnready) > csr.config.MaxTotalUnreadyPercentage/100.0* @@ -469,7 +482,7 @@ func (csr *ClusterStateRegistry) BackoffStatusForNodeGroup(nodeGroup cloudprovid // NodeGroupScaleUpSafety returns information about node group safety to be scaled up now.
func (csr *ClusterStateRegistry) NodeGroupScaleUpSafety(nodeGroup cloudprovider.NodeGroup, now time.Time) NodeGroupScalingSafety { - isHealthy := csr.IsNodeGroupHealthy(nodeGroup.Id()) + isHealthy := csr.IsNodeGroupHealthy(nodeGroup) backoffStatus := csr.backoff.BackoffStatus(nodeGroup, csr.nodeInfosForGroups[nodeGroup.Id()], now) return NodeGroupScalingSafety{SafeToScale: isHealthy && !backoffStatus.IsBackedOff, Healthy: isHealthy, BackoffStatus: backoffStatus} } @@ -593,6 +606,8 @@ type Readiness struct { LongUnregistered []string // Names of nodes that haven't yet registered. Unregistered []string + // Names of hibernating nodes that exist in K8s + Hibernating []string // Time when the readiness was measured. Time time.Time // Names of nodes that are Unready due to missing resources. @@ -605,12 +620,15 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) { perNodeGroup := make(map[string]Readiness) total := Readiness{Time: currentTime} - update := func(current Readiness, node *apiv1.Node, nr kube_util.NodeReadiness) Readiness { + update := func(current Readiness, node *apiv1.Node, nr kube_util.NodeReadiness, nodeGroup cloudprovider.NodeGroup) Readiness { current.Registered = append(current.Registered, node.Name) if _, isDeleted := csr.deletedNodes[node.Name]; isDeleted { current.Deleted = append(current.Deleted, node.Name) } else if nr.Ready { current.Ready = append(current.Ready, node.Name) + } else if cloudprovider.IsNodeGroupHibernateEnabled(nodeGroup) && (taints.HasShutdownTaint(node) || taints.HasUnreachableTaint(node)) { + current.Hibernating = append(current.Hibernating, node.Name) + current.Unready = append(current.Unready, node.Name) } else if node.CreationTimestamp.Time.Add(MaxNodeStartupTime).After(currentTime) { current.NotStarted = append(current.NotStarted, node.Name) } else { @@ -635,9 +653,9 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) { klog.Warningf("Failed to get readiness info for %s: %v", node.Name, errReady) } } else { - perNodeGroup[nodeGroup.Id()] = update(perNodeGroup[nodeGroup.Id()], node, nr) + perNodeGroup[nodeGroup.Id()] = update(perNodeGroup[nodeGroup.Id()], node, nr, nodeGroup) } - total = update(total, node, nr) + total = update(total, node, nr, nodeGroup) } for _, unregistered := range csr.unregisteredNodes { @@ -786,7 +804,7 @@ func (csr *ClusterStateRegistry) GetStatus(now time.Time) *api.ClusterAutoscaler // Health. nodeGroupStatus.Health = buildHealthStatusNodeGroup( - csr.IsNodeGroupHealthy(nodeGroup.Id()), readiness, acceptable, nodeGroup.MinSize(), nodeGroup.MaxSize(), nodeGroupLastStatus.Health) + csr.IsNodeGroupHealthy(nodeGroup), readiness, acceptable, nodeGroup.MinSize(), nodeGroup.MaxSize(), nodeGroupLastStatus.Health) // Scale up.
nodeGroupStatus.ScaleUp = csr.buildScaleUpStatusNodeGroup( @@ -987,6 +1005,12 @@ func (csr *ClusterStateRegistry) GetUpcomingNodes() (upcomingCounts map[string]i ar := csr.acceptableRanges[id] // newNodes is the number of nodes that newNodes := ar.CurrentTarget - (len(readiness.Ready) + len(readiness.Unready) + len(readiness.LongUnregistered)) + if cloudprovider.IsNodeGroupHibernateEnabled(nodeGroup) { + newNodes -= len(readiness.Hibernating) + } + klog.V(3).Infof("newNodes: %d, currentTarget: %d, hibernating: %d, readinessReady: %d, readinessUnready: %d, "+ + "readiness.LongUnregistered: %d for nodeGroup %s", newNodes, ar.CurrentTarget, len(readiness.Hibernating), len(readiness.Ready), + len(readiness.Unready), len(readiness.LongUnregistered), id) if newNodes <= 0 { // Negative value is unlikely but theoretically possible. continue diff --git a/cluster-autoscaler/clusterstate/clusterstate_test.go b/cluster-autoscaler/clusterstate/clusterstate_test.go index 52f2952d6807..1815ab238986 100644 --- a/cluster-autoscaler/clusterstate/clusterstate_test.go +++ b/cluster-autoscaler/clusterstate/clusterstate_test.go @@ -118,7 +118,7 @@ func TestEmptyOK(t *testing.T) { assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) assert.Empty(t, clusterstate.GetScaleUpFailures()) - assert.True(t, clusterstate.IsNodeGroupHealthy("ng1")) + assert.True(t, clusterstate.IsNodeGroupHealthy(provider.GetNodeGroup("ng1"))) assert.False(t, clusterstate.IsNodeGroupScalingUp("ng1")) assert.False(t, clusterstate.HasNodeGroupStartedScaleUp("ng1")) @@ -130,7 +130,7 @@ func TestEmptyOK(t *testing.T) { assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) - assert.True(t, clusterstate.IsNodeGroupHealthy("ng1")) + assert.True(t, clusterstate.IsNodeGroupHealthy(provider.GetNodeGroup("ng1"))) assert.True(t, clusterstate.IsNodeGroupScalingUp("ng1")) assert.True(t, clusterstate.HasNodeGroupStartedScaleUp("ng1")) } @@ -201,7 +201,7 @@ func TestOKOneUnreadyNode(t *testing.T) { assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) assert.Empty(t, clusterstate.GetScaleUpFailures()) - assert.True(t, clusterstate.IsNodeGroupHealthy("ng1")) + assert.True(t, clusterstate.IsNodeGroupHealthy(provider.GetNodeGroup("ng1"))) status := clusterstate.GetStatus(now) assert.Equal(t, api.ClusterAutoscalerHealthy, status.ClusterWide.Health.Status) @@ -265,7 +265,7 @@ func TestOKOneUnreadyNodeWithScaleDownCandidate(t *testing.T) { assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) assert.Empty(t, clusterstate.GetScaleUpFailures()) - assert.True(t, clusterstate.IsNodeGroupHealthy("ng1")) + assert.True(t, clusterstate.IsNodeGroupHealthy(provider.GetNodeGroup("ng1"))) status := clusterstate.GetStatus(now) assert.Equal(t, api.ClusterAutoscalerHealthy, status.ClusterWide.Health.Status) @@ -317,7 +317,7 @@ func TestMissingNodes(t *testing.T) { assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) assert.Empty(t, clusterstate.GetScaleUpFailures()) - assert.False(t, clusterstate.IsNodeGroupHealthy("ng1")) + assert.False(t, clusterstate.IsNodeGroupHealthy(provider.GetNodeGroup("ng1"))) status := clusterstate.GetStatus(now) assert.Equal(t, api.ClusterAutoscalerHealthy, status.ClusterWide.Health.Status) @@ -357,7 +357,7 @@ func TestTooManyUnready(t *testing.T) { assert.NoError(t, err) assert.False(t, clusterstate.IsClusterHealthy()) assert.Empty(t, clusterstate.GetScaleUpFailures()) - assert.True(t, clusterstate.IsNodeGroupHealthy("ng1")) + assert.True(t, 
@@ -455,7 +455,7 @@ func TestExpiredScaleUp(t *testing.T) {
     err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now)
     assert.NoError(t, err)
     assert.True(t, clusterstate.IsClusterHealthy())
-    assert.False(t, clusterstate.IsNodeGroupHealthy("ng1"))
+    assert.False(t, clusterstate.IsNodeGroupHealthy(provider.GetNodeGroup("ng1")))
     assert.Equal(t, clusterstate.GetScaleUpFailures(), map[string][]ScaleUpFailure{
         "ng1": {
             {NodeGroup: provider.GetNodeGroup("ng1"), Time: now, Reason: metrics.Timeout},
@@ -793,7 +793,7 @@ func TestScaleUpBackoff(t *testing.T) {
     err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, nil, now)
     assert.NoError(t, err)
     assert.True(t, clusterstate.IsClusterHealthy())
-    assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
+    assert.True(t, clusterstate.IsNodeGroupHealthy(provider.GetNodeGroup("ng1")))
     assert.Equal(t, NodeGroupScalingSafety{
         SafeToScale: false,
         Healthy:     true,
@@ -817,7 +817,7 @@ func TestScaleUpBackoff(t *testing.T) {
     // Backoff should expire after timeout
     now = now.Add(5 * time.Minute /*InitialNodeGroupBackoffDuration*/).Add(time.Second)
     assert.True(t, clusterstate.IsClusterHealthy())
-    assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
+    assert.True(t, clusterstate.IsNodeGroupHealthy(provider.GetNodeGroup("ng1")))
     assert.Equal(t, NodeGroupScalingSafety{SafeToScale: true, Healthy: true}, clusterstate.NodeGroupScaleUpSafety(ng1, now))
 
     // Another failed scale up should cause longer backoff
@@ -826,7 +826,7 @@ func TestScaleUpBackoff(t *testing.T) {
     err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, nil, now)
     assert.NoError(t, err)
     assert.True(t, clusterstate.IsClusterHealthy())
-    assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
+    assert.True(t, clusterstate.IsNodeGroupHealthy(provider.GetNodeGroup("ng1")))
     assert.Equal(t, NodeGroupScalingSafety{
         SafeToScale: false,
         Healthy:     true,
@@ -862,7 +862,7 @@ func TestScaleUpBackoff(t *testing.T) {
     err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3, ng1_4}, nil, now)
     assert.NoError(t, err)
     assert.True(t, clusterstate.IsClusterHealthy())
-    assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
+    assert.True(t, clusterstate.IsNodeGroupHealthy(provider.GetNodeGroup("ng1")))
     assert.Equal(t, NodeGroupScalingSafety{SafeToScale: true, Healthy: true}, clusterstate.NodeGroupScaleUpSafety(ng1, now))
     assert.Equal(t, backoff.Status{IsBackedOff: false}, clusterstate.backoff.BackoffStatus(ng1, nil, now))
 }
diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index b335c05dcb28..57ed1a8a7883 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -47,6 +47,8 @@ type NodeGroupAutoscalingOptions struct {
     ScaleDownUnneededTime time.Duration
     // ScaleDownUnreadyTime represents how long an unready node should be unneeded before it is eligible for scale down
     ScaleDownUnreadyTime time.Duration
+    // ScaleDownHibernateEnabled sets whether or not nodes will be "stopped" rather than deleted during scale down
+    ScaleDownHibernateEnabled bool
     // Maximum time CA waits for node to be provisioned
     MaxNodeProvisionTime time.Duration
     // ZeroOrMaxNodeScaling means that a node group should be scaled up to maximum size or down to zero nodes all at once instead of one-by-one.
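A minimal sketch, assuming the patched config package from this PR, of how the new per-node-group option sits next to the existing scale-down options (all values here are hypothetical):

package main

import (
	"fmt"
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/config"
)

func main() {
	// Hypothetical per-node-group defaults; ScaleDownHibernateEnabled is the field added above.
	opts := config.NodeGroupAutoscalingOptions{
		ScaleDownUtilizationThreshold: 0.5,
		ScaleDownUnneededTime:         10 * time.Minute,
		ScaleDownUnreadyTime:          20 * time.Minute,
		ScaleDownHibernateEnabled:     true,
		MaxNodeProvisionTime:          15 * time.Minute,
	}
	fmt.Printf("%+v\n", opts)
}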
diff --git a/cluster-autoscaler/core/scaledown/eligibility/eligibility.go b/cluster-autoscaler/core/scaledown/eligibility/eligibility.go
index f6cf9a9e55c9..de1e2cdad6d7 100644
--- a/cluster-autoscaler/core/scaledown/eligibility/eligibility.go
+++ b/cluster-autoscaler/core/scaledown/eligibility/eligibility.go
@@ -27,6 +27,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/simulator"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
     "k8s.io/autoscaler/cluster-autoscaler/utils/klogx"
+    "k8s.io/autoscaler/cluster-autoscaler/utils/taints"
 
     apiv1 "k8s.io/api/core/v1"
     kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -80,6 +81,21 @@ func (c *Checker) FilterOutUnremovable(context *context.AutoscalingContext, scal
             continue
         }
 
+        nodeGroup, err := context.CloudProvider.NodeGroupForNode(node)
+        if err != nil {
+            klog.Errorf("Error while checking node group for %s: %v", node.Name, err)
+            continue
+        }
+        if cloudprovider.IsNodeGroupHibernateEnabled(nodeGroup) {
+            nr, err := kube_util.GetNodeReadiness(node)
+            if err != nil {
+                klog.Errorf("Unable to determine node %s readiness, err: %v", node.Name, err)
+            }
+            if taints.HasShutdownTaint(node) && !nr.Ready {
+                continue
+            }
+        }
+
         // Skip nodes that were recently checked.
         if unremovableNodes.IsRecent(node.Name) {
             ineligible = append(ineligible, &simulator.UnremovableNode{Node: node, Reason: simulator.RecentlyUnremovable})
diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index 471a341077f2..59a94751f35e 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -425,6 +425,26 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
         }
     }
 
+    for _, nodeGroup := range a.AutoscalingContext.CloudProvider.NodeGroups() {
+        if !a.clusterStateRegistry.IsNodeGroupAtTargetSize(nodeGroup.Id()) {
+            for _, node := range allNodes {
+                nr, err := kube_util.GetNodeReadiness(node)
+                if err != nil {
+                    klog.Errorf("Unable to determine node %s readiness, err: %v", node.Name, err)
+                }
+                if !nr.Ready {
+                    if !(taints.HasShutdownTaint(node) || taints.HasUnreachableTaint(node)) || !taints.HasToBeDeletedTaint(node) {
+                        continue
+                    }
+                    klog.Infof("Node %s is not Ready, will remove ToBeDeletedByClusterAutoscaler taint", node.Name)
+                    if _, err := taints.CleanToBeDeleted(node, a.ClientSet, a.AutoscalingContext.CordonNodeBeforeTerminate); err != nil {
+                        klog.Errorf("error while removing taint from node %s: %s", node.Name, err.Error())
+                    }
+                }
+            }
+        }
+    }
+
     if !a.clusterStateRegistry.IsClusterHealthy() {
         klog.Warning("Cluster is not ready for autoscaling")
         a.scaleDownPlanner.CleanUpUnneededNodes()
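A standalone sketch (not the patch's code) of the boolean condition used in the RunOnce loop above: the ToBeDeletedByClusterAutoscaler taint is only cleaned from not-Ready nodes that carry the shutdown or unreachable taint and still have the deletion taint.

package main

import "fmt"

// shouldCleanDeletionTaint mirrors, as a sketch, the skip condition in the loop above:
// clean the taint only for nodes that are shut down or unreachable AND still marked
// ToBeDeletedByClusterAutoscaler (the !nr.Ready check is done by the caller).
func shouldCleanDeletionTaint(shutdown, unreachable, toBeDeleted bool) bool {
	return (shutdown || unreachable) && toBeDeleted
}

func main() {
	fmt.Println(shouldCleanDeletionTaint(true, false, true))  // true: hibernated node with a leftover deletion taint
	fmt.Println(shouldCleanDeletionTaint(false, false, true)) // false: node is neither shut down nor unreachable
	fmt.Println(shouldCleanDeletionTaint(true, false, false)) // false: nothing to clean
}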
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index d0480eb1f99a..f23a80754eb3 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -100,19 +100,20 @@ func multiStringFlag(name string, usage string) *MultiStringFlag {
 }
 
 var (
-    clusterName             = flag.String("cluster-name", "", "Autoscaled cluster name, if available")
-    address                 = flag.String("address", ":8085", "The address to expose prometheus metrics.")
-    kubernetes              = flag.String("kubernetes", "", "Kubernetes master location. Leave blank for default")
-    kubeConfigFile          = flag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information.")
-    kubeAPIContentType      = flag.String("kube-api-content-type", "application/vnd.kubernetes.protobuf", "Content type of requests sent to apiserver.")
-    kubeClientBurst         = flag.Int("kube-client-burst", rest.DefaultBurst, "Burst value for kubernetes client.")
-    kubeClientQPS           = flag.Float64("kube-client-qps", float64(rest.DefaultQPS), "QPS value for kubernetes client.")
-    cloudConfig             = flag.String("cloud-config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.")
-    namespace               = flag.String("namespace", "kube-system", "Namespace in which cluster-autoscaler run.")
-    enforceNodeGroupMinSize = flag.Bool("enforce-node-group-min-size", false, "Should CA scale up the node group to the configured min size if needed.")
-    scaleDownEnabled        = flag.Bool("scale-down-enabled", true, "Should CA scale down the cluster")
-    scaleDownUnreadyEnabled = flag.Bool("scale-down-unready-enabled", true, "Should CA scale down unready nodes of the cluster")
-    scaleDownDelayAfterAdd  = flag.Duration("scale-down-delay-after-add", 10*time.Minute,
+    clusterName               = flag.String("cluster-name", "", "Autoscaled cluster name, if available")
+    address                   = flag.String("address", ":8085", "The address to expose prometheus metrics.")
+    kubernetes                = flag.String("kubernetes", "", "Kubernetes master location. Leave blank for default")
+    kubeConfigFile            = flag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information.")
+    kubeAPIContentType        = flag.String("kube-api-content-type", "application/vnd.kubernetes.protobuf", "Content type of requests sent to apiserver.")
+    kubeClientBurst           = flag.Int("kube-client-burst", rest.DefaultBurst, "Burst value for kubernetes client.")
+    kubeClientQPS             = flag.Float64("kube-client-qps", float64(rest.DefaultQPS), "QPS value for kubernetes client.")
+    cloudConfig               = flag.String("cloud-config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.")
+    namespace                 = flag.String("namespace", "kube-system", "Namespace in which cluster-autoscaler run.")
+    enforceNodeGroupMinSize   = flag.Bool("enforce-node-group-min-size", false, "Should CA scale up the node group to the configured min size if needed.")
+    scaleDownEnabled          = flag.Bool("scale-down-enabled", true, "Should CA scale down the cluster")
+    scaleDownUnreadyEnabled   = flag.Bool("scale-down-unready-enabled", true, "Should CA scale down unready nodes of the cluster")
+    scaleDownHibernateEnabled = flag.Bool("scale-down-hibernate-enabled", false, "Should CA hibernate nodes during scale down")
+    scaleDownDelayAfterAdd    = flag.Duration("scale-down-delay-after-add", 10*time.Minute,
         "How long after scale up that scale down evaluation resumes")
     scaleDownDelayTypeLocal = flag.Bool("scale-down-delay-type-local", false,
         "Should --scale-down-delay-after-* flags be applied locally per nodegroup or globally across all nodegroups")
@@ -327,12 +328,19 @@ func createAutoscalingOptions() config.AutoscalingOptions {
         }
     }
 
+    // It doesn't make sense to enable node hibernation and scale down of NotReady nodes,
+    // because scaled down nodes will enter the NotReady state during hibernation scale down.
+    if *scaleDownHibernateEnabled && *scaleDownUnreadyEnabled {
+        klog.Fatalf("Invalid configuration: when --scale-down-hibernate-enabled is true, --scale-down-unready-enabled should be false")
+    }
+
     return config.AutoscalingOptions{
         NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
             ScaleDownUtilizationThreshold:    *scaleDownUtilizationThreshold,
             ScaleDownGpuUtilizationThreshold: *scaleDownGpuUtilizationThreshold,
             ScaleDownUnneededTime:            *scaleDownUnneededTime,
             ScaleDownUnreadyTime:             *scaleDownUnreadyTime,
+            ScaleDownHibernateEnabled:        *scaleDownHibernateEnabled,
             IgnoreDaemonSetsUtilization:      *ignoreDaemonSetsUtilization,
             MaxNodeProvisionTime:             *maxNodeProvisionTime,
         },
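A small sketch (hypothetical helper, not part of the patch) of the flag interplay enforced by the new fatal check above: hibernation relies on scaled-down nodes going NotReady, so unready scale-down must stay disabled.

package main

import "fmt"

// validateScaleDownFlags mirrors, as a sketch, the fatal check added above.
func validateScaleDownFlags(hibernateEnabled, unreadyEnabled bool) error {
	if hibernateEnabled && unreadyEnabled {
		return fmt.Errorf("when --scale-down-hibernate-enabled is true, --scale-down-unready-enabled should be false")
	}
	return nil
}

func main() {
	fmt.Println(validateScaleDownFlags(true, true))  // invalid combination -> error
	fmt.Println(validateScaleDownFlags(true, false)) // <nil>: valid
}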
diff --git a/cluster-autoscaler/processors/nodes/pre_filtering_processor.go b/cluster-autoscaler/processors/nodes/pre_filtering_processor.go
index d8428426ccce..6ef42a46029c 100644
--- a/cluster-autoscaler/processors/nodes/pre_filtering_processor.go
+++ b/cluster-autoscaler/processors/nodes/pre_filtering_processor.go
@@ -20,6 +20,7 @@ import (
     "reflect"
 
     apiv1 "k8s.io/api/core/v1"
+    //kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
     klog "k8s.io/klog/v2"
 
     "k8s.io/autoscaler/cluster-autoscaler/context"
@@ -57,6 +58,14 @@ func (n *PreFilteringScaleDownNodeProcessor) GetScaleDownCandidates(ctx *context
             klog.V(4).Infof("Node %s should not be processed by cluster autoscaler (no node group config)", node.Name)
             continue
         }
+        /*nr, err := kube_util.GetNodeReadiness(node)
+        if err != nil {
+            klog.Errorf("Unable to determine node %s readiness, err: %v", node.Name, err)
+        }
+        if !nr.Ready {
+            klog.Infof("Node %s should not be processed by cluster autoscaler (NotReady)", node.Name)
+            continue
+        }*/
         size, found := nodeGroupSize[nodeGroup.Id()]
         if !found {
             klog.Errorf("Error while checking node group size %s: group size not found", nodeGroup.Id())
diff --git a/cluster-autoscaler/processors/status/metrics_autoscaling_status_processor.go b/cluster-autoscaler/processors/status/metrics_autoscaling_status_processor.go
index 816dccd3ec42..b824d560eef1 100644
--- a/cluster-autoscaler/processors/status/metrics_autoscaling_status_processor.go
+++ b/cluster-autoscaler/processors/status/metrics_autoscaling_status_processor.go
@@ -44,7 +44,7 @@ func (p *MetricsAutoscalingStatusProcessor) Process(context *context.Autoscaling
         if !nodeGroup.Exist() {
             continue
         }
-        metrics.UpdateNodeGroupHealthStatus(nodeGroup.Id(), csr.IsNodeGroupHealthy(nodeGroup.Id()))
+        metrics.UpdateNodeGroupHealthStatus(nodeGroup.Id(), csr.IsNodeGroupHealthy(nodeGroup))
         backoffStatus := csr.BackoffStatusForNodeGroup(nodeGroup, now)
         p.updateNodeGroupBackoffStatusMetrics(nodeGroup.Id(), backoffStatus)
     }
diff --git a/cluster-autoscaler/utils/taints/taints.go b/cluster-autoscaler/utils/taints/taints.go
index 267a4a62872b..05b582a16de1 100644
--- a/cluster-autoscaler/utils/taints/taints.go
+++ b/cluster-autoscaler/utils/taints/taints.go
@@ -249,6 +249,16 @@ func HasDeletionCandidateTaint(node *apiv1.Node) bool {
     return HasTaint(node, DeletionCandidateTaint)
 }
 
+// HasShutdownTaint returns true if the cloud provider node shutdown taint is applied on the node.
+func HasShutdownTaint(node *apiv1.Node) bool {
+    return HasTaint(node, cloudproviderapi.TaintNodeShutdown)
+}
+
+// HasUnreachableTaint returns true if the unreachable taint is applied on the node.
+func HasUnreachableTaint(node *apiv1.Node) bool {
+    return HasTaint(node, apiv1.TaintNodeUnreachable)
+}
+
 // HasTaint returns true if the specified taint is applied on the node.
 func HasTaint(node *apiv1.Node, taintKey string) bool {
     for _, taint := range node.Spec.Taints {
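A usage sketch for the two helpers added above, assuming the patched taints package and the well-known taint keys referenced in the patch (cloudproviderapi.TaintNodeShutdown, apiv1.TaintNodeUnreachable):

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
	cloudproviderapi "k8s.io/cloud-provider/api"
)

func main() {
	// A node that the cloud provider has marked as shut down (e.g. a hibernated VM).
	node := &apiv1.Node{}
	node.Spec.Taints = []apiv1.Taint{
		{Key: cloudproviderapi.TaintNodeShutdown, Effect: apiv1.TaintEffectNoSchedule},
	}

	fmt.Println(taints.HasShutdownTaint(node))    // true
	fmt.Println(taints.HasUnreachableTaint(node)) // false: no node.kubernetes.io/unreachable taint
}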