Fix Race conditions in Targeted Deletion of machines by CA #341

Open
wants to merge 7 commits into base: machine-controller-manager-provider
Changes from 4 commits
111 changes: 73 additions & 38 deletions cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
@@ -24,8 +24,13 @@ package mcm
import (
"context"
"fmt"
"github.com/gardener/machine-controller-manager/pkg/apis/machine/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"slices"
"strconv"
"strings"
"sync"
"time"

apiv1 "k8s.io/api/core/v1"
@@ -67,15 +72,14 @@ const (
// MCMCloudProvider implements the cloud provider interface for machine-controller-manager
// Reference: https://github.com/gardener/machine-controller-manager
type mcmCloudProvider struct {
mcmManager *McmManager
machinedeployments map[types.NamespacedName]*MachineDeployment
resourceLimiter *cloudprovider.ResourceLimiter
mcmManager *McmManager
resourceLimiter *cloudprovider.ResourceLimiter
}

// BuildMcmCloudProvider builds CloudProvider implementation for machine-controller-manager.
func BuildMcmCloudProvider(mcmManager *McmManager, resourceLimiter *cloudprovider.ResourceLimiter) (cloudprovider.CloudProvider, error) {
if mcmManager.discoveryOpts.StaticDiscoverySpecified() {
return buildStaticallyDiscoveringProvider(mcmManager, mcmManager.discoveryOpts.NodeGroupSpecs, resourceLimiter)
return buildStaticallyDiscoveringProvider(mcmManager, resourceLimiter)
}
return nil, fmt.Errorf("Failed to build an mcm cloud provider: Either node group specs or node group auto discovery spec must be specified")
}
@@ -96,16 +100,10 @@ func BuildMCM(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscover
return provider
}

func buildStaticallyDiscoveringProvider(mcmManager *McmManager, specs []string, resourceLimiter *cloudprovider.ResourceLimiter) (*mcmCloudProvider, error) {
func buildStaticallyDiscoveringProvider(mcmManager *McmManager, resourceLimiter *cloudprovider.ResourceLimiter) (*mcmCloudProvider, error) {
mcm := &mcmCloudProvider{
mcmManager: mcmManager,
machinedeployments: make(map[types.NamespacedName]*MachineDeployment),
resourceLimiter: resourceLimiter,
}
for _, spec := range specs {
if err := mcm.addNodeGroup(spec); err != nil {
return nil, err
}
mcmManager: mcmManager,
resourceLimiter: resourceLimiter,
}
return mcm, nil
}
@@ -116,31 +114,14 @@ func (mcm *mcmCloudProvider) Cleanup() error {
return nil
}

// addNodeGroup adds node group defined in string spec. Format:
// minNodes:maxNodes:namespace.machineDeploymentName
func (mcm *mcmCloudProvider) addNodeGroup(spec string) error {
machinedeployment, err := buildMachineDeploymentFromSpec(spec, mcm.mcmManager)
if err != nil {
return err
}
mcm.addMachineDeployment(machinedeployment)
return nil
}

func (mcm *mcmCloudProvider) addMachineDeployment(machinedeployment *MachineDeployment) {
key := types.NamespacedName{Namespace: machinedeployment.Namespace, Name: machinedeployment.Name}
mcm.machinedeployments[key] = machinedeployment
return
}

func (mcm *mcmCloudProvider) Name() string {
return "machine-controller-manager"
}

// NodeGroups returns all node groups configured for this cloud provider.
func (mcm *mcmCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
result := make([]cloudprovider.NodeGroup, 0, len(mcm.machinedeployments))
for _, machinedeployment := range mcm.machinedeployments {
result := make([]cloudprovider.NodeGroup, 0, len(mcm.mcmManager.machineDeployments))
for _, machinedeployment := range mcm.mcmManager.machineDeployments {
if machinedeployment.maxSize == 0 {
continue
}
@@ -172,7 +153,7 @@ func (mcm *mcmCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.N
}

key := types.NamespacedName{Namespace: md.Namespace, Name: md.Name}
_, isManaged := mcm.machinedeployments[key]
_, isManaged := mcm.mcmManager.machineDeployments[key]
if !isManaged {
klog.V(4).Infof("Skipped node %v, it's not managed by this controller", node.Spec.ProviderID)
return nil, nil
@@ -293,8 +274,9 @@ type MachineDeployment struct {

mcmManager *McmManager

minSize int
maxSize int
scalingMutex sync.Mutex
minSize int
maxSize int
}

// MaxSize returns maximum size of the node group.
@@ -343,6 +325,8 @@ func (machinedeployment *MachineDeployment) IncreaseSize(delta int) error {
if delta <= 0 {
return fmt.Errorf("size increase must be positive")
}
machinedeployment.scalingMutex.Lock()
defer machinedeployment.scalingMutex.Unlock()
size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment)
if err != nil {
return err
@@ -366,6 +350,8 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error
if delta >= 0 {
return fmt.Errorf("size decrease size must be negative")
}
machinedeployment.scalingMutex.Lock()
defer machinedeployment.scalingMutex.Unlock()
size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment)
if err != nil {
return err
@@ -380,6 +366,54 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error
}, "MachineDeployment", "update", machinedeployment.Name)
}

// Refresh resets the priority annotation for machines that are not present in the machines-marked-by-ca-for-deletion annotation on the machineDeployment
func (machineDeployment *MachineDeployment) Refresh() error {
machineDeployment.scalingMutex.Lock()
defer machineDeployment.scalingMutex.Unlock()
mcd, err := machineDeployment.mcmManager.machineDeploymentLister.MachineDeployments(machineDeployment.Namespace).Get(machineDeployment.Name)
Review comment from @elankath (Dec 26, 2024):

This is repeated nearly a dozen times, including the common error handling: machineDeployment.mcmManager.machineDeploymentLister.MachineDeployments(machineDeployment.Namespace).Get(machineDeployment.Name). Move it to a method on mcmManager called GetMachineDeploymentResource which returns a formatted error that can simply be returned, so that the error message is fully consistent across all uses. We already have methods like mcmManager.getMachinesForMachineDeployment, so this matches the existing convention.
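
A minimal sketch of the suggested helper, for illustration only (the namespace field on McmManager and the exact error wording are assumptions, not part of this PR):

// GetMachineDeploymentResource fetches the MachineDeployment object via the lister
// and wraps any error in a consistent message that callers can return directly.
func (m *McmManager) GetMachineDeploymentResource(name string) (*v1alpha1.MachineDeployment, error) {
	mcd, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(name)
	if err != nil {
		return nil, fmt.Errorf("unable to fetch MachineDeployment object %s: %v", name, err)
	}
	return mcd, nil
}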

if err != nil {
return fmt.Errorf("failed to get machine deployment %s: %v", machineDeployment.Name, err)
}
// ignore the machine deployment if it is in rolling update
if !isRollingUpdateFinished(mcd) {
klog.Infof("machine deployment %s is under rolling update, skipping", machineDeployment.Name)
return nil

Review comment:
Shouldn't we return an error here? We do so for other cases of !isRollingUpdateFinished.

Author reply:
I think we should remove this check. Even if a machineDeployment is under rolling update, we should allow the annotation update if needed. wdyt?

}
markedMachines := sets.New(strings.Split(mcd.Annotations[machinesMarkedByCAForDeletion], ",")...)

Review comment:
Please move this to a small function mcm.getMachinesMarkedByCAForDeletion(mcd) (machineNames sets.Set[string]) which is unit-testable and can be consumed by both mcm_cloud_provider.go and mcm_manager.go.
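
A rough sketch of the suggested helper (written here as a free function, whereas the reviewer suggests a method on mcmManager), assuming the annotation key constant and the generic sets package already used in this diff:

// getMachinesMarkedByCAForDeletion parses the machines-marked-by-ca-for-deletion
// annotation on the MachineDeployment into a set of machine names.
func getMachinesMarkedByCAForDeletion(mcd *v1alpha1.MachineDeployment) sets.Set[string] {
	annotValue, ok := mcd.Annotations[machinesMarkedByCAForDeletion]
	if !ok || annotValue == "" {
		return sets.New[string]()
	}
	return sets.New(strings.Split(annotValue, ",")...)
}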

machines, err := machineDeployment.mcmManager.getMachinesForMachineDeployment(machineDeployment.Name)
if err != nil {
klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error())
return err
}
var incorrectlyMarkedMachines []*Ref

Review comment:
I believe we can omit the Ref struct and just use types.NamespacedName, which is already used in this file anyway.

for _, machine := range machines {
// no need to reset priority for machines already in termination or failed phase
if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed {

Review comment:
We already have a utility method isMachineFailedOrTerminating. Is that not suitable here, or can it be made suitable?
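
If that existing utility fits, the phase comparison above could collapse to something like this (the signature of isMachineFailedOrTerminating is assumed from the comment, not verified here):

// skip machines that are already failed or terminating
if isMachineFailedOrTerminating(machine) {
	continue
}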

continue
}
if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines && !markedMachines.Has(machine.Name) {

Review comment:
Out of curiosity, why is this called priorityValueForCandidateMachines? Shouldn't it be defined as const PriorityDeletionValueForCandidateMachine = 1?

Author reply:
I think we can rename it to PriorityValueForDeletionCandidateMachine. wdyt?

Review comment:
We should also add a comment here to explain what we are doing, so the next person making a patch doesn't have to scratch their head.

incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, &Ref{Name: machine.Name, Namespace: machine.Namespace})
}
}
var updatedMarkedMachines []string

Review comment:
updatedMarkedMachines -> updatedMarkedMachineNames. Also please add a comment explaining that we do this to ensure the annotation does not contain non-existent machine names.

for machineName := range markedMachines {
Review comment from @elankath (Dec 26, 2024):
Minor: technically, you could create allMachineNames and use Set.Intersection here, which is cheaper than a nested for loop, but it's OK.

Author reply:
AFAIK k8s.io/apimachinery/pkg/util/sets does not have an Intersection method.


Author reply:

Yes, but it is not part of the apimachinery version we currently use. Anyway, we do not need sets; I have replaced them with slices.
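
For reference, one way to avoid the nested lookup without relying on sets.Intersection is to index the existing machine names once; a sketch, not part of this PR, with illustrative variable names:

// build a lookup of machine names that still exist in the MachineDeployment
existingMachineNames := make(map[string]struct{}, len(machines))
for _, mc := range machines {
	existingMachineNames[mc.Name] = struct{}{}
}
// keep only the marked names that still correspond to existing machines
var updatedMarkedMachineNames []string
for machineName := range markedMachines {
	if _, ok := existingMachineNames[machineName]; ok {
		updatedMarkedMachineNames = append(updatedMarkedMachineNames, machineName)
	}
}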

if slices.ContainsFunc(machines, func(mc *v1alpha1.Machine) bool {
return mc.Name == machineName
}) {
updatedMarkedMachines = append(updatedMarkedMachines, machineName)
}
}
clone := mcd.DeepCopy()
clone.Annotations[machinesMarkedByCAForDeletion] = strings.Join(updatedMarkedMachines, ",")

Review comment:
This strings.Join to construct the annotation value is repeated elsewhere. Make a utility method called getMarkedForDeletionAnnotationValue(machineNames []string) string.
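
A sketch of the proposed utility; trivial, but it keeps the separator in one place:

// getMarkedForDeletionAnnotationValue builds the machines-marked-by-ca-for-deletion
// annotation value from a list of machine names.
func getMarkedForDeletionAnnotationValue(machineNames []string) string {
	return strings.Join(machineNames, ",")
}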

ctx, cancelFn := context.WithTimeout(context.Background(), machineDeployment.mcmManager.maxRetryTimeout)
defer cancelFn()
_, err = machineDeployment.mcmManager.machineClient.MachineDeployments(machineDeployment.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
if err != nil {
return err
}
return machineDeployment.mcmManager.resetPriorityForMachines(incorrectlyMarkedMachines)
}

// Belongs returns true if the given node belongs to the NodeGroup.
// TODO: Implement this to iterate over machines under machinedeployment, and return true if node exists in list.
func (machinedeployment *MachineDeployment) Belongs(node *apiv1.Node) (bool, error) {
@@ -541,9 +575,10 @@ func buildMachineDeploymentFromSpec(value string, mcmManager *McmManager) (*Mach

func buildMachineDeployment(mcmManager *McmManager, minSize int, maxSize int, namespace string, name string) *MachineDeployment {
return &MachineDeployment{
mcmManager: mcmManager,
minSize: minSize,
maxSize: maxSize,
mcmManager: mcmManager,
minSize: minSize,
maxSize: maxSize,
scalingMutex: sync.Mutex{},
Ref: Ref{
Name: name,
Namespace: namespace,