From 43cffafa3bf656dc5366369a498e628bc1be50bb Mon Sep 17 00:00:00 2001
From: Rishabh Patel
Date: Thu, 19 Dec 2024 14:59:24 +0530
Subject: [PATCH 1/7] [WIP] fix CA marking machines for deletion

---
 .../cloudprovider/mcm/mcm_manager.go          | 53 +++++++++++--------
 1 file changed, 31 insertions(+), 22 deletions(-)

diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
index 399862c9e383..2b217f8914a1 100644
--- a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
+++ b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
@@ -27,6 +27,7 @@ import (
 	"errors"
 	"flag"
 	"fmt"
+	"k8s.io/apimachinery/pkg/util/sets"
 	v1appslister "k8s.io/client-go/listers/apps/v1"
 	"k8s.io/utils/pointer"
 	"maps"
@@ -98,6 +99,9 @@ const (
 	machineDeploymentPausedReason = "DeploymentPaused"
 	// machineDeploymentNameLabel key for Machine Deployment name in machine labels
 	machineDeploymentNameLabel = "name"
+	// machinesMarkedByCAForDeletion is the annotation set by CA on machine deployment. Its value denotes the machines that
+	// CA marked for deletion by updating the priority annotation to 1 and scaling down the machine deployment.
+	machinesMarkedByCAForDeletion = "cluster-autoscaler.kubernetes.io/machines-marked-by-ca-for-deletion"
 )
 
 var (
@@ -424,7 +428,7 @@
 			klog.Infof("[Refresh] machine deployment %s is under rolling update, skipping", machineDeployment.Name)
 			continue
 		}
-		replicas := machineDeployment.Spec.Replicas
+		markedMachines := sets.New(strings.Split(machineDeployment.Annotations[machinesMarkedByCAForDeletion], ",")...)
 		// check if number of annotated machine objects is more than desired and correspondingly reset the priority annotation value if needed.
 		machines, err := m.getMachinesForMachineDeployment(machineDeployment.Name)
 		if err != nil {
@@ -432,27 +436,17 @@ func (m *McmManager) Refresh() error {
 			collectiveError = errors.Join(collectiveError, err)
 			continue
 		}
-		var machinesMarkedForDeletion []*v1alpha1.Machine
+		var incorrectlyMarkedMachines []*Ref
 		for _, machine := range machines {
 			// no need to reset priority for machines already in termination or failed phase
 			if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed {
 				continue
 			}
-			if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines {
-				machinesMarkedForDeletion = append(machinesMarkedForDeletion, machine)
+			if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines && !markedMachines.Has(machine.Name) {
+				incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, &Ref{Name: machine.Name, Namespace: machine.Namespace})
 			}
 		}
-		if int(replicas) > len(machines)-len(machinesMarkedForDeletion) {
-			slices.SortStableFunc(machinesMarkedForDeletion, func(m1, m2 *v1alpha1.Machine) int {
-				return -m1.CreationTimestamp.Compare(m2.CreationTimestamp.Time)
-			})
-			diff := int(replicas) - len(machines) + len(machinesMarkedForDeletion)
-			targetRefs := make([]*Ref, 0, diff)
-			for i := 0; i < min(diff, len(machinesMarkedForDeletion)); i++ {
-				targetRefs = append(targetRefs, &Ref{Name: machinesMarkedForDeletion[i].Name, Namespace: machinesMarkedForDeletion[i].Namespace})
-			}
-			collectiveError = errors.Join(collectiveError, m.resetPriorityForMachines(targetRefs))
-		}
+		collectiveError = errors.Join(collectiveError, m.resetPriorityForMachines(incorrectlyMarkedMachines))
 	}
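 		// Reviewer note: for a MachineDeployment that has never carried the
 		// machines-marked-by-ca-for-deletion annotation, strings.Split("", ",") returns
 		// [""], so the markedMachines set built above contains the empty string rather
 		// than being empty. Machine names are never empty, so the Has() check still
 		// behaves correctly, and patch 6 later in this series replaces this construction
 		// with a helper that guards the empty value explicitly.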
 	return collectiveError
 }
@@ -508,18 +502,29 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
 	if !isRollingUpdateFinished(md) {
 		return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", commonMachineDeployment.Name)
 	}
+	markedMachines := sets.New(strings.Split(md.Annotations[machinesMarkedByCAForDeletion], ",")...)
+	var filteredTargetMachineRefs []*Ref
+	for _, targetMachineRef := range targetMachineRefs {
+		if !markedMachines.Has(targetMachineRef.Name) {
+			filteredTargetMachineRefs = append(filteredTargetMachineRefs, targetMachineRef)
+			markedMachines.Insert(targetMachineRef.Name)
+		} else {
+			klog.Infof("Machine %s is already marked for deletion, skipping", targetMachineRef.Name)
+		}
+	}
+
 	// update priorities of machines to be deleted except the ones already in termination to 1
-	scaleDownAmount, err := m.prioritizeMachinesForDeletion(targetMachineRefs)
+	err = m.prioritizeMachinesForDeletion(filteredTargetMachineRefs)
 	if err != nil {
 		return err
 	}
 	// Trying to update the machineDeployment till the deadline
 	err = m.retry(func(ctx context.Context) (bool, error) {
-		return m.scaleDownMachineDeployment(ctx, commonMachineDeployment.Name, scaleDownAmount)
+		return m.scaleDownMachineDeployment(ctx, commonMachineDeployment.Name, len(filteredTargetMachineRefs), strings.Join(markedMachines.UnsortedList(), ","))
 	}, "MachineDeployment", "update", commonMachineDeployment.Name)
 	if err != nil {
 		klog.Errorf("unable to scale in machine deployment %s, will reset priority of target machines, Error: %v", commonMachineDeployment.Name, err)
-		return errors.Join(err, m.resetPriorityForMachines(targetMachineRefs))
+		return errors.Join(err, m.resetPriorityForMachines(filteredTargetMachineRefs))
 	}
 	return nil
 }
@@ -552,7 +557,7 @@ func (m *McmManager) resetPriorityForMachines(mcRefs []*Ref) error {
 }
 
 // prioritizeMachinesForDeletion prioritizes the targeted machines by updating their priority annotation to 1
-func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref) (int, error) {
+func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref) error {
 	var expectedToTerminateMachineNodePairs = make(map[string]string)
 	for _, machineRef := range targetMachineRefs {
 		// Trying to update the priority of machineRef till m.maxRetryTimeout
@@ -573,11 +578,11 @@ func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref) (in
 			return m.updateAnnotationOnMachine(ctx, mc.Name, machinePriorityAnnotation, priorityValueForCandidateMachines)
 		}, "Machine", "update", machineRef.Name); err != nil {
 			klog.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
-			return 0, fmt.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
+			return fmt.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
 		}
 	}
 	klog.V(2).Infof("Expected to remove following {machineRef: corresponding node} pairs %s", expectedToTerminateMachineNodePairs)
-	return len(expectedToTerminateMachineNodePairs), nil
+	return nil
 }
 
 // updateAnnotationOnMachine returns error only when updating the annotations on machine has been failing consequently and deadline is crossed
@@ -610,7 +615,7 @@ func (m *McmManager) updateAnnotationOnMachine(ctx context.Context, mcName strin
 }
 
 // scaleDownMachineDeployment scales down the machine deployment by the provided scaleDownAmount and returns the updated spec.Replicas after scale down.
-func (m *McmManager) scaleDownMachineDeployment(ctx context.Context, mdName string, scaleDownAmount int) (bool, error) {
+func (m *McmManager) scaleDownMachineDeployment(ctx context.Context, mdName string, scaleDownAmount int, markedMachines string) (bool, error) {
 	md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName)
 	if err != nil {
 		klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %v", mdName, err)
 		return false, err
 	}
@@ -626,6 +631,10 @@ func (m *McmManager) scaleDownMachineDeployment(ctx context.Context, mdName stri
 		return false, fmt.Errorf("cannot delete machines in machine deployment %s, expected decrease in replicas %d is more than current replicas %d", mdName, scaleDownAmount, mdclone.Spec.Replicas)
 	}
 	mdclone.Spec.Replicas = expectedReplicas
+	if mdclone.Annotations == nil {
+		mdclone.Annotations = make(map[string]string)
+	}
+	mdclone.Annotations[machinesMarkedByCAForDeletion] = markedMachines
 	_, err = m.machineClient.MachineDeployments(mdclone.Namespace).Update(ctx, mdclone, metav1.UpdateOptions{})
 	if err != nil {
 		return true, fmt.Errorf("unable to scale in machine deployment %s, Error: %w", mdName, err)

From 0d939b02ffd303b8b57e06af6c86e5944395dcab Mon Sep 17 00:00:00 2001
From: Rishabh Patel
Date: Thu, 19 Dec 2024 16:00:40 +0530
Subject: [PATCH 2/7] [WIP] add mutex for machine deployment

---
 .../cloudprovider/mcm/mcm_cloud_provider.go   | 55 +++++----------
 .../cloudprovider/mcm/mcm_manager.go          | 70 +++++++++++--------
 2 files changed, 56 insertions(+), 69 deletions(-)

diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
index e6ff41de6279..41ac61010622 100644
--- a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
+++ b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
@@ -26,6 +26,7 @@ import (
 	"fmt"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	apiv1 "k8s.io/api/core/v1"
@@ -67,15 +68,14 @@ const (
 // MCMCloudProvider implements the cloud provider interface for machine-controller-manager
 // Reference: https://github.com/gardener/machine-controller-manager
 type mcmCloudProvider struct {
-	mcmManager         *McmManager
-	machinedeployments map[types.NamespacedName]*MachineDeployment
-	resourceLimiter    *cloudprovider.ResourceLimiter
+	mcmManager      *McmManager
+	resourceLimiter *cloudprovider.ResourceLimiter
 }
 
 // BuildMcmCloudProvider builds CloudProvider implementation for machine-controller-manager.
 func BuildMcmCloudProvider(mcmManager *McmManager, resourceLimiter *cloudprovider.ResourceLimiter) (cloudprovider.CloudProvider, error) {
 	if mcmManager.discoveryOpts.StaticDiscoverySpecified() {
-		return buildStaticallyDiscoveringProvider(mcmManager, mcmManager.discoveryOpts.NodeGroupSpecs, resourceLimiter)
+		return buildStaticallyDiscoveringProvider(mcmManager, resourceLimiter)
 	}
 	return nil, fmt.Errorf("Failed to build an mcm cloud provider: Either node group specs or node group auto discovery spec must be specified")
 }
@@ -96,16 +96,10 @@ func BuildMCM(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscover
 	return provider
 }
 
-func buildStaticallyDiscoveringProvider(mcmManager *McmManager, specs []string, resourceLimiter *cloudprovider.ResourceLimiter) (*mcmCloudProvider, error) {
+func buildStaticallyDiscoveringProvider(mcmManager *McmManager, resourceLimiter *cloudprovider.ResourceLimiter) (*mcmCloudProvider, error) {
 	mcm := &mcmCloudProvider{
-		mcmManager:         mcmManager,
-		machinedeployments: make(map[types.NamespacedName]*MachineDeployment),
-		resourceLimiter:    resourceLimiter,
-	}
-	for _, spec := range specs {
-		if err := mcm.addNodeGroup(spec); err != nil {
-			return nil, err
-		}
+		mcmManager:      mcmManager,
+		resourceLimiter: resourceLimiter,
 	}
 	return mcm, nil
 }
@@ -116,31 +110,14 @@ func (mcm *mcmCloudProvider) Cleanup() error {
 	return nil
 }
 
-// addNodeGroup adds node group defined in string spec. Format:
-// minNodes:maxNodes:namespace.machineDeploymentName
-func (mcm *mcmCloudProvider) addNodeGroup(spec string) error {
-	machinedeployment, err := buildMachineDeploymentFromSpec(spec, mcm.mcmManager)
-	if err != nil {
-		return err
-	}
-	mcm.addMachineDeployment(machinedeployment)
-	return nil
-}
-
-func (mcm *mcmCloudProvider) addMachineDeployment(machinedeployment *MachineDeployment) {
-	key := types.NamespacedName{Namespace: machinedeployment.Namespace, Name: machinedeployment.Name}
-	mcm.machinedeployments[key] = machinedeployment
-	return
-}
-
 func (mcm *mcmCloudProvider) Name() string {
 	return "machine-controller-manager"
 }
 
 // NodeGroups returns all node groups configured for this cloud provider.
 func (mcm *mcmCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
-	result := make([]cloudprovider.NodeGroup, 0, len(mcm.machinedeployments))
-	for _, machinedeployment := range mcm.machinedeployments {
+	result := make([]cloudprovider.NodeGroup, 0, len(mcm.mcmManager.machineDeployments))
+	for _, machinedeployment := range mcm.mcmManager.machineDeployments {
 		if machinedeployment.maxSize == 0 {
 			continue
 		}
@@ -172,7 +149,7 @@ func (mcm *mcmCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.N
 	}
 
 	key := types.NamespacedName{Namespace: md.Namespace, Name: md.Name}
-	_, isManaged := mcm.machinedeployments[key]
+	_, isManaged := mcm.mcmManager.machineDeployments[key]
 	if !isManaged {
 		klog.V(4).Infof("Skipped node %v, it's not managed by this controller", node.Spec.ProviderID)
 		return nil, nil
@@ -293,8 +270,9 @@ type MachineDeployment struct {
 
 	mcmManager *McmManager
 
-	minSize int
-	maxSize int
+	scalingMutex sync.Mutex
+	minSize      int
+	maxSize      int
 }
 
 // MaxSize returns maximum size of the node group.
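Reviewer note on the hunk above: the new scalingMutex serializes every path that does a read-modify-write of a node group's size — DeleteMachines and the manager's Refresh take it in this patch, and IncreaseSize, DecreaseTargetSize and the node group's own Refresh take it in patch 4. A minimal sketch of the locking pattern those call sites follow; the withScalingLock helper below is illustrative only and does not exist in the PR, which inlines the Lock/defer Unlock pair at each call site:

	// Hypothetical helper, not part of the PR: runs fn while holding the
	// per-node-group scaling lock, releasing it on every return path.
	func (machineDeployment *MachineDeployment) withScalingLock(fn func() error) error {
		machineDeployment.scalingMutex.Lock()
		defer machineDeployment.scalingMutex.Unlock()
		return fn()
	}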
@@ -541,9 +519,10 @@ func buildMachineDeploymentFromSpec(value string, mcmManager *McmManager) (*Mach
 
 func buildMachineDeployment(mcmManager *McmManager, minSize int, maxSize int, namespace string, name string) *MachineDeployment {
 	return &MachineDeployment{
-		mcmManager: mcmManager,
-		minSize:    minSize,
-		maxSize:    maxSize,
+		mcmManager:   mcmManager,
+		minSize:      minSize,
+		maxSize:      maxSize,
+		scalingMutex: sync.Mutex{},
 		Ref: Ref{
 			Name:      name,
 			Namespace: namespace,
diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
index 2b217f8914a1..c9fcd1c1d30d 100644
--- a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
+++ b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
@@ -27,6 +27,7 @@ import (
 	"errors"
 	"flag"
 	"fmt"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	v1appslister "k8s.io/client-go/listers/apps/v1"
 	"k8s.io/utils/pointer"
 	"maps"
@@ -57,7 +58,6 @@ import (
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
-	"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
 	"k8s.io/client-go/discovery"
@@ -128,6 +128,7 @@ type McmManager struct {
 	namespace               string
 	interrupt               chan struct{}
 	discoveryOpts           cloudprovider.NodeGroupDiscoveryOptions
+	machineDeployments      map[types.NamespacedName]*MachineDeployment
 	deploymentLister        v1appslister.DeploymentLister
 	machineClient           machineapi.MachineV1alpha1Interface
 	machineDeploymentLister machinelisters.MachineDeploymentLister
@@ -264,7 +265,11 @@ func createMCMManagerInternal(discoveryOpts cloudprovider.NodeGroupDiscoveryOpti
 			maxRetryTimeout: maxRetryTimeout,
 			retryInterval:   retryInterval,
 		}
-
+		for _, spec := range discoveryOpts.NodeGroupSpecs {
+			if err := m.addNodeGroup(spec); err != nil {
+				return nil, err
+			}
+		}
 		targetCoreInformerFactory.Start(m.interrupt)
 		controlMachineInformerFactory.Start(m.interrupt)
 		appsInformerFactory.Start(m.interrupt)
@@ -287,6 +292,23 @@ func createMCMManagerInternal(discoveryOpts cloudprovider.NodeGroupDiscoveryOpti
 	return nil, fmt.Errorf("Unable to start cloud provider MCM for cluster autoscaler: API GroupVersion %q or %q or %q is not available; \nFound: %#v", machineGVR, machineSetGVR, machineDeploymentGVR, availableResources)
 }
 
+// addNodeGroup adds node group defined in string spec. Format:
+// minNodes:maxNodes:namespace.machineDeploymentName
+func (m *McmManager) addNodeGroup(spec string) error {
+	machineDeployment, err := buildMachineDeploymentFromSpec(spec, m)
+	if err != nil {
+		return err
+	}
+	m.addMachineDeployment(machineDeployment)
+	return nil
+}
+
+func (m *McmManager) addMachineDeployment(machineDeployment *MachineDeployment) {
+	key := types.NamespacedName{Namespace: machineDeployment.Namespace, Name: machineDeployment.Name}
+	m.machineDeployments[key] = machineDeployment
+	return
+}
+
 // TODO: In general, any controller checking this needs to be dynamic so
 // users don't have to restart their controller manager if they change the apiserver.
 // Until we get there, the structure here needs to be exposed for the construction of a proper ControllerContext.
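Reviewer note: addNodeGroup above consumes the same node-group spec strings that the provider previously parsed itself (the format is documented in its comment). As a concrete illustration of the parsing done by buildMachineDeploymentFromSpec — the sample values below are invented, only the shape matters:

	// spec format: minNodes:maxNodes:namespace.machineDeploymentName
	spec := "1:5:shoot--proj--abc.worker-pool-a"
	s, _ := dynamic.SpecFromString(spec, true) // MinSize=1, MaxSize=5, Name="shoot--proj--abc.worker-pool-a"
	parts := strings.Split(s.Name, ".")        // ["shoot--proj--abc", "worker-pool-a"]
	namespace, name := parts[0], parts[1]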
@@ -381,36 +403,11 @@ func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeplo
 		return nil, fmt.Errorf("unable to find parent MachineDeployment of given MachineSet object %s %v", machineSetName, err)
 	}
 
-	mcmRef := Ref{
-		Name:      machineDeploymentName,
-		Namespace: m.namespace,
+	machineDeployment, ok := m.machineDeployments[types.NamespacedName{Namespace: m.namespace, Name: machineDeploymentName}]
+	if !ok {
+		return nil, fmt.Errorf("machineDeployment %s not found in the list of machine deployments", machineDeploymentName)
 	}
-
-	discoveryOpts := m.discoveryOpts
-	specs := discoveryOpts.NodeGroupSpecs
-	var min, max int
-	for _, spec := range specs {
-		s, err := dynamic.SpecFromString(spec, true)
-		if err != nil {
-			return nil, fmt.Errorf("Error occurred while parsing the spec")
-		}
-
-		str := strings.Split(s.Name, ".")
-		_, Name := str[0], str[1]
-
-		if Name == machineDeploymentName {
-			min = s.MinSize
-			max = s.MaxSize
-			break
-		}
-	}
-
-	return &MachineDeployment{
-		mcmRef,
-		m,
-		min,
-		max,
-	}, nil
+	return machineDeployment, nil
 }
 
 // Refresh method, for each machine deployment, will reset the priority of the machines if the number of annotated machines is more than desired.
 // It will select the machines to reset the priority based on the descending order of creation timestamp.
@@ -428,12 +425,19 @@ func (m *McmManager) Refresh() error {
 			klog.Infof("[Refresh] machine deployment %s is under rolling update, skipping", machineDeployment.Name)
 			continue
 		}
+		mcd, ok := m.machineDeployments[types.NamespacedName{Namespace: m.namespace, Name: machineDeployment.Name}]
+		if !ok {
+			klog.Errorf("[Refresh] machine deployment %s not found in the list of machine deployments", machineDeployment.Name)
+			continue
+		}
+		mcd.scalingMutex.Lock()
 		markedMachines := sets.New(strings.Split(machineDeployment.Annotations[machinesMarkedByCAForDeletion], ",")...)
 		// check if number of annotated machine objects is more than desired and correspondingly reset the priority annotation value if needed.
 		machines, err := m.getMachinesForMachineDeployment(machineDeployment.Name)
 		if err != nil {
 			klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error())
 			collectiveError = errors.Join(collectiveError, err)
+			mcd.scalingMutex.Unlock()
 			continue
 		}
 		var incorrectlyMarkedMachines []*Ref
@@ -447,6 +451,7 @@ func (m *McmManager) Refresh() error {
 			}
 		}
 		collectiveError = errors.Join(collectiveError, m.resetPriorityForMachines(incorrectlyMarkedMachines))
+		mcd.scalingMutex.Unlock()
 	}
 	return collectiveError
 }
@@ -493,6 +498,9 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
 	if err != nil {
 		return err
 	}
+	// acquire the mutex
+	commonMachineDeployment.scalingMutex.Lock()
+	defer commonMachineDeployment.scalingMutex.Unlock()
 	// get the machine deployment and return if rolling update is not finished
 	md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(commonMachineDeployment.Name)
 	if err != nil {

From 98f20d36c819383109b0bb0a42f517310d3ca68c Mon Sep 17 00:00:00 2001
From: Rishabh Patel
Date: Fri, 20 Dec 2024 12:45:51 +0530
Subject: [PATCH 3/7] initialise machinedeployment map in mcmManager

---
 cluster-autoscaler/cloudprovider/mcm/mcm_manager.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
index c9fcd1c1d30d..23ee0b5c642d 100644
--- a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
+++ b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
@@ -254,6 +254,7 @@ func createMCMManagerInternal(discoveryOpts cloudprovider.NodeGroupDiscoveryOpti
 		m := &McmManager{
 			namespace:          namespace,
 			interrupt:          make(chan struct{}),
+			machineDeployments: make(map[types.NamespacedName]*MachineDeployment),
 			deploymentLister:   deploymentLister,
 			machineClient:      controlMachineClient,
 			machineClassLister: machineClassLister,

From 56d80aca4881c5240d54112a137d216bba7e1ef6 Mon Sep 17 00:00:00 2001
From: Rishabh Patel
Date: Fri, 20 Dec 2024 16:36:07 +0530
Subject: [PATCH 4/7] add Refresh method in nodegrp implementation

---
 .../cloudprovider/mcm/mcm_cloud_provider.go   | 56 ++++++++++++
 .../cloudprovider/mcm/mcm_manager.go          | 89 +++++--------------
 2 files changed, 80 insertions(+), 65 deletions(-)

diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
index 41ac61010622..3d3febe2c953 100644
--- a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
+++ b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
@@ -24,6 +24,10 @@ package mcm
 import (
 	"context"
 	"fmt"
+	"github.com/gardener/machine-controller-manager/pkg/apis/machine/v1alpha1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"slices"
 	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -321,6 +325,8 @@ func (machinedeployment *MachineDeployment) IncreaseSize(delta int) error {
 	if delta <= 0 {
 		return fmt.Errorf("size increase must be positive")
 	}
+	machinedeployment.scalingMutex.Lock()
+	defer machinedeployment.scalingMutex.Unlock()
 	size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment)
 	if err != nil {
 		return err
 	}
@@ -344,6 +350,8 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error
 	if delta >= 0 {
 		return fmt.Errorf("size decrease size must be negative")
 	}
+	machinedeployment.scalingMutex.Lock()
+	defer machinedeployment.scalingMutex.Unlock()
 	size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment)
 	if err != nil {
 		return err
 	}
@@ -358,6 +366,54 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error
 	}, "MachineDeployment", "update", machinedeployment.Name)
 }
 
+// Refresh resets the priority annotation for the machines that are not present in machines-marked-by-ca-for-deletion annotation on the machineDeployment
+func (machineDeployment *MachineDeployment) Refresh() error {
+	machineDeployment.scalingMutex.Lock()
+	defer machineDeployment.scalingMutex.Unlock()
+	mcd, err := machineDeployment.mcmManager.machineDeploymentLister.MachineDeployments(machineDeployment.Namespace).Get(machineDeployment.Name)
+	if err != nil {
+		return fmt.Errorf("failed to get machine deployment %s: %v", machineDeployment.Name, err)
+	}
+	// ignore the machine deployment if it is in rolling update
+	if !isRollingUpdateFinished(mcd) {
+		klog.Infof("machine deployment %s is under rolling update, skipping", machineDeployment.Name)
+		return nil
+	}
+	markedMachines := sets.New(strings.Split(mcd.Annotations[machinesMarkedByCAForDeletion], ",")...)
+	machines, err := machineDeployment.mcmManager.getMachinesForMachineDeployment(machineDeployment.Name)
+	if err != nil {
+		klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error())
+		return err
+	}
+	var incorrectlyMarkedMachines []*Ref
+	for _, machine := range machines {
+		// no need to reset priority for machines already in termination or failed phase
+		if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed {
+			continue
+		}
+		if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines && !markedMachines.Has(machine.Name) {
+			incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, &Ref{Name: machine.Name, Namespace: machine.Namespace})
+		}
+	}
+	var updatedMarkedMachines []string
+	for machineName := range markedMachines {
+		if slices.ContainsFunc(machines, func(mc *v1alpha1.Machine) bool {
+			return mc.Name == machineName
+		}) {
+			updatedMarkedMachines = append(updatedMarkedMachines, machineName)
+		}
+	}
+	clone := mcd.DeepCopy()
+	clone.Annotations[machinesMarkedByCAForDeletion] = strings.Join(updatedMarkedMachines, ",")
+	ctx, cancelFn := context.WithTimeout(context.Background(), machineDeployment.mcmManager.maxRetryTimeout)
+	defer cancelFn()
+	_, err = machineDeployment.mcmManager.machineClient.MachineDeployments(machineDeployment.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
+	if err != nil {
+		return err
+	}
+	return machineDeployment.mcmManager.resetPriorityForMachines(incorrectlyMarkedMachines)
+}
+
 // Belongs returns true if the given node belongs to the NodeGroup.
 // TODO: Implement this to iterate over machines under machinedeployment, and return true if node exists in list.
 func (machinedeployment *MachineDeployment) Belongs(node *apiv1.Node) (bool, error) {
diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
index 23ee0b5c642d..6ce7b5b70a3c 100644
--- a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
+++ b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
@@ -414,45 +414,9 @@ func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeplo
 
 // Refresh method, for each machine deployment, will reset the priority of the machines if the number of annotated machines is more than desired.
 // It will select the machines to reset the priority based on the descending order of creation timestamp.
 func (m *McmManager) Refresh() error {
-	machineDeployments, err := m.machineDeploymentLister.MachineDeployments(m.namespace).List(labels.Everything())
-	if err != nil {
-		klog.Errorf("[Refresh] unable to list machine deployments")
-		return err
-	}
 	var collectiveError error
-	for _, machineDeployment := range machineDeployments {
-		// ignore the machine deployment if it is in rolling update
-		if !isRollingUpdateFinished(machineDeployment) {
-			klog.Infof("[Refresh] machine deployment %s is under rolling update, skipping", machineDeployment.Name)
-			continue
-		}
-		mcd, ok := m.machineDeployments[types.NamespacedName{Namespace: m.namespace, Name: machineDeployment.Name}]
-		if !ok {
-			klog.Errorf("[Refresh] machine deployment %s not found in the list of machine deployments", machineDeployment.Name)
-			continue
-		}
-		mcd.scalingMutex.Lock()
-		markedMachines := sets.New(strings.Split(machineDeployment.Annotations[machinesMarkedByCAForDeletion], ",")...)
-		// check if number of annotated machine objects is more than desired and correspondingly reset the priority annotation value if needed.
-		machines, err := m.getMachinesForMachineDeployment(machineDeployment.Name)
-		if err != nil {
-			klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error())
-			collectiveError = errors.Join(collectiveError, err)
-			mcd.scalingMutex.Unlock()
-			continue
-		}
-		var incorrectlyMarkedMachines []*Ref
-		for _, machine := range machines {
-			// no need to reset priority for machines already in termination or failed phase
-			if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed {
-				continue
-			}
-			if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines && !markedMachines.Has(machine.Name) {
-				incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, &Ref{Name: machine.Name, Namespace: machine.Namespace})
-			}
-		}
-		collectiveError = errors.Join(collectiveError, m.resetPriorityForMachines(incorrectlyMarkedMachines))
-		mcd.scalingMutex.Unlock()
+	for _, machineDeployment := range m.machineDeployments {
+		collectiveError = errors.Join(collectiveError, machineDeployment.Refresh())
 	}
 	return collectiveError
 }
@@ -512,28 +476,18 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
 		return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", commonMachineDeployment.Name)
 	}
 	markedMachines := sets.New(strings.Split(md.Annotations[machinesMarkedByCAForDeletion], ",")...)
-	var filteredTargetMachineRefs []*Ref
-	for _, targetMachineRef := range targetMachineRefs {
-		if !markedMachines.Has(targetMachineRef.Name) {
-			filteredTargetMachineRefs = append(filteredTargetMachineRefs, targetMachineRef)
-			markedMachines.Insert(targetMachineRef.Name)
-		} else {
-			klog.Infof("Machine %s is already marked for deletion, skipping", targetMachineRef.Name)
-		}
-	}
-
 	// update priorities of machines to be deleted except the ones already in termination to 1
-	err = m.prioritizeMachinesForDeletion(filteredTargetMachineRefs)
+	machinesWithPrio1, err := m.prioritizeMachinesForDeletion(targetMachineRefs)
 	if err != nil {
 		return err
 	}
+	markedMachines.Insert(machinesWithPrio1...)
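 	// Reviewer note: markedMachines is now the union of the names already recorded in the
 	// annotation and the names newly prioritized above; the retry below persists that union
 	// and the reduced replica count in a single MachineDeployment update. Note also that in
 	// this revision a failure of that update is only logged and DeleteMachines still returns
 	// nil, so the caller can no longer observe a failed scale-down.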
 	// Trying to update the machineDeployment till the deadline
 	err = m.retry(func(ctx context.Context) (bool, error) {
-		return m.scaleDownMachineDeployment(ctx, commonMachineDeployment.Name, len(filteredTargetMachineRefs), strings.Join(markedMachines.UnsortedList(), ","))
+		return m.scaleDownAndAnnotateMachineDeployment(ctx, commonMachineDeployment.Name, len(machinesWithPrio1), strings.Join(markedMachines.UnsortedList(), ","))
 	}, "MachineDeployment", "update", commonMachineDeployment.Name)
 	if err != nil {
-		klog.Errorf("unable to scale in machine deployment %s, will reset priority of target machines, Error: %v", commonMachineDeployment.Name, err)
-		return errors.Join(err, m.resetPriorityForMachines(filteredTargetMachineRefs))
+		klog.Errorf("unable to scale in machine deployment %s, Error: %v", commonMachineDeployment.Name, err)
 	}
 	return nil
 }
@@ -543,6 +497,10 @@ func (m *McmManager) resetPriorityForMachines(mcRefs []*Ref) error {
 	var collectiveError error
 	for _, mcRef := range mcRefs {
 		machine, err := m.machineLister.Machines(m.namespace).Get(mcRef.Name)
+		if kube_errors.IsNotFound(err) {
+			klog.Warningf("Machine %s not found, skipping resetting priority annotation", mcRef.Name)
+			continue
+		}
 		if err != nil {
 			collectiveError = errors.Join(collectiveError, fmt.Errorf("unable to get Machine object %s, Error: %v", mcRef, err))
 			continue
 		}
@@ -566,8 +524,9 @@ func (m *McmManager) resetPriorityForMachines(mcRefs []*Ref) error {
 }
 
 // prioritizeMachinesForDeletion prioritizes the targeted machines by updating their priority annotation to 1
-func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref) error {
+func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref) ([]string, error) {
 	var expectedToTerminateMachineNodePairs = make(map[string]string)
+	var machinesMarkedWithPrio1 []string
 	for _, machineRef := range targetMachineRefs {
 		// Trying to update the priority of machineRef till m.maxRetryTimeout
 		if err := m.retry(func(ctx context.Context) (bool, error) {
@@ -583,15 +542,20 @@ func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref) err
 			if isMachineFailedOrTerminating(mc) {
 				return false, nil
 			}
+			if mc.Annotations[machinePriorityAnnotation] == priorityValueForCandidateMachines {
+				klog.Infof("Machine %q priority is already set to 1, hence skipping the update", mc.Name)
+				return false, nil
+			}
+			machinesMarkedWithPrio1 = append(machinesMarkedWithPrio1, machineRef.Name)
 			expectedToTerminateMachineNodePairs[mc.Name] = mc.Labels["node"]
 			return m.updateAnnotationOnMachine(ctx, mc.Name, machinePriorityAnnotation, priorityValueForCandidateMachines)
 		}, "Machine", "update", machineRef.Name); err != nil {
 			klog.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
-			return fmt.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
+			return nil, fmt.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
 		}
 	}
 	klog.V(2).Infof("Expected to remove following {machineRef: corresponding node} pairs %s", expectedToTerminateMachineNodePairs)
-	return nil
+	return machinesMarkedWithPrio1, nil
 }
 
 // updateAnnotationOnMachine returns error only when updating the annotations on machine has been failing consequently and deadline is crossed
@@ -606,16 +570,10 @@ func (m *McmManager) updateAnnotationOnMachine(ctx context.Context, mcName strin
 		return true, err
 	}
 	clone := machine.DeepCopy()
-	if clone.Annotations != nil {
-		if clone.Annotations[key] == val {
-			klog.Infof("Machine %q priority is already set to 1, hence skipping the update", machine.Name)
-			return false, nil
-		}
-		clone.Annotations[key] = val
-	} else {
+	if clone.Annotations == nil {
 		clone.Annotations = make(map[string]string)
-		clone.Annotations[key] = val
 	}
+	clone.Annotations[key] = val
 	_, err = m.machineClient.Machines(machine.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
 	if err == nil {
 		klog.Infof("Machine %s marked with priority %s successfully", mcName, val)
@@ -623,8 +581,9 @@ func (m *McmManager) updateAnnotationOnMachine(ctx context.Context, mcName strin
 	return true, err
 }
 
-// scaleDownMachineDeployment scales down the machine deployment by the provided scaleDownAmount and returns the updated spec.Replicas after scale down.
-func (m *McmManager) scaleDownMachineDeployment(ctx context.Context, mdName string, scaleDownAmount int, markedMachines string) (bool, error) {
+// scaleDownAndAnnotateMachineDeployment scales down the machine deployment by the provided scaleDownAmount and returns the updated spec.Replicas after scale down.
+// It also updates the machines-marked-by-ca-for-deletion annotation on the machine deployment with the list of existing machines marked for deletion.
+func (m *McmManager) scaleDownAndAnnotateMachineDeployment(ctx context.Context, mdName string, scaleDownAmount int, markedMachines string) (bool, error) {
 	md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName)
 	if err != nil {
 		klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %v", mdName, err)

From f3774f4be4b9027e8a98e2aa53a90fe34389344c Mon Sep 17 00:00:00 2001
From: Rishabh Patel
Date: Tue, 24 Dec 2024 17:53:33 +0530
Subject: [PATCH 5/7] address review comments

---
 .../cloudprovider/mcm/mcm_cloud_provider.go   | 138 +++++++++---------
 .../cloudprovider/mcm/mcm_manager.go          |  11 +-
 2 files changed, 73 insertions(+), 76 deletions(-)

diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
index 3d3febe2c953..eff0e4bfaabf 100644
--- a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
+++ b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
@@ -229,14 +229,8 @@ func (mcm *mcmCloudProvider) GetNodeGpuConfig(*apiv1.Node) *cloudprovider.GpuCon
 	return nil
 }
 
-// Ref contains a reference to the name of the machine-deployment.
-type Ref struct {
-	Name      string
-	Namespace string
-}
-
 // ReferenceFromProviderID extracts the Ref from providerId. It returns corresponding machine-name to providerid.
-func ReferenceFromProviderID(m *McmManager, id string) (*Ref, error) {
+func ReferenceFromProviderID(m *McmManager, id string) (*types.NamespacedName, error) {
 	machines, err := m.machineLister.Machines(m.namespace).List(labels.Everything())
 	if err != nil {
 		return nil, fmt.Errorf("Could not list machines due to error: %s", err)
@@ -262,7 +256,7 @@ func ReferenceFromProviderID(m *McmManager, id string) (*Ref, error) {
 		klog.V(4).Infof("No machine found for node ID %q", id)
 		return nil, nil
 	}
-	return &Ref{
+	return &types.NamespacedName{
 		Name:      Name,
 		Namespace: Namespace,
 	}, nil
 }
 
 // MachineDeployment implements NodeGroup interface.
 type MachineDeployment struct {
-	Ref
+	types.NamespacedName
 
 	mcmManager *McmManager
 
@@ -280,64 +274,64 @@ type MachineDeployment struct {
 }
 
 // MaxSize returns maximum size of the node group.
-func (machinedeployment *MachineDeployment) MaxSize() int {
-	return machinedeployment.maxSize
+func (machineDeployment *MachineDeployment) MaxSize() int {
+	return machineDeployment.maxSize
 }
 
 // MinSize returns minimum size of the node group.
-func (machinedeployment *MachineDeployment) MinSize() int {
-	return machinedeployment.minSize
+func (machineDeployment *MachineDeployment) MinSize() int {
+	return machineDeployment.minSize
 }
 
 // TargetSize returns the current TARGET size of the node group. It is possible that the
 // number is different from the number of nodes registered in Kubernetes.
-func (machinedeployment *MachineDeployment) TargetSize() (int, error) {
-	size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment)
+func (machineDeployment *MachineDeployment) TargetSize() (int, error) {
+	size, err := machineDeployment.mcmManager.GetMachineDeploymentSize(machineDeployment)
 	return int(size), err
 }
 
 // Exist checks if the node group really exists on the cloud provider side. Allows to tell the
 // theoretical node group from the real one.
 // TODO: Implement this to check if machine-deployment really exists.
-func (machinedeployment *MachineDeployment) Exist() bool {
+func (machineDeployment *MachineDeployment) Exist() bool {
 	return true
 }
 
 // Create creates the node group on the cloud provider side.
-func (machinedeployment *MachineDeployment) Create() (cloudprovider.NodeGroup, error) {
+func (machineDeployment *MachineDeployment) Create() (cloudprovider.NodeGroup, error) {
 	return nil, cloudprovider.ErrAlreadyExist
 }
 
 // Autoprovisioned returns true if the node group is autoprovisioned.
-func (machinedeployment *MachineDeployment) Autoprovisioned() bool {
+func (machineDeployment *MachineDeployment) Autoprovisioned() bool {
 	return false
 }
 
 // Delete deletes the node group on the cloud provider side.
// This will be executed only for autoprovisioned node groups, once their size drops to 0.
-func (machinedeployment *MachineDeployment) Delete() error {
+func (machineDeployment *MachineDeployment) Delete() error {
 	return cloudprovider.ErrNotImplemented
 }
 
 // IncreaseSize of the Machinedeployment.
-func (machinedeployment *MachineDeployment) IncreaseSize(delta int) error {
-	klog.V(0).Infof("Received request to increase size of machine deployment %s by %d", machinedeployment.Name, delta)
+func (machineDeployment *MachineDeployment) IncreaseSize(delta int) error {
+	klog.V(0).Infof("Received request to increase size of machine deployment %s by %d", machineDeployment.Name, delta)
 	if delta <= 0 {
 		return fmt.Errorf("size increase must be positive")
 	}
-	machinedeployment.scalingMutex.Lock()
-	defer machinedeployment.scalingMutex.Unlock()
-	size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment)
+	machineDeployment.scalingMutex.Lock()
+	defer machineDeployment.scalingMutex.Unlock()
+	size, err := machineDeployment.mcmManager.GetMachineDeploymentSize(machineDeployment)
 	if err != nil {
 		return err
 	}
 	targetSize := int(size) + delta
-	if targetSize > machinedeployment.MaxSize() {
-		return fmt.Errorf("size increase too large - desired:%d max:%d", targetSize, machinedeployment.MaxSize())
+	if targetSize > machineDeployment.MaxSize() {
+		return fmt.Errorf("size increase too large - desired:%d max:%d", targetSize, machineDeployment.MaxSize())
 	}
-	return machinedeployment.mcmManager.retry(func(ctx context.Context) (bool, error) {
-		return machinedeployment.mcmManager.SetMachineDeploymentSize(ctx, machinedeployment, int64(targetSize))
-	}, "MachineDeployment", "update", machinedeployment.Name)
+	return machineDeployment.mcmManager.retry(func(ctx context.Context) (bool, error) {
+		return machineDeployment.mcmManager.SetMachineDeploymentSize(ctx, machineDeployment, int64(targetSize))
+	}, "MachineDeployment", "update", machineDeployment.Name)
 }
 
 // DecreaseTargetSize decreases the target size of the node group. This function
 // request for new nodes that have not been yet fulfilled. Delta should be negative.
 // It is assumed that cloud provider will not delete the existing nodes if the size
 // when there is an option to just decrease the target.
-func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error {
-	klog.V(0).Infof("Received request to decrease target size of machine deployment %s by %d", machinedeployment.Name, delta)
+func (machineDeployment *MachineDeployment) DecreaseTargetSize(delta int) error {
+	klog.V(0).Infof("Received request to decrease target size of machine deployment %s by %d", machineDeployment.Name, delta)
 	if delta >= 0 {
 		return fmt.Errorf("size decrease size must be negative")
 	}
-	machinedeployment.scalingMutex.Lock()
-	defer machinedeployment.scalingMutex.Unlock()
-	size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment)
+	machineDeployment.scalingMutex.Lock()
+	defer machineDeployment.scalingMutex.Unlock()
+	size, err := machineDeployment.mcmManager.GetMachineDeploymentSize(machineDeployment)
 	if err != nil {
 		return err
 	}
 	decreaseAmount := int(size) + delta
-	if decreaseAmount < machinedeployment.minSize {
-		klog.Warningf("Cannot go below min size= %d for machineDeployment %s, requested target size= %d . Setting target size to min size", machinedeployment.minSize, machinedeployment.Name, size+int64(delta))
-		decreaseAmount = machinedeployment.minSize
+	if decreaseAmount < machineDeployment.minSize {
+		klog.Warningf("Cannot go below min size= %d for machineDeployment %s, requested target size= %d . Setting target size to min size", machineDeployment.minSize, machineDeployment.Name, size+int64(delta))
+		decreaseAmount = machineDeployment.minSize
 	}
-	return machinedeployment.mcmManager.retry(func(ctx context.Context) (bool, error) {
-		return machinedeployment.mcmManager.SetMachineDeploymentSize(ctx, machinedeployment, int64(decreaseAmount))
-	}, "MachineDeployment", "update", machinedeployment.Name)
+	return machineDeployment.mcmManager.retry(func(ctx context.Context) (bool, error) {
+		return machineDeployment.mcmManager.SetMachineDeploymentSize(ctx, machineDeployment, int64(decreaseAmount))
+	}, "MachineDeployment", "update", machineDeployment.Name)
 }
 
 // Refresh resets the priority annotation for the machines that are not present in machines-marked-by-ca-for-deletion annotation on the machineDeployment
@@ -379,20 +373,20 @@ func (machineDeployment *MachineDeployment) Refresh() error {
 		klog.Infof("machine deployment %s is under rolling update, skipping", machineDeployment.Name)
 		return nil
 	}
-	markedMachines := sets.New(strings.Split(mcd.Annotations[machinesMarkedByCAForDeletion], ",")...)
+	markedMachines := getMachinesMarkedByCAForDeletion(mcd)
 	machines, err := machineDeployment.mcmManager.getMachinesForMachineDeployment(machineDeployment.Name)
 	if err != nil {
 		klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error())
 		return err
 	}
-	var incorrectlyMarkedMachines []*Ref
+	var incorrectlyMarkedMachines []*types.NamespacedName
 	for _, machine := range machines {
 		// no need to reset priority for machines already in termination or failed phase
 		if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed {
 			continue
 		}
 		if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines && !markedMachines.Has(machine.Name) {
-			incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, &Ref{Name: machine.Name, Namespace: machine.Namespace})
+			incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, &types.NamespacedName{Name: machine.Name, Namespace: machine.Namespace})
 		}
 	}
 	var updatedMarkedMachines []string
@@ -416,19 +410,19 @@ func (machineDeployment *MachineDeployment) Refresh() error {
 
 // Belongs returns true if the given node belongs to the NodeGroup.
 // TODO: Implement this to iterate over machines under machinedeployment, and return true if node exists in list.
-func (machinedeployment *MachineDeployment) Belongs(node *apiv1.Node) (bool, error) {
-	ref, err := ReferenceFromProviderID(machinedeployment.mcmManager, node.Spec.ProviderID)
+func (machineDeployment *MachineDeployment) Belongs(node *apiv1.Node) (bool, error) {
+	ref, err := ReferenceFromProviderID(machineDeployment.mcmManager, node.Spec.ProviderID)
 	if err != nil {
 		return false, err
 	}
-	targetMd, err := machinedeployment.mcmManager.GetMachineDeploymentForMachine(ref)
+	targetMd, err := machineDeployment.mcmManager.GetMachineDeploymentForMachine(ref)
 	if err != nil {
 		return false, err
 	}
 	if targetMd == nil {
 		return false, fmt.Errorf("%s doesn't belong to a known MachinDeployment", node.Name)
 	}
-	if targetMd.Id() != machinedeployment.Id() {
+	if targetMd.Id() != machineDeployment.Id() {
 		return false, nil
 	}
 	return true, nil
@@ -436,31 +430,31 @@ func (machinedeployment *MachineDeployment) Belongs(node *apiv1.Node) (bool, err
 
 // DeleteNodes deletes the nodes from the group. It is expected that this method will not be called
 // for nodes which are not part of ANY machine deployment.
-func (machinedeployment *MachineDeployment) DeleteNodes(nodes []*apiv1.Node) error {
+func (machineDeployment *MachineDeployment) DeleteNodes(nodes []*apiv1.Node) error {
 	nodeNames := getNodeNames(nodes)
 	klog.V(0).Infof("Received request to delete nodes:- %v", nodeNames)
-	size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment)
+	size, err := machineDeployment.mcmManager.GetMachineDeploymentSize(machineDeployment)
 	if err != nil {
 		return err
 	}
-	if int(size) <= machinedeployment.MinSize() {
+	if int(size) <= machineDeployment.MinSize() {
 		return fmt.Errorf("min size reached, nodes will not be deleted")
 	}
-	machines := make([]*Ref, 0, len(nodes))
+	machines := make([]*types.NamespacedName, 0, len(nodes))
 	for _, node := range nodes {
-		belongs, err := machinedeployment.Belongs(node)
+		belongs, err := machineDeployment.Belongs(node)
 		if err != nil {
 			return err
 		} else if !belongs {
-			return fmt.Errorf("%s belongs to a different machinedeployment than %s", node.Name, machinedeployment.Id())
+			return fmt.Errorf("%s belongs to a different machinedeployment than %s", node.Name, machineDeployment.Id())
 		}
-		ref, err := ReferenceFromProviderID(machinedeployment.mcmManager, node.Spec.ProviderID)
+		ref, err := ReferenceFromProviderID(machineDeployment.mcmManager, node.Spec.ProviderID)
 		if err != nil {
 			return fmt.Errorf("couldn't find the machine-name from provider-id %s", node.Spec.ProviderID)
 		}
 		machines = append(machines, ref)
 	}
-	return machinedeployment.mcmManager.DeleteMachines(machines)
+	return machineDeployment.mcmManager.DeleteMachines(machines)
 }
 
 func getNodeNames(nodes []*apiv1.Node) interface{} {
@@ -472,20 +466,20 @@
 }
 
 // Id returns machinedeployment id.
-func (machinedeployment *MachineDeployment) Id() string {
-	return machinedeployment.Name
+func (machineDeployment *MachineDeployment) Id() string {
+	return machineDeployment.Name
 }
 
 // Debug returns a debug string for the Asg.
-func (machinedeployment *MachineDeployment) Debug() string {
-	return fmt.Sprintf("%s (%d:%d)", machinedeployment.Id(), machinedeployment.MinSize(), machinedeployment.MaxSize())
+func (machineDeployment *MachineDeployment) Debug() string {
+	return fmt.Sprintf("%s (%d:%d)", machineDeployment.Id(), machineDeployment.MinSize(), machineDeployment.MaxSize())
 }
 
 // Nodes returns a list of all nodes that belong to this node group.
-func (machinedeployment *MachineDeployment) Nodes() ([]cloudprovider.Instance, error) {
-	instances, err := machinedeployment.mcmManager.GetInstancesForMachineDeployment(machinedeployment)
+func (machineDeployment *MachineDeployment) Nodes() ([]cloudprovider.Instance, error) {
+	instances, err := machineDeployment.mcmManager.GetInstancesForMachineDeployment(machineDeployment)
 	if err != nil {
-		return nil, fmt.Errorf("failed to get the cloudprovider.Instance for machines backed by the machinedeployment %q, error: %v", machinedeployment.Name, err)
+		return nil, fmt.Errorf("failed to get the cloudprovider.Instance for machines backed by the machinedeployment %q, error: %v", machineDeployment.Name, err)
 	}
 	erroneousInstanceInfos := make([]string, 0, len(instances))
 	for _, instance := range instances {
@@ -502,9 +496,9 @@
 // GetOptions returns NodeGroupAutoscalingOptions that should be used for this particular
 // NodeGroup. Returning a nil will result in using default options.
 // Implementation optional.
-func (machinedeployment *MachineDeployment) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) {
+func (machineDeployment *MachineDeployment) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) {
 	options := defaults
-	mcdAnnotations, err := machinedeployment.mcmManager.GetMachineDeploymentAnnotations(machinedeployment.Name)
+	mcdAnnotations, err := machineDeployment.mcmManager.GetMachineDeploymentAnnotations(machineDeployment.Name)
 	if err != nil {
 		return nil, err
 	}
@@ -538,25 +532,25 @@
 }
 
 // TemplateNodeInfo returns a node template for this node group.
-func (machinedeployment *MachineDeployment) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) {
+func (machineDeployment *MachineDeployment) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) {
 
-	nodeTemplate, err := machinedeployment.mcmManager.GetMachineDeploymentNodeTemplate(machinedeployment)
+	nodeTemplate, err := machineDeployment.mcmManager.GetMachineDeploymentNodeTemplate(machineDeployment)
 	if err != nil {
 		return nil, err
 	}
 
-	node, err := machinedeployment.mcmManager.buildNodeFromTemplate(machinedeployment.Name, nodeTemplate)
+	node, err := machineDeployment.mcmManager.buildNodeFromTemplate(machineDeployment.Name, nodeTemplate)
 	if err != nil {
 		return nil, err
 	}
 
-	nodeInfo := schedulerframework.NewNodeInfo(cloudprovider.BuildKubeProxy(machinedeployment.Name))
+	nodeInfo := schedulerframework.NewNodeInfo(cloudprovider.BuildKubeProxy(machineDeployment.Name))
 	nodeInfo.SetNode(node)
 	return nodeInfo, nil
 }
 
 // AtomicIncreaseSize is not implemented.
-func (machinedeployment *MachineDeployment) AtomicIncreaseSize(delta int) error {
+func (machineDeployment *MachineDeployment) AtomicIncreaseSize(delta int) error {
 	return cloudprovider.ErrNotImplemented
 }
 
@@ -573,13 +567,17 @@ func buildMachineDeploymentFromSpec(value string, mcmManager *McmManager) (*Mach
 	return machinedeployment, nil
 }
 
+func getMachinesMarkedByCAForDeletion(mcd *v1alpha1.MachineDeployment) sets.Set[string] {
+	return sets.New(strings.Split(mcd.Annotations[machinesMarkedByCAForDeletion], ",")...)
+}
+
 func buildMachineDeployment(mcmManager *McmManager, minSize int, maxSize int, namespace string, name string) *MachineDeployment {
 	return &MachineDeployment{
 		mcmManager:   mcmManager,
 		minSize:      minSize,
 		maxSize:      maxSize,
 		scalingMutex: sync.Mutex{},
-		Ref: Ref{
+		NamespacedName: types.NamespacedName{
 			Name:      name,
 			Namespace: namespace,
 		},
diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
index 6ce7b5b70a3c..71f682df9c8f 100644
--- a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
+++ b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
@@ -28,7 +28,6 @@ import (
 	"flag"
 	"fmt"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/sets"
 	v1appslister "k8s.io/client-go/listers/apps/v1"
 	"k8s.io/utils/pointer"
 	"maps"
@@ -370,7 +369,7 @@ func CreateMcmManager(discoveryOpts cloudprovider.NodeGroupDiscoveryOptions) (*M
 }
 
 // GetMachineDeploymentForMachine returns the MachineDeployment for the Machine object.
-func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeployment, error) {
+func (m *McmManager) GetMachineDeploymentForMachine(machine *types.NamespacedName) (*MachineDeployment, error) {
 	if machine.Name == "" {
 		// Considering the possibility when Machine has been deleted but due to cached Node object it appears here.
 		return nil, fmt.Errorf("Node does not Exists")
@@ -455,7 +454,7 @@
 }
 
 // DeleteMachines annotates the target machines and also reduces the desired replicas of the MachineDeployment.
-func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
+func (m *McmManager) DeleteMachines(targetMachineRefs []*types.NamespacedName) error {
 	if len(targetMachineRefs) == 0 {
 		return nil
 	}
@@ -475,7 +474,7 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
 	if !isRollingUpdateFinished(md) {
 		return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", commonMachineDeployment.Name)
 	}
-	markedMachines := sets.New(strings.Split(md.Annotations[machinesMarkedByCAForDeletion], ",")...)
+	markedMachines := getMachinesMarkedByCAForDeletion(md)
 	// update priorities of machines to be deleted except the ones already in termination to 1
 	machinesWithPrio1, err := m.prioritizeMachinesForDeletion(targetMachineRefs)
 	if err != nil {
@@ -493,7 +492,7 @@
 }
 
 // resetPriorityForMachines resets the priority of machines passed in the argument to defaultPriorityValue
-func (m *McmManager) resetPriorityForMachines(mcRefs []*Ref) error {
+func (m *McmManager) resetPriorityForMachines(mcRefs []*types.NamespacedName) error {
 	var collectiveError error
 	for _, mcRef := range mcRefs {
 		machine, err := m.machineLister.Machines(m.namespace).Get(mcRef.Name)
@@ -524,7 +523,7 @@
 }
 
 // prioritizeMachinesForDeletion prioritizes the targeted machines by updating their priority annotation to 1
-func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref) ([]string, error) {
+func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*types.NamespacedName) ([]string, error) {
 	var expectedToTerminateMachineNodePairs = make(map[string]string)
 	var machinesMarkedWithPrio1 []string
 	for _, machineRef := range targetMachineRefs {

From 9063248d3af6d37f75834154954584207d586722 Mon Sep 17 00:00:00 2001
From: Rishabh Patel
Date: Thu, 26 Dec 2024 15:01:48 +0530
Subject: [PATCH 6/7] address review comments - part 2

---
 .../cloudprovider/mcm/mcm_cloud_provider.go   |  78 +++++-------
 .../cloudprovider/mcm/mcm_manager.go          | 104 ++++++++++++------
 2 files changed, 97 insertions(+), 85 deletions(-)

diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
index eff0e4bfaabf..ed570472e598 100644
--- a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
+++ b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
@@ -26,7 +26,6 @@ import (
 	"fmt"
 	"github.com/gardener/machine-controller-manager/pkg/apis/machine/v1alpha1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/util/sets"
 	"slices"
 	"strconv"
 	"strings"
 	"sync"
@@ -39,7 +38,6 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/config"
-	"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/klog/v2" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" @@ -364,47 +362,47 @@ func (machineDeployment *MachineDeployment) DecreaseTargetSize(delta int) error func (machineDeployment *MachineDeployment) Refresh() error { machineDeployment.scalingMutex.Lock() defer machineDeployment.scalingMutex.Unlock() - mcd, err := machineDeployment.mcmManager.machineDeploymentLister.MachineDeployments(machineDeployment.Namespace).Get(machineDeployment.Name) + mcd, err := machineDeployment.mcmManager.GetMachineDeploymentResource(machineDeployment.Name) if err != nil { - return fmt.Errorf("failed to get machine deployment %s: %v", machineDeployment.Name, err) - } - // ignore the machine deployment if it is in rolling update - if !isRollingUpdateFinished(mcd) { - klog.Infof("machine deployment %s is under rolling update, skipping", machineDeployment.Name) - return nil + return err } - markedMachines := getMachinesMarkedByCAForDeletion(mcd) + markedMachineNames := getMachineNamesMarkedByCAForDeletion(mcd) machines, err := machineDeployment.mcmManager.getMachinesForMachineDeployment(machineDeployment.Name) if err != nil { klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error()) return err } - var incorrectlyMarkedMachines []*types.NamespacedName - for _, machine := range machines { - // no need to reset priority for machines already in termination or failed phase - if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed { - continue - } - if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines && !markedMachines.Has(machine.Name) { - incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, &types.NamespacedName{Name: machine.Name, Namespace: machine.Namespace}) - } - } - var updatedMarkedMachines []string - for machineName := range markedMachines { + // update the machines-marked-by-ca-for-deletion annotation with the machines that are still marked for deletion by CA. + // This is done to ensure that the machines that are no longer present are removed from the annotation. 
+ var updatedMarkedMachineNames []string + for _, machineName := range markedMachineNames { if slices.ContainsFunc(machines, func(mc *v1alpha1.Machine) bool { return mc.Name == machineName }) { - updatedMarkedMachines = append(updatedMarkedMachines, machineName) + updatedMarkedMachineNames = append(updatedMarkedMachineNames, machineName) } } clone := mcd.DeepCopy() - clone.Annotations[machinesMarkedByCAForDeletion] = strings.Join(updatedMarkedMachines, ",") + clone.Annotations[machinesMarkedByCAForDeletion] = createMachinesMarkedForDeletionAnnotationValue(updatedMarkedMachineNames) ctx, cancelFn := context.WithTimeout(context.Background(), machineDeployment.mcmManager.maxRetryTimeout) defer cancelFn() _, err = machineDeployment.mcmManager.machineClient.MachineDeployments(machineDeployment.Namespace).Update(ctx, clone, metav1.UpdateOptions{}) if err != nil { return err } + // reset the priority for the machines that are not present in machines-marked-by-ca-for-deletion annotation + var incorrectlyMarkedMachines []types.NamespacedName + for _, machine := range machines { + // no need to reset priority for machines already in termination or failed phase + if isMachineFailedOrTerminating(machine) { + continue + } + // check if the machine is marked for deletion by CA but not present in machines-marked-by-ca-for-deletion annotation. This means that CA was not able to reduce the replicas + // corresponding to this machine and hence the machine should not be marked for deletion. + if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForDeletionCandidateMachines && !slices.Contains(markedMachineNames, machine.Name) { + incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, types.NamespacedName{Name: machine.Name, Namespace: machine.Namespace}) + } + } return machineDeployment.mcmManager.resetPriorityForMachines(incorrectlyMarkedMachines) } @@ -554,32 +552,10 @@ func (machineDeployment *MachineDeployment) AtomicIncreaseSize(delta int) error return cloudprovider.ErrNotImplemented } -func buildMachineDeploymentFromSpec(value string, mcmManager *McmManager) (*MachineDeployment, error) { - spec, err := dynamic.SpecFromString(value, true) - - if err != nil { - return nil, fmt.Errorf("failed to parse node group spec: %v", err) - } - s := strings.Split(spec.Name, ".") - Namespace, Name := s[0], s[1] - - machinedeployment := buildMachineDeployment(mcmManager, spec.MinSize, spec.MaxSize, Namespace, Name) - return machinedeployment, nil -} - -func getMachinesMarkedByCAForDeletion(mcd *v1alpha1.MachineDeployment) sets.Set[string] { - return sets.New(strings.Split(mcd.Annotations[machinesMarkedByCAForDeletion], ",")...) -} - -func buildMachineDeployment(mcmManager *McmManager, minSize int, maxSize int, namespace string, name string) *MachineDeployment { - return &MachineDeployment{ - mcmManager: mcmManager, - minSize: minSize, - maxSize: maxSize, - scalingMutex: sync.Mutex{}, - NamespacedName: types.NamespacedName{ - Name: name, - Namespace: namespace, - }, +// getMachineNamesMarkedByCAForDeletion returns the set of machine names marked by CA for deletion. 
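+// The names are returned as a string slice; an absent or empty annotation yields an empty slice.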
+func getMachineNamesMarkedByCAForDeletion(mcd *v1alpha1.MachineDeployment) []string { + if mcd.Annotations == nil || mcd.Annotations[machinesMarkedByCAForDeletion] == "" { + return make([]string, 0) } + return strings.Split(mcd.Annotations[machinesMarkedByCAForDeletion], ",") } diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go index 71f682df9c8f..8d4e3cdce84f 100644 --- a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go +++ b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go @@ -28,6 +28,7 @@ import ( "flag" "fmt" "k8s.io/apimachinery/pkg/types" + "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" v1appslister "k8s.io/client-go/listers/apps/v1" "k8s.io/utils/pointer" "maps" @@ -37,6 +38,7 @@ import ( "slices" "strconv" "strings" + "sync" "time" awsapis "github.com/gardener/machine-controller-manager-provider-aws/pkg/aws/apis" @@ -77,9 +79,9 @@ const ( defaultResetAnnotationTimeout = 10 * time.Second // defaultPriorityValue is the default value for the priority annotation used by CA. It is set to 3 because MCM defaults the priority of machine it creates to 3. defaultPriorityValue = "3" - // priorityValueForCandidateMachines is the priority annotation value set on machines that the CA wants to be deleted. Its value is set to 1. - priorityValueForCandidateMachines = "1" - minResyncPeriodDefault = 1 * time.Hour + // priorityValueForDeletionCandidateMachines is the priority annotation value set on machines that the CA wants to be deleted. Its value is set to 1. + priorityValueForDeletionCandidateMachines = "1" + minResyncPeriodDefault = 1 * time.Hour // machinePriorityAnnotation is the annotation to set machine priority while deletion machinePriorityAnnotation = "machinepriority.machine.sapcloud.io" // kindMachineClass is the kind for generic machine class used by the OOT providers @@ -413,11 +415,11 @@ func (m *McmManager) GetMachineDeploymentForMachine(machine *types.NamespacedNam // Refresh method, for each machine deployment, will reset the priority of the machines if the number of annotated machines is more than desired. // It will select the machines to reset the priority based on the descending order of creation timestamp. func (m *McmManager) Refresh() error { - var collectiveError error + var collectiveError []error for _, machineDeployment := range m.machineDeployments { - collectiveError = errors.Join(collectiveError, machineDeployment.Refresh()) + collectiveError = append(collectiveError, machineDeployment.Refresh()) } - return collectiveError + return errors.Join(collectiveError...) } // Cleanup does nothing at the moment. @@ -428,18 +430,17 @@ func (m *McmManager) Cleanup() { // GetMachineDeploymentSize returns the replicas field of the MachineDeployment func (m *McmManager) GetMachineDeploymentSize(machinedeployment *MachineDeployment) (int64, error) { - md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machinedeployment.Name) + md, err := m.GetMachineDeploymentResource(machinedeployment.Name) if err != nil { - return 0, fmt.Errorf("Unable to fetch MachineDeployment object %s %v", machinedeployment.Name, err) + return 0, err } return int64(md.Spec.Replicas), nil } // SetMachineDeploymentSize sets the desired size for the Machinedeployment. 
func (m *McmManager) SetMachineDeploymentSize(ctx context.Context, machinedeployment *MachineDeployment, size int64) (bool, error) {
-	md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machinedeployment.Name)
+	md, err := m.GetMachineDeploymentResource(machinedeployment.Name)
 	if err != nil {
-		klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %v", machinedeployment.Name, err)
 		return true, err
 	}
 	// don't scale down during rolling update, as that could remove ready node with workload
@@ -466,24 +467,23 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*types.NamespacedName) e
 	commonMachineDeployment.scalingMutex.Lock()
 	defer commonMachineDeployment.scalingMutex.Unlock()
 	// get the machine deployment and return if rolling update is not finished
-	md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(commonMachineDeployment.Name)
+	md, err := m.GetMachineDeploymentResource(commonMachineDeployment.Name)
 	if err != nil {
-		klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %v", commonMachineDeployment.Name, err)
 		return err
 	}
 	if !isRollingUpdateFinished(md) {
 		return fmt.Errorf("MachineDeployment %s is under rolling update, cannot reduce replica count", commonMachineDeployment.Name)
 	}
-	markedMachines := getMachinesMarkedByCAForDeletion(md)
+	machineNamesMarkedByCA := getMachineNamesMarkedByCAForDeletion(md)
 	// update the priority to 1 for the machines to be deleted, except the ones already in termination
-	machinesWithPrio1, err := m.prioritizeMachinesForDeletion(targetMachineRefs)
+	machineNamesWithPrio1, err := m.prioritizeMachinesForDeletion(targetMachineRefs)
 	if err != nil {
 		return err
 	}
-	markedMachines.Insert(machinesWithPrio1...)
+	machineNamesMarkedByCA = append(machineNamesMarkedByCA, machineNamesWithPrio1...)
// Trying to update the machineDeployment till the deadline err = m.retry(func(ctx context.Context) (bool, error) { - return m.scaleDownAndAnnotateMachineDeployment(ctx, commonMachineDeployment.Name, len(machinesWithPrio1), strings.Join(markedMachines.UnsortedList(), ",")) + return m.scaleDownAndAnnotateMachineDeployment(ctx, commonMachineDeployment.Name, len(machineNamesWithPrio1), createMachinesMarkedForDeletionAnnotationValue(machineNamesMarkedByCA)) }, "MachineDeployment", "update", commonMachineDeployment.Name) if err != nil { klog.Errorf("unable to scale in machine deployment %s, Error: %v", commonMachineDeployment.Name, err) @@ -492,8 +492,8 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*types.NamespacedName) e } // resetPriorityForMachines resets the priority of machines passed in the argument to defaultPriorityValue -func (m *McmManager) resetPriorityForMachines(mcRefs []*types.NamespacedName) error { - var collectiveError error +func (m *McmManager) resetPriorityForMachines(mcRefs []types.NamespacedName) error { + var collectiveError []error for _, mcRef := range mcRefs { machine, err := m.machineLister.Machines(m.namespace).Get(mcRef.Name) if kube_errors.IsNotFound(err) { @@ -501,7 +501,7 @@ func (m *McmManager) resetPriorityForMachines(mcRefs []*types.NamespacedName) er continue } if err != nil { - collectiveError = errors.Join(collectiveError, fmt.Errorf("unable to get Machine object %s, Error: %v", mcRef, err)) + collectiveError = append(collectiveError, fmt.Errorf("unable to get Machine object %s, Error: %v", mcRef, err)) continue } ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(defaultResetAnnotationTimeout)) @@ -515,17 +515,18 @@ func (m *McmManager) resetPriorityForMachines(mcRefs []*types.NamespacedName) er return nil }() if err != nil { - collectiveError = errors.Join(collectiveError, fmt.Errorf("could not reset priority annotation on machine %s, Error: %v", machine.Name, err)) + collectiveError = append(collectiveError, fmt.Errorf("could not reset priority annotation on machine %s, Error: %v", machine.Name, err)) continue } } - return collectiveError + return errors.Join(collectiveError...) 
}
 
 // prioritizeMachinesForDeletion prioritizes the targeted machines by updating their priority annotation to 1
 func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*types.NamespacedName) ([]string, error) {
 	var expectedToTerminateMachineNodePairs = make(map[string]string)
-	var machinesMarkedWithPrio1 []string
+	var prio1MarkedMachineNames []string
+
 	for _, machineRef := range targetMachineRefs {
 		// Trying to update the priority of machineRef till m.maxRetryTimeout
 		if err := m.retry(func(ctx context.Context) (bool, error) {
@@ -541,20 +542,20 @@ func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*types.Na
 			if isMachineFailedOrTerminating(mc) {
 				return false, nil
 			}
-			if mc.Annotations[machinePriorityAnnotation] == priorityValueForCandidateMachines {
+			if mc.Annotations[machinePriorityAnnotation] == priorityValueForDeletionCandidateMachines {
 				klog.Infof("Machine %q priority is already set to 1, hence skipping the update", mc.Name)
 				return false, nil
 			}
-			machinesMarkedWithPrio1 = append(machinesMarkedWithPrio1, machineRef.Name)
+			prio1MarkedMachineNames = append(prio1MarkedMachineNames, machineRef.Name)
 			expectedToTerminateMachineNodePairs[mc.Name] = mc.Labels["node"]
-			return m.updateAnnotationOnMachine(ctx, mc.Name, machinePriorityAnnotation, priorityValueForCandidateMachines)
+			return m.updateAnnotationOnMachine(ctx, mc.Name, machinePriorityAnnotation, priorityValueForDeletionCandidateMachines)
 		}, "Machine", "update", machineRef.Name); err != nil {
 			klog.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
 			return nil, fmt.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
 		}
 	}
 	klog.V(2).Infof("Expected to remove following {machineRef: corresponding node} pairs %s", expectedToTerminateMachineNodePairs)
-	return machinesMarkedWithPrio1, nil
+	return prio1MarkedMachineNames, nil
 }
 
 // updateAnnotationOnMachine returns an error only when updating the annotations on the machine has been failing consecutively and the deadline is crossed
@@ -583,9 +584,8 @@ func (m *McmManager) updateAnnotationOnMachine(ctx context.Context, mcName strin
 
 // scaleDownAndAnnotateMachineDeployment scales down the machine deployment by the provided scaleDownAmount.
 // It also updates the machines-marked-by-ca-for-deletion annotation on the machine deployment with the list of existing machines marked for deletion.
func (m *McmManager) scaleDownAndAnnotateMachineDeployment(ctx context.Context, mdName string, scaleDownAmount int, markedMachines string) (bool, error) { - md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName) + md, err := m.GetMachineDeploymentResource(mdName) if err != nil { - klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %v", mdName, err) return true, err } mdclone := md.DeepCopy() @@ -732,21 +732,19 @@ func validateNodeTemplate(nodeTemplateAttributes *v1alpha1.NodeTemplate) error { // GetMachineDeploymentAnnotations returns the annotations present on the machine deployment for the provided machine deployment name func (m *McmManager) GetMachineDeploymentAnnotations(machineDeploymentName string) (map[string]string, error) { - md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machineDeploymentName) + md, err := m.GetMachineDeploymentResource(machineDeploymentName) if err != nil { - return nil, fmt.Errorf("unable to fetch MachineDeployment object %s, Error: %v", machineDeploymentName, err) + return nil, err } - return md.Annotations, nil } // GetMachineDeploymentNodeTemplate returns the NodeTemplate of a node belonging to the same worker pool as the machinedeployment // If no node present then it forms the nodeTemplate using the one present in machineClass func (m *McmManager) GetMachineDeploymentNodeTemplate(machinedeployment *MachineDeployment) (*nodeTemplate, error) { - - md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machinedeployment.Name) + md, err := m.GetMachineDeploymentResource(machinedeployment.Name) if err != nil { - return nil, fmt.Errorf("unable to fetch MachineDeployment object %s, Error: %v", machinedeployment.Name, err) + return nil, err } var ( @@ -892,6 +890,16 @@ func (m *McmManager) GetMachineDeploymentNodeTemplate(machinedeployment *Machine return nodeTmpl, nil } +// GetMachineDeploymentResource returns the MachineDeployment object for the provided machine deployment name +func (m *McmManager) GetMachineDeploymentResource(mdName string) (*v1alpha1.MachineDeployment, error) { + md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName) + if err != nil { + klog.Errorf("unable to fetch MachineDeployment object %s, Error: %v", mdName, err) + return nil, fmt.Errorf("unable to fetch MachineDeployment object %s, Error: %v", mdName, err) + } + return md, nil +} + func isRollingUpdateFinished(md *v1alpha1.MachineDeployment) bool { for _, cond := range md.Status.Conditions { switch { @@ -1034,6 +1042,30 @@ func buildGenericLabels(template *nodeTemplate, nodeName string) map[string]stri return result } +func buildMachineDeploymentFromSpec(value string, mcmManager *McmManager) (*MachineDeployment, error) { + spec, err := dynamic.SpecFromString(value, true) + if err != nil { + return nil, fmt.Errorf("failed to parse node group spec: %v", err) + } + s := strings.Split(spec.Name, ".") + Namespace, Name := s[0], s[1] + machinedeployment := buildMachineDeployment(mcmManager, spec.MinSize, spec.MaxSize, Namespace, Name) + return machinedeployment, nil +} + +func buildMachineDeployment(mcmManager *McmManager, minSize int, maxSize int, namespace string, name string) *MachineDeployment { + return &MachineDeployment{ + mcmManager: mcmManager, + minSize: minSize, + maxSize: maxSize, + scalingMutex: sync.Mutex{}, + NamespacedName: types.NamespacedName{ + Name: name, + Namespace: namespace, + }, + } +} + // isMachineFailedOrTerminating returns true if machine is already being 
terminated or considered for termination by autoscaler. func isMachineFailedOrTerminating(machine *v1alpha1.Machine) bool { if !machine.GetDeletionTimestamp().IsZero() || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed { @@ -1051,3 +1083,7 @@ func filterExtendedResources(allResources v1.ResourceList) (extendedResources v1 }) return } + +func createMachinesMarkedForDeletionAnnotationValue(machineNames []string) string { + return strings.Join(machineNames, ",") +} From 68d2046910072f74a194fa322172f53ce886536b Mon Sep 17 00:00:00 2001 From: Rishabh Patel Date: Sat, 28 Dec 2024 15:45:25 +0530 Subject: [PATCH 7/7] update unit tests, misc code changes --- .../cloudprovider/mcm/mcm_cloud_provider.go | 22 +- .../mcm/mcm_cloud_provider_test.go | 292 ++++++++---------- .../cloudprovider/mcm/mcm_manager.go | 31 +- .../cloudprovider/mcm/test_utils.go | 55 ++-- 4 files changed, 197 insertions(+), 203 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go index ed570472e598..bf6cda949dcc 100644 --- a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go @@ -383,15 +383,21 @@ func (machineDeployment *MachineDeployment) Refresh() error { } } clone := mcd.DeepCopy() - clone.Annotations[machinesMarkedByCAForDeletion] = createMachinesMarkedForDeletionAnnotationValue(updatedMarkedMachineNames) - ctx, cancelFn := context.WithTimeout(context.Background(), machineDeployment.mcmManager.maxRetryTimeout) - defer cancelFn() - _, err = machineDeployment.mcmManager.machineClient.MachineDeployments(machineDeployment.Namespace).Update(ctx, clone, metav1.UpdateOptions{}) - if err != nil { - return err + if clone.Annotations == nil { + clone.Annotations = map[string]string{} + } + updatedMachinesMarkedByCAForDeletionAnnotationVal := createMachinesMarkedForDeletionAnnotationValue(updatedMarkedMachineNames) + if clone.Annotations[machinesMarkedByCAForDeletion] != updatedMachinesMarkedByCAForDeletionAnnotationVal { + clone.Annotations[machinesMarkedByCAForDeletion] = updatedMachinesMarkedByCAForDeletionAnnotationVal + ctx, cancelFn := context.WithTimeout(context.Background(), machineDeployment.mcmManager.maxRetryTimeout) + defer cancelFn() + _, err = machineDeployment.mcmManager.machineClient.MachineDeployments(machineDeployment.Namespace).Update(ctx, clone, metav1.UpdateOptions{}) + if err != nil { + return err + } } // reset the priority for the machines that are not present in machines-marked-by-ca-for-deletion annotation - var incorrectlyMarkedMachines []types.NamespacedName + var incorrectlyMarkedMachines []string for _, machine := range machines { // no need to reset priority for machines already in termination or failed phase if isMachineFailedOrTerminating(machine) { @@ -400,7 +406,7 @@ func (machineDeployment *MachineDeployment) Refresh() error { // check if the machine is marked for deletion by CA but not present in machines-marked-by-ca-for-deletion annotation. This means that CA was not able to reduce the replicas // corresponding to this machine and hence the machine should not be marked for deletion. 
if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForDeletionCandidateMachines && !slices.Contains(markedMachineNames, machine.Name) { - incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, types.NamespacedName{Name: machine.Name, Namespace: machine.Namespace}) + incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, machine.Name) } } return machineDeployment.mcmManager.resetPriorityForMachines(incorrectlyMarkedMachines) diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider_test.go index 6752c2497852..995f51834f95 100644 --- a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider_test.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "math" + "slices" "strings" "testing" "time" @@ -85,10 +86,11 @@ func TestDeleteNodes(t *testing.T) { node *corev1.Node } type expect struct { - machines []*v1alpha1.Machine - mdName string - mdReplicas int32 - err error + prio1Machines []*v1alpha1.Machine + mdName string + mdReplicas int32 + machinesMarkedByCAAnnotationValue string + err error } type data struct { name string @@ -100,42 +102,44 @@ func TestDeleteNodes(t *testing.T) { { "should scale down machine deployment to remove a node", setup{ - nodes: newNodes(2, "fakeID", []bool{true, false}), - machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}), + nodes: newNodes(2, "fakeID"), + machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}), machineSets: newMachineSets(1, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 2, nil, nil, nil), nodeGroups: []string{nodeGroup1}, }, - action{node: newNodes(1, "fakeID", []bool{true})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), - mdName: "machinedeployment-1", - mdReplicas: 1, - err: nil, + prio1Machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}), + mdName: "machinedeployment-1", + machinesMarkedByCAAnnotationValue: createMachinesMarkedForDeletionAnnotationValue(generateNames("machine", 1)), + mdReplicas: 1, + err: nil, }, }, { "should scale down machine deployment to remove a placeholder node", setup{ nodes: nil, - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}), + machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}), machineSets: newMachineSets(1, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 1, nil, nil, nil), nodeGroups: []string{nodeGroup2}, }, - action{node: newNode("node-1", "requested://machine-1", true)}, + action{node: newNode("node-1", "requested://machine-1")}, expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), - mdName: "machinedeployment-1", - mdReplicas: 0, - err: nil, + prio1Machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}), + machinesMarkedByCAAnnotationValue: createMachinesMarkedForDeletionAnnotationValue(generateNames("machine", 1)), + mdName: "machinedeployment-1", + mdReplicas: 0, + err: nil, }, }, { "should not scale down a machine deployment when it is under rolling update", setup{ - nodes: newNodes(2, 
"fakeID", []bool{true, false}), - machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}), + nodes: newNodes(2, "fakeID"), + machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}), machineSets: newMachineSets(2, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 2, &v1alpha1.MachineDeploymentStatus{ Conditions: []v1alpha1.MachineDeploymentCondition{ @@ -144,19 +148,19 @@ func TestDeleteNodes(t *testing.T) { }, nil, nil), nodeGroups: []string{nodeGroup1}, }, - action{node: newNodes(1, "fakeID", []bool{true})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: nil, - mdName: "machinedeployment-1", - mdReplicas: 2, - err: fmt.Errorf("MachineDeployment machinedeployment-1 is under rolling update , cannot reduce replica count"), + prio1Machines: nil, + mdName: "machinedeployment-1", + mdReplicas: 2, + err: fmt.Errorf("MachineDeployment machinedeployment-1 is under rolling update , cannot reduce replica count"), }, }, { "should not scale down when machine deployment update call times out and should reset priority of the corresponding machine", setup{ - nodes: newNodes(2, "fakeID", []bool{true, false}), - machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}), + nodes: newNodes(2, "fakeID"), + machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}), machineSets: newMachineSets(1, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 2, nil, nil, nil), nodeGroups: []string{nodeGroup1}, @@ -166,9 +170,8 @@ func TestDeleteNodes(t *testing.T) { }, }, }, - action{node: newNodes(1, "fakeID", []bool{true})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}), mdName: "machinedeployment-1", mdReplicas: 2, err: errors.Join(nil, fmt.Errorf("unable to scale in machine deployment machinedeployment-1, Error: %w", errors.New(mdUpdateErrorMsg))), @@ -177,8 +180,8 @@ func TestDeleteNodes(t *testing.T) { { "should scale down when machine deployment update call fails but passes within the timeout period", setup{ - nodes: newNodes(2, "fakeID", []bool{true, false}), - machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}), + nodes: newNodes(2, "fakeID"), + machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}), machineSets: newMachineSets(1, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 2, nil, nil, nil), nodeGroups: []string{nodeGroup1}, @@ -188,26 +191,26 @@ func TestDeleteNodes(t *testing.T) { }, }, }, - action{node: newNodes(1, "fakeID", []bool{true})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), - mdName: "machinedeployment-1", - mdReplicas: 1, - err: nil, + prio1Machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}), + machinesMarkedByCAAnnotationValue: createMachinesMarkedForDeletionAnnotationValue(generateNames("machine", 1)), + mdName: "machinedeployment-1", + mdReplicas: 1, + err: nil, }, }, { "should not scale down a machine deployment when the corresponding machine is already in terminating state", setup{ - nodes: newNodes(2, "fakeID", 
[]bool{true, false}), - machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{true, false}), + nodes: newNodes(2, "fakeID"), + machines: newMachines(2, "fakeID", &v1alpha1.MachineStatus{CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineTerminating}}, "machinedeployment-1", "machineset-1", []string{"3", "3"}), machineSets: newMachineSets(1, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 2, nil, nil, nil), nodeGroups: []string{nodeGroup1}, }, - action{node: newNodes(1, "fakeID", []bool{true})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{true}), mdName: "machinedeployment-1", mdReplicas: 2, err: nil, @@ -216,15 +219,14 @@ func TestDeleteNodes(t *testing.T) { { "should not scale down a machine deployment when the corresponding machine is already in failed state", setup{ - nodes: newNodes(2, "fakeID", []bool{true, false}), - machines: newMachines(2, "fakeID", &v1alpha1.MachineStatus{CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineFailed}}, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}), + nodes: newNodes(2, "fakeID"), + machines: newMachines(2, "fakeID", &v1alpha1.MachineStatus{CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineFailed}}, "machinedeployment-1", "machineset-1", []string{"3", "3"}), machineSets: newMachineSets(1, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 2, nil, nil, nil), nodeGroups: []string{nodeGroup1}, }, - action{node: newNodes(1, "fakeID", []bool{false})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: newMachines(2, "fakeID", &v1alpha1.MachineStatus{CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineFailed}}, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}), mdName: "machinedeployment-1", mdReplicas: 2, err: nil, @@ -233,25 +235,25 @@ func TestDeleteNodes(t *testing.T) { { "should not scale down a machine deployment below the minimum", setup{ - nodes: newNodes(1, "fakeID", []bool{true}), - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}), + nodes: newNodes(1, "fakeID"), + machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}), machineSets: newMachineSets(1, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 1, nil, nil, nil), nodeGroups: []string{nodeGroup1}, }, - action{node: newNodes(1, "fakeID", []bool{true})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: nil, - mdName: "machinedeployment-1", - mdReplicas: 1, - err: fmt.Errorf("min size reached, nodes will not be deleted"), + prio1Machines: nil, + mdName: "machinedeployment-1", + mdReplicas: 1, + err: fmt.Errorf("min size reached, nodes will not be deleted"), }, }, { "no scale down of machine deployment if priority of the targeted machine cannot be updated to 1", setup{ - nodes: newNodes(2, "fakeID", []bool{true, false}), - machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}), + nodes: newNodes(2, "fakeID"), + machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}), machineSets: newMachineSets(1, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 2, nil, nil, nil), nodeGroups: []string{nodeGroup1}, @@ -261,29 +263,29 @@ func 
TestDeleteNodes(t *testing.T) { }, }, }, - action{node: newNodes(1, "fakeID", []bool{true})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: nil, - mdName: "machinedeployment-1", - mdReplicas: 2, - err: fmt.Errorf("could not prioritize machine machine-1 for deletion, aborting scale in of machine deployment, Error: %s", mcUpdateErrorMsg), + prio1Machines: nil, + mdName: "machinedeployment-1", + mdReplicas: 2, + err: fmt.Errorf("could not prioritize machine machine-1 for deletion, aborting scale in of machine deployment, Error: %s", mcUpdateErrorMsg), }, }, { "should not scale down machine deployment if the node belongs to another machine deployment", setup{ - nodes: newNodes(2, "fakeID", []bool{true, false}), - machines: newMachines(2, "fakeID", nil, "machinedeployment-2", "machineset-1", []string{"3", "3"}, []bool{false, false}), + nodes: newNodes(2, "fakeID"), + machines: newMachines(2, "fakeID", nil, "machinedeployment-2", "machineset-1", []string{"3", "3"}), machineSets: newMachineSets(1, "machinedeployment-2"), machineDeployments: newMachineDeployments(2, 2, nil, nil, nil), nodeGroups: []string{nodeGroup2, nodeGroup3}, }, - action{node: newNodes(1, "fakeID", []bool{true})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: nil, - mdName: "machinedeployment-2", - mdReplicas: 2, - err: fmt.Errorf("node-1 belongs to a different machinedeployment than machinedeployment-1"), + prio1Machines: nil, + mdName: "machinedeployment-2", + mdReplicas: 2, + err: fmt.Errorf("node-1 belongs to a different machinedeployment than machinedeployment-1"), }, }, } @@ -296,7 +298,7 @@ func TestDeleteNodes(t *testing.T) { stop := make(chan struct{}) defer close(stop) controlMachineObjects, targetCoreObjects, _ := setupEnv(&entry.setup) - m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, nil, controlMachineObjects, targetCoreObjects, nil) + m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, entry.setup.nodeGroups, controlMachineObjects, targetCoreObjects, nil) defer trackers.Stop() waitForCacheSync(t, stop, hasSyncedCacheFns) @@ -321,6 +323,7 @@ func TestDeleteNodes(t *testing.T) { machineDeployment, err := m.machineClient.MachineDeployments(m.namespace).Get(context.TODO(), entry.expect.mdName, metav1.GetOptions{}) g.Expect(err).ToNot(HaveOccurred()) g.Expect(machineDeployment.Spec.Replicas).To(BeNumerically("==", entry.expect.mdReplicas)) + g.Expect(machineDeployment.Annotations[machinesMarkedByCAForDeletion]).To(Equal(entry.expect.machinesMarkedByCAAnnotationValue)) machines, err := m.machineClient.Machines(m.namespace).List(context.TODO(), metav1.ListOptions{ LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{ @@ -329,26 +332,52 @@ func TestDeleteNodes(t *testing.T) { }) for _, machine := range machines.Items { - flag := false - for _, entryMachineItem := range entry.expect.machines { - if entryMachineItem.Name == machine.Name { - g.Expect(machine.Annotations[machinePriorityAnnotation]).To(Equal(entryMachineItem.Annotations[machinePriorityAnnotation])) - flag = true - break - } - } - if !flag { - g.Expect(machine.Annotations[machinePriorityAnnotation]).To(Equal("3")) + if slices.ContainsFunc(entry.expect.prio1Machines, func(m *v1alpha1.Machine) bool { + return machine.Name == m.Name + }) { + g.Expect(machine.Annotations[machinePriorityAnnotation]).To(Equal(priorityValueForDeletionCandidateMachines)) + } else { + g.Expect(machine.Annotations[machinePriorityAnnotation]).To(Equal(defaultPriorityValue)) } } }) } } 
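+
+// TestIdempotencyOfDeleteNodes verifies that calling DeleteNodes twice for the same node scales the
+// MachineDeployment down only once: the second call finds the machine already carrying priority 1 and
+// already listed in the machines-marked-by-ca-for-deletion annotation, so it contributes nothing to
+// the scale-down amount.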
+func TestIdempotencyOfDeleteNodes(t *testing.T) { + setupObj := setup{ + nodes: newNodes(3, "fakeID"), + machines: newMachines(3, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3", "3"}), + machineSets: newMachineSets(1, "machinedeployment-1"), + machineDeployments: newMachineDeployments(1, 3, nil, nil, nil), + nodeGroups: []string{nodeGroup1}, + } + g := NewWithT(t) + stop := make(chan struct{}) + defer close(stop) + controlMachineObjects, targetCoreObjects, _ := setupEnv(&setupObj) + m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, setupObj.nodeGroups, controlMachineObjects, targetCoreObjects, nil) + defer trackers.Stop() + waitForCacheSync(t, stop, hasSyncedCacheFns) + md, err := buildMachineDeploymentFromSpec(setupObj.nodeGroups[0], m) + g.Expect(err).To(BeNil()) + + err = md.DeleteNodes(newNodes(1, "fakeID")) + g.Expect(err).To(BeNil()) + err = md.DeleteNodes(newNodes(1, "fakeID")) + g.Expect(err).To(BeNil()) + + machineDeployment, err := m.machineClient.MachineDeployments(m.namespace).Get(context.TODO(), setupObj.machineDeployments[0].Name, metav1.GetOptions{}) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(machineDeployment.Spec.Replicas).To(BeNumerically("==", 2)) + g.Expect(machineDeployment.Annotations[machinesMarkedByCAForDeletion]).To(Equal(createMachinesMarkedForDeletionAnnotationValue(generateNames("machine", 1)))) +} + func TestRefresh(t *testing.T) { type expect struct { - machines []*v1alpha1.Machine - err error + prio3Machines []string + machinesMarkedByCAForDeletionAnnotationValue string + err error } type data struct { name string @@ -359,8 +388,8 @@ func TestRefresh(t *testing.T) { { "should return an error if MCM has zero available replicas", setup{ - nodes: newNodes(1, "fakeID", []bool{false}), - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), + nodes: newNodes(1, "fakeID"), + machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}), machineDeployments: newMachineDeployments(1, 1, nil, nil, nil), nodeGroups: []string{nodeGroup2}, mcmDeployment: newMCMDeployment(0), @@ -372,8 +401,8 @@ func TestRefresh(t *testing.T) { { "should return an error if MCM deployment is not found", setup{ - nodes: newNodes(1, "fakeID", []bool{false}), - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), + nodes: newNodes(1, "fakeID"), + machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}), machineDeployments: newMachineDeployments(1, 1, nil, nil, nil), nodeGroups: []string{nodeGroup2}, }, @@ -382,88 +411,38 @@ func TestRefresh(t *testing.T) { }, }, { - "should reset priority of a machine to 3 if machine deployment is not scaled in", + "should reset priority of a machine if it is not present in machines-marked-by-ca-for-deletion annotation on machine deployment", setup{ - nodes: newNodes(1, "fakeID", []bool{false}), - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), + nodes: newNodes(1, "fakeID"), + machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}), machineDeployments: newMachineDeployments(1, 1, nil, nil, nil), nodeGroups: []string{nodeGroup2}, mcmDeployment: newMCMDeployment(1), }, expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}), - err: nil, + prio3Machines: 
generateNames("machine", 1), + err: nil, }, }, { - "should reset priority of a machine to 3 if machine deployment is not scaled in even if ToBeDeletedTaint is present on the corresponding node", + "should update the machines-marked-by-ca-for-deletion annotation and remove non-existing machines", setup{ - nodes: newNodes(1, "fakeID", []bool{true}), - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), - machineDeployments: newMachineDeployments(1, 1, nil, nil, nil), + nodes: newNodes(1, "fakeID"), + machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}), + machineDeployments: newMachineDeployments(1, 0, nil, map[string]string{machinesMarkedByCAForDeletion: "machine-1,machine-2"}, nil), nodeGroups: []string{nodeGroup2}, mcmDeployment: newMCMDeployment(1), }, expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}), - err: nil, - }, - }, - { - "should NOT skip paused machine deployment", - setup{ - nodes: newNodes(1, "fakeID", []bool{false}), - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), - machineDeployments: newMachineDeployments(1, 1, &v1alpha1.MachineDeploymentStatus{ - Conditions: []v1alpha1.MachineDeploymentCondition{ - {Type: v1alpha1.MachineDeploymentProgressing, Status: v1alpha1.ConditionUnknown, Reason: machineDeploymentPausedReason}, - }, - }, nil, nil), - nodeGroups: []string{nodeGroup2}, - mcmDeployment: newMCMDeployment(1), - }, - expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}), - err: nil, - }, - }, - { - "should ignore terminating/failed machines in checking if number of annotated machines is more than desired", - setup{ - nodes: newNodes(1, "fakeID", []bool{true}), - machines: newMachines(1, "fakeID", &v1alpha1.MachineStatus{ - CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineFailed}, - }, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), - machineDeployments: newMachineDeployments(1, 1, nil, nil, nil), - nodeGroups: []string{nodeGroup2}, - mcmDeployment: newMCMDeployment(1), - }, - expect{ - machines: newMachines(1, "fakeID", &v1alpha1.MachineStatus{ - CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineFailed}, - }, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), + machinesMarkedByCAForDeletionAnnotationValue: createMachinesMarkedForDeletionAnnotationValue(generateNames("machine", 1)), err: nil, }, }, - { - "should not reset priority of a machine to 3 if machine deployment is scaled in", - setup{ - nodes: newNodes(1, "fakeID", []bool{true}), - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), - machineDeployments: newMachineDeployments(1, 0, nil, nil, nil), - nodeGroups: []string{nodeGroup2}, - mcmDeployment: newMCMDeployment(1), - }, - expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), - err: nil, - }, - }, { "priority reset of machine fails", setup{ - nodes: newNodes(1, "fakeID", []bool{false}), - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), + nodes: newNodes(1, "fakeID"), + machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}), machineDeployments: newMachineDeployments(1, 1, nil, nil, nil), 
controlMachineFakeResourceActions: &customfake.ResourceActions{ Machine: customfake.Actions{ @@ -474,8 +453,7 @@ func TestRefresh(t *testing.T) { mcmDeployment: newMCMDeployment(1), }, expect{ - machines: []*v1alpha1.Machine{newMachine("machine-1", "fakeID-1", nil, "machinedeployment-1", "machineset-1", "1", false, true)}, - err: errors.Join(nil, errors.Join(fmt.Errorf("could not reset priority annotation on machine machine-1, Error: %v", mcUpdateErrorMsg))), + err: errors.Join(nil, errors.Join(fmt.Errorf("could not reset priority annotation on machine machine-1, Error: %v", mcUpdateErrorMsg))), }, }, } @@ -505,10 +483,14 @@ func TestRefresh(t *testing.T) { } else { g.Expect(err).To(BeNil()) } - for _, mc := range entry.expect.machines { - machine, err := m.machineClient.Machines(m.namespace).Get(context.TODO(), mc.Name, metav1.GetOptions{}) - g.Expect(err).To(BeNil()) - g.Expect(mc.Annotations[machinePriorityAnnotation]).To(Equal(machine.Annotations[machinePriorityAnnotation])) + machines, err := m.machineClient.Machines(m.namespace).List(context.TODO(), metav1.ListOptions{}) + g.Expect(err).To(BeNil()) + for _, mc := range machines.Items { + if slices.Contains(entry.expect.prio3Machines, mc.Name) { + g.Expect(mc.Annotations[machinePriorityAnnotation]).To(Equal(defaultPriorityValue)) + } else { + g.Expect(mc.Annotations[machinePriorityAnnotation]).To(Equal(priorityValueForDeletionCandidateMachines)) + } } }) } @@ -554,14 +536,14 @@ func TestNodes(t *testing.T) { { "Correct instances should be returned for machine objects under the machinedeployment", setup{ - nodes: []*corev1.Node{newNode("node-1", "fakeID-1", false)}, + nodes: []*corev1.Node{newNode("node-1", "fakeID-1")}, machines: func() []*v1alpha1.Machine { allMachines := make([]*v1alpha1.Machine, 0, 5) - allMachines = append(allMachines, newMachine("machine-with-registered-node", "fakeID-1", nil, "machinedeployment-1", "", "", false, true)) - allMachines = append(allMachines, newMachine("machine-with-vm-but-no-node", "fakeID-2", nil, "machinedeployment-1", "", "", false, false)) - allMachines = append(allMachines, newMachine("machine-with-vm-creating", "", nil, "machinedeployment-1", "", "", false, false)) - allMachines = append(allMachines, newMachine("machine-with-vm-create-error-out-of-quota", "", &v1alpha1.MachineStatus{LastOperation: v1alpha1.LastOperation{Type: v1alpha1.MachineOperationCreate, State: v1alpha1.MachineStateFailed, ErrorCode: machinecodes.ResourceExhausted.String(), Description: outOfQuotaMachineStatusErrorDescription}}, "machinedeployment-1", "", "", false, false)) - allMachines = append(allMachines, newMachine("machine-with-vm-create-error-invalid-credentials", "", &v1alpha1.MachineStatus{LastOperation: v1alpha1.LastOperation{Type: v1alpha1.MachineOperationCreate, State: v1alpha1.MachineStateFailed, ErrorCode: machinecodes.Internal.String(), Description: invalidCredentialsMachineStatusErrorDescription}}, "machinedeployment-1", "", "", false, false)) + allMachines = append(allMachines, newMachine("machine-with-registered-node", "fakeID-1", nil, "machinedeployment-1", "", "", true)) + allMachines = append(allMachines, newMachine("machine-with-vm-but-no-node", "fakeID-2", nil, "machinedeployment-1", "", "", false)) + allMachines = append(allMachines, newMachine("machine-with-vm-creating", "", nil, "machinedeployment-1", "", "", false)) + allMachines = append(allMachines, newMachine("machine-with-vm-create-error-out-of-quota", "", &v1alpha1.MachineStatus{LastOperation: v1alpha1.LastOperation{Type: 
v1alpha1.MachineOperationCreate, State: v1alpha1.MachineStateFailed, ErrorCode: machinecodes.ResourceExhausted.String(), Description: outOfQuotaMachineStatusErrorDescription}}, "machinedeployment-1", "", "", false)) + allMachines = append(allMachines, newMachine("machine-with-vm-create-error-invalid-credentials", "", &v1alpha1.MachineStatus{LastOperation: v1alpha1.LastOperation{Type: v1alpha1.MachineOperationCreate, State: v1alpha1.MachineStateFailed, ErrorCode: machinecodes.Internal.String(), Description: invalidCredentialsMachineStatusErrorDescription}}, "machinedeployment-1", "", "", false)) return allMachines }(), machineDeployments: newMachineDeployments(1, 2, nil, nil, nil), diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go index 8d4e3cdce84f..c9cba993775b 100644 --- a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go +++ b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go @@ -267,10 +267,9 @@ func createMCMManagerInternal(discoveryOpts cloudprovider.NodeGroupDiscoveryOpti maxRetryTimeout: maxRetryTimeout, retryInterval: retryInterval, } - for _, spec := range discoveryOpts.NodeGroupSpecs { - if err := m.addNodeGroup(spec); err != nil { - return nil, err - } + err = m.generateMachineDeploymentMap() + if err != nil { + return nil, err } targetCoreInformerFactory.Start(m.interrupt) controlMachineInformerFactory.Start(m.interrupt) @@ -294,6 +293,15 @@ func createMCMManagerInternal(discoveryOpts cloudprovider.NodeGroupDiscoveryOpti return nil, fmt.Errorf("Unable to start cloud provider MCM for cluster autoscaler: API GroupVersion %q or %q or %q is not available; \nFound: %#v", machineGVR, machineSetGVR, machineDeploymentGVR, availableResources) } +func (m *McmManager) generateMachineDeploymentMap() error { + for _, spec := range m.discoveryOpts.NodeGroupSpecs { + if err := m.addNodeGroup(spec); err != nil { + return err + } + } + return nil +} + // addNodeGroup adds node group defined in string spec. 
Format:
 // minNodes:maxNodes:namespace.machineDeploymentName
 func (m *McmManager) addNodeGroup(spec string) error {
@@ -486,22 +494,23 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*types.NamespacedName) e
 		return m.scaleDownAndAnnotateMachineDeployment(ctx, commonMachineDeployment.Name, len(machineNamesWithPrio1), createMachinesMarkedForDeletionAnnotationValue(machineNamesMarkedByCA))
 	}, "MachineDeployment", "update", commonMachineDeployment.Name)
 	if err != nil {
-		klog.Errorf("unable to scale in machine deployment %s, Error: %v", commonMachineDeployment.Name, err)
+		klog.Errorf("unable to scale in machine deployment %s, will reset priority of target machines, Error: %v", commonMachineDeployment.Name, err)
+		return errors.Join(err, m.resetPriorityForMachines(machineNamesWithPrio1))
 	}
-	return nil
+	return err
 }
 
 // resetPriorityForMachines resets the priority of machines passed in the argument to defaultPriorityValue
-func (m *McmManager) resetPriorityForMachines(mcRefs []types.NamespacedName) error {
+func (m *McmManager) resetPriorityForMachines(mcNames []string) error {
 	var collectiveError []error
-	for _, mcRef := range mcRefs {
-		machine, err := m.machineLister.Machines(m.namespace).Get(mcRef.Name)
+	for _, mcName := range mcNames {
+		machine, err := m.machineLister.Machines(m.namespace).Get(mcName)
 		if kube_errors.IsNotFound(err) {
-			klog.Warningf("Machine %s not found, skipping resetting priority annotation", mcRef.Name)
+			klog.Warningf("Machine %s not found, skipping resetting priority annotation", mcName)
 			continue
 		}
 		if err != nil {
-			collectiveError = append(collectiveError, fmt.Errorf("unable to get Machine object %s, Error: %v", mcRef, err))
+			collectiveError = append(collectiveError, fmt.Errorf("unable to get Machine object %s, Error: %v", mcName, err))
 			continue
 		}
 		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(defaultResetAnnotationTimeout))
diff --git a/cluster-autoscaler/cloudprovider/mcm/test_utils.go b/cluster-autoscaler/cloudprovider/mcm/test_utils.go
index 3c3f55e4c696..4822f30f1fc3 100644
--- a/cluster-autoscaler/cloudprovider/mcm/test_utils.go
+++ b/cluster-autoscaler/cloudprovider/mcm/test_utils.go
@@ -7,6 +7,7 @@ package mcm
 import (
 	"fmt"
 	appsv1 "k8s.io/api/apps/v1"
+	types "k8s.io/apimachinery/pkg/types"
 	"k8s.io/utils/pointer"
 	"testing"
 	"time"
@@ -24,7 +25,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	customfake "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/mcm/fakeclient"
-	deletetaint "k8s.io/autoscaler/cluster-autoscaler/utils/taints"
 	appsv1informers "k8s.io/client-go/informers"
 	coreinformers "k8s.io/client-go/informers"
 )
@@ -42,7 +42,7 @@ func newMachineDeployments(
 	labels map[string]string,
 ) []*v1alpha1.MachineDeployment {
 	machineDeployments := make([]*v1alpha1.MachineDeployment, machineDeploymentCount)
-	for i := range machineDeployments {
+	for i := 0; i < machineDeploymentCount; i++ {
 		machineDeployment := &v1alpha1.MachineDeployment{
 			TypeMeta: metav1.TypeMeta{
 				APIVersion: "machine.sapcloud.io",
@@ -74,7 +74,7 @@ func newMachineSets(
 ) []*v1alpha1.MachineSet {
 
 	machineSets := make([]*v1alpha1.MachineSet, machineSetCount)
-	for i := range machineSets {
+	for i := 0; i < machineSetCount; i++ {
 		ms := &v1alpha1.MachineSet{
 			TypeMeta: metav1.TypeMeta{
 				APIVersion: "machine.sapcloud.io",
@@ -97,10 +97,9 @@ func newMachine(
 	statusTemplate *v1alpha1.MachineStatus,
 	mdName, msName string,
 	priorityAnnotationValue string,
-	setDeletionTimeStamp, setNodeLabel bool,
+	setNodeLabel bool,
 )
*v1alpha1.Machine {
-	m := newMachines(1, providerId, statusTemplate, mdName, msName, []string{priorityAnnotationValue}, []bool{setDeletionTimeStamp})[0]
+	m := newMachines(1, providerId, statusTemplate, mdName, msName, []string{priorityAnnotationValue})[0]
 	m.Name = name
 	m.Spec.ProviderID = providerId
 	if !setNodeLabel {
@@ -109,26 +108,34 @@ func newMachine(
 	return m
 }
 
+func generateNames(prefix string, count int) []string {
+	names := make([]string, count)
+	for i := 0; i < count; i++ {
+		names[i] = fmt.Sprintf("%s-%d", prefix, i+1)
+	}
+	return names
+}
+
 func newMachines(
 	machineCount int,
 	providerIdGenerateName string,
 	statusTemplate *v1alpha1.MachineStatus,
 	mdName, msName string,
 	priorityAnnotationValues []string,
-	setDeletionTimeStamp []bool,
 ) []*v1alpha1.Machine {
 	machines := make([]*v1alpha1.Machine, machineCount)
-
+	machineNames := generateNames("machine", machineCount)
+	nodeNames := generateNames("node", machineCount)
 	currentTime := metav1.Now()
-	for i := range machines {
+	for i := 0; i < machineCount; i++ {
 		m := &v1alpha1.Machine{
 			TypeMeta: metav1.TypeMeta{
 				APIVersion: "machine.sapcloud.io",
 				Kind:       "Machine",
 			},
 			ObjectMeta: metav1.ObjectMeta{
-				Name:      fmt.Sprintf("machine-%d", i+1),
+				Name:      machineNames[i],
 				Namespace: testNamespace,
 				OwnerReferences: []metav1.OwnerReference{
 					{Name: msName},
@@ -143,12 +150,12 @@ func newMachines(
 			m.Spec = v1alpha1.MachineSpec{ProviderID: fmt.Sprintf("%s/i%d", providerIdGenerateName, i+1)}
 		}
 
-		m.Labels["node"] = fmt.Sprintf("node-%d", i+1)
-		if setDeletionTimeStamp[i] {
-			m.ObjectMeta.DeletionTimestamp = &currentTime
-		}
+		m.Labels["node"] = nodeNames[i]
 		if statusTemplate != nil {
 			m.Status = *newMachineStatus(statusTemplate)
+			if m.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating {
+				m.DeletionTimestamp = &currentTime
+			}
 		}
 		machines[i] = m
 	}
@@ -158,9 +165,8 @@ func newMachines(
 
 func newNode(
 	nodeName, providerId string,
-	addToBeDeletedTaint bool,
 ) *corev1.Node {
-	node := newNodes(1, providerId, []bool{addToBeDeletedTaint})[0]
+	node := newNodes(1, providerId)[0]
 	clone := node.DeepCopy()
 	clone.Name = nodeName
 	clone.Spec.ProviderID = providerId
@@ -170,30 +176,20 @@ func newNode(
 func newNodes(
 	nodeCount int,
 	providerIdGenerateName string,
-	addToBeDeletedTaint []bool,
 ) []*corev1.Node {
-	nodes := make([]*corev1.Node, nodeCount)
-	for i := range nodes {
-		var taints []corev1.Taint
-		if addToBeDeletedTaint[i] {
-			taints = append(taints, corev1.Taint{
-				Key:    deletetaint.ToBeDeletedTaint,
-				Value:  testTaintValue,
-				Effect: corev1.TaintEffectNoSchedule,
-			})
-		}
+	nodeNames := generateNames("node", nodeCount)
+	nodes := make([]*corev1.Node, nodeCount)
+	for i := 0; i < nodeCount; i++ {
 		node := &corev1.Node{
 			TypeMeta: metav1.TypeMeta{
 				APIVersion: "appsv1",
 				Kind:       "Node",
 			},
 			ObjectMeta: metav1.ObjectMeta{
-				Name: fmt.Sprintf("node-%d", i+1),
+				Name: nodeNames[i],
 			},
 			Spec: corev1.NodeSpec{
 				ProviderID: fmt.Sprintf("%s/i%d", providerIdGenerateName, i+1),
-				Taints:     taints,
 			},
 		}
@@ -287,6 +283,7 @@ func createMcmManager(
 		discoveryOpts: cloudprovider.NodeGroupDiscoveryOptions{
 			NodeGroupSpecs: nodeGroups,
 		},
+		machineDeployments:      make(map[types.NamespacedName]*MachineDeployment),
 		deploymentLister:        appsControlSharedInformers.Deployments().Lister(),
 		machineClient:           fakeTypedMachineClient,
 		machineDeploymentLister: machineDeployments.Lister(),
@@ -297,7 +294,7 @@ func createMcmManager(
 		maxRetryTimeout: 5 * time.Second,
 		retryInterval:   1 * time.Second,
 	}
-
+	g.Expect(mcmManager.generateMachineDeploymentMap()).To(gomega.Succeed())
 	hasSyncedCachesFns := []cache.InformerSynced{
 		nodes.Informer().HasSynced,
machines.Informer().HasSynced,