Convert scale-down pdb check to drainability rule
artemvmin committed Sep 26, 2023
1 parent fbe25e1 commit 794b434
Showing 9 changed files with 348 additions and 89 deletions.
61 changes: 17 additions & 44 deletions cluster-autoscaler/simulator/drain.go
@@ -17,15 +17,13 @@ limitations under the License.
package simulator

import (
"fmt"
"time"

apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
@@ -44,7 +42,7 @@ type NodeDeleteOptions struct {
// to allow their pods deletion in scale down
MinReplicaCount int
// DrainabilityRules contain a list of checks that are used to verify whether a pod can be drained from a node.
DrainabilityRules []drainability.Rule
DrainabilityRules rules.Rules
}

// NewNodeDeleteOptions returns new node delete options extracted from autoscaling options
@@ -54,7 +52,7 @@ func NewNodeDeleteOptions(opts config.AutoscalingOptions) NodeDeleteOptions {
SkipNodesWithLocalStorage: opts.SkipNodesWithLocalStorage,
MinReplicaCount: opts.MinReplicaCount,
SkipNodesWithCustomControllerPods: opts.SkipNodesWithCustomControllerPods,
DrainabilityRules: drainability.DefaultRules(),
DrainabilityRules: rules.Default(),
}
}
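
For orientation, a minimal sketch (not part of this commit) of building the options by hand after this change; the boolean and count values below are placeholders:

    deleteOptions := NodeDeleteOptions{
        SkipNodesWithLocalStorage:         true,            // placeholder value
        MinReplicaCount:                   0,               // placeholder value
        SkipNodesWithCustomControllerPods: true,            // placeholder value
        DrainabilityRules:                 rules.Default(), // replaces drainability.DefaultRules()
    }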

@@ -71,12 +69,15 @@ func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDele
var drainPods, drainDs []*apiv1.Pod
drainabilityRules := deleteOptions.DrainabilityRules
if drainabilityRules == nil {
drainabilityRules = drainability.DefaultRules()
drainabilityRules = rules.Default()
}
drainCtx := &drainability.DrainContext{
Pdbs: pdbs,
}
for _, podInfo := range nodeInfo.Pods {
pod := podInfo.Pod
d := drainabilityStatus(pod, drainabilityRules)
switch d.Outcome {
status := drainabilityRules.Drainable(drainCtx, pod)
switch status.Outcome {
case drainability.UndefinedOutcome:
pods = append(pods, podInfo.Pod)
case drainability.DrainOk:
@@ -86,12 +87,17 @@ func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDele
drainPods = append(drainPods, pod)
}
case drainability.BlockDrain:
blockingPod = &drain.BlockingPod{pod, d.BlockingReason}
err = d.Error
// TODO(reviewer note): Can we return the pod here, even though pdb call failed?
// Alternatively, drainability would have to pass a value to indicate pod vs infrastructure error.
blockingPod = &drain.BlockingPod{
Pod: pod,
Reason: status.BlockingReason,
}
err = status.Error
return
case drainability.SkipDrain:
}
}

pods, daemonSetPods, blockingPod, err = drain.GetPodsForDeletionOnNodeDrain(
pods,
pdbs,
@@ -106,39 +112,6 @@ func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDele
if err != nil {
return pods, daemonSetPods, blockingPod, err
}
if pdbBlockingPod, err := checkPdbs(pods, pdbs); err != nil {
return []*apiv1.Pod{}, []*apiv1.Pod{}, pdbBlockingPod, err
}

return pods, daemonSetPods, nil, nil
}

func checkPdbs(pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) (*drain.BlockingPod, error) {
// TODO: remove it after deprecating legacy scale down.
// RemainingPdbTracker.CanRemovePods() to replace this function.
for _, pdb := range pdbs {
selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
if err != nil {
return nil, err
}
for _, pod := range pods {
if pod.Namespace == pdb.Namespace && selector.Matches(labels.Set(pod.Labels)) {
if pdb.Status.DisruptionsAllowed < 1 {
return &drain.BlockingPod{Pod: pod, Reason: drain.NotEnoughPdb}, fmt.Errorf("not enough pod disruption budget to move %s/%s", pod.Namespace, pod.Name)
}
}
}
}
return nil, nil
}

func drainabilityStatus(pod *apiv1.Pod, dr []drainability.Rule) drainability.Status {
for _, f := range dr {
if d := f.Drainable(pod); d.Outcome != drainability.UndefinedOutcome {
return d
}
}
return drainability.Status{
Outcome: drainability.UndefinedOutcome,
}
}
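
The removed drainabilityStatus helper above fixes the iteration contract the new rules.Rules type has to honor: first decisive rule wins. A sketch of that aggregation as it presumably lives in the rules package (the package itself is outside this excerpt):

    package rules

    import (
        apiv1 "k8s.io/api/core/v1"
        "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
    )

    // Rules is an ordered list of drainability rules.
    type Rules []Rule

    // Drainable returns the first decisive status; rules later in the list
    // never override an earlier DrainOk, BlockDrain, or SkipDrain outcome.
    func (rs Rules) Drainable(drainCtx *drainability.DrainContext, pod *apiv1.Pod) drainability.Status {
        for _, r := range rs {
            if status := r.Drainable(drainCtx, pod); status.Outcome != drainability.UndefinedOutcome {
                return status
            }
        }
        return drainability.NewUndefinedStatus()
    }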
22 changes: 12 additions & 10 deletions cluster-autoscaler/simulator/drain_test.go
@@ -26,6 +26,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/kubernetes/pkg/kubelet/types"
@@ -179,7 +181,7 @@ func TestGetPodsToMove(t *testing.T) {
desc string
pods []*apiv1.Pod
pdbs []*policyv1.PodDisruptionBudget
rules []drainability.Rule
rules []rules.Rule
wantPods []*apiv1.Pod
wantDs []*apiv1.Pod
wantBlocking *drain.BlockingPod
@@ -256,19 +258,19 @@ func TestGetPodsToMove(t *testing.T) {
{
desc: "Rule allows",
pods: []*apiv1.Pod{unreplicatedPod},
rules: []drainability.Rule{alwaysDrain{}},
rules: []rules.Rule{alwaysDrain{}},
wantPods: []*apiv1.Pod{unreplicatedPod},
},
{
desc: "Second rule allows",
pods: []*apiv1.Pod{unreplicatedPod},
rules: []drainability.Rule{cantDecide{}, alwaysDrain{}},
rules: []rules.Rule{cantDecide{}, alwaysDrain{}},
wantPods: []*apiv1.Pod{unreplicatedPod},
},
{
desc: "Rule blocks",
pods: []*apiv1.Pod{rsPod},
rules: []drainability.Rule{neverDrain{}},
rules: []rules.Rule{neverDrain{}},
wantErr: true,
wantBlocking: &drain.BlockingPod{
Pod: rsPod,
@@ -278,7 +280,7 @@ func TestGetPodsToMove(t *testing.T) {
{
desc: "Second rule blocks",
pods: []*apiv1.Pod{rsPod},
rules: []drainability.Rule{cantDecide{}, neverDrain{}},
rules: []rules.Rule{cantDecide{}, neverDrain{}},
wantErr: true,
wantBlocking: &drain.BlockingPod{
Pod: rsPod,
@@ -288,7 +290,7 @@ func TestGetPodsToMove(t *testing.T) {
{
desc: "Undecisive rule fallback to default logic: Unreplicated pod",
pods: []*apiv1.Pod{unreplicatedPod},
rules: []drainability.Rule{cantDecide{}},
rules: []rules.Rule{cantDecide{}},
wantErr: true,
wantBlocking: &drain.BlockingPod{
Pod: unreplicatedPod,
@@ -298,7 +300,7 @@ func TestGetPodsToMove(t *testing.T) {
{
desc: "Undecisive rule fallback to default logic: Replicated pod",
pods: []*apiv1.Pod{rsPod},
rules: []drainability.Rule{cantDecide{}},
rules: []rules.Rule{cantDecide{}},
wantPods: []*apiv1.Pod{rsPod},
},
}
@@ -326,18 +328,18 @@ func TestGetPodsToMove(t *testing.T) {

type alwaysDrain struct{}

func (a alwaysDrain) Drainable(*apiv1.Pod) drainability.Status {
func (a alwaysDrain) Drainable(*context.DrainContext, *apiv1.Pod) drainability.Status {
return drainability.NewDrainableStatus()
}

type neverDrain struct{}

func (n neverDrain) Drainable(*apiv1.Pod) drainability.Status {
func (n neverDrain) Drainable(*context.DrainContext, *apiv1.Pod) drainability.Status {
return drainability.NewBlockedStatus(drain.UnexpectedError, fmt.Errorf("nope"))
}

type cantDecide struct{}

func (c cantDecide) Drainable(*apiv1.Pod) drainability.Status {
func (c cantDecide) Drainable(*context.DrainContext, *apiv1.Pod) drainability.Status {
return drainability.NewUndefinedStatus()
}
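
These stubs pin down the updated method signature. The Rule contract they imply, sketched with the drainability.DrainContext type added in this commit (the test file imports it via a separate context path; the authoritative definition lives in the rules package, outside this excerpt):

    // Rule decides whether a single pod may be drained, given shared drain state.
    type Rule interface {
        Drainable(*drainability.DrainContext, *apiv1.Pod) drainability.Status
    }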
26 changes: 26 additions & 0 deletions cluster-autoscaler/simulator/drainability/context.go
@@ -0,0 +1,26 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package drainability

import (
policyv1 "k8s.io/api/policy/v1"
)

// DrainContext contains parameters for drainability rules.
type DrainContext struct {
Pdbs []*policyv1.PodDisruptionBudget
}
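
A short usage sketch (the pdbs and pod variables are assumed) tying the new context to the rules entry point, mirroring the GetPodsToMove change above:

    drainCtx := &drainability.DrainContext{
        Pdbs: pdbs, // PodDisruptionBudgets listed from the cluster
    }
    status := rules.Default().Drainable(drainCtx, pod)
    if status.Outcome == drainability.BlockDrain {
        // status.BlockingReason and status.Error explain the refusal.
    }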
cluster-autoscaler/simulator/drainability/rules/mirror/mirror.go
@@ -14,26 +14,26 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package drainability
package mirror

import (
"k8s.io/autoscaler/cluster-autoscaler/utils/pod"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
)

// MirrorPodRule is a drainability rule on how to handle mirror pods.
type MirrorPodRule struct{}
// Rule is a drainability rule on how to handle mirror pods.
type Rule struct{}

// NewMirrorPodRule creates a new MirrorPodRule.
func NewMirrorPodRule() *MirrorPodRule {
return &MirrorPodRule{}
// New creates a new Rule.
func New() *Rule {
return &Rule{}
}

// Drainable decides what to do with mirror pods on node drain.
func (m *MirrorPodRule) Drainable(p *apiv1.Pod) Status {
if pod.IsMirrorPod(p) {
return NewSkipStatus()
func (Rule) Drainable(drainCtx *drainability.DrainContext, pod *apiv1.Pod) drainability.Status {
if pod_util.IsMirrorPod(pod) {
return drainability.NewSkipStatus()
}
return NewUndefinedStatus()
return drainability.NewUndefinedStatus()
}
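
A hypothetical direct call to the relocated rule; it never reads the drain context, so passing nil is safe here, which the updated test below also relies on:

    if status := mirror.New().Drainable(nil, pod); status.Outcome == drainability.SkipDrain {
        // Mirror pods are skipped by the drain simulation rather than evicted.
    }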
cluster-autoscaler/simulator/drainability/rules/mirror/mirror_test.go
@@ -14,21 +14,22 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package drainability
package mirror

import (
"testing"

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
"k8s.io/kubernetes/pkg/kubelet/types"
)

func TestMirrorPodRule(t *testing.T) {
testCases := []struct {
desc string
pod *apiv1.Pod
want Status
want drainability.Status
}{
{
desc: "non mirror pod",
@@ -38,7 +39,7 @@ func TestMirrorPodRule(t *testing.T) {
Namespace: "ns",
},
},
want: NewUndefinedStatus(),
want: drainability.NewUndefinedStatus(),
},
{
desc: "mirror pod",
@@ -51,15 +52,14 @@ func TestMirrorPodRule(t *testing.T) {
},
},
},
want: NewSkipStatus(),
want: drainability.NewSkipStatus(),
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
m := NewMirrorPodRule()
got := m.Drainable(tc.pod)
got := New().Drainable(nil, tc.pod)
if tc.want != got {
t.Errorf("MirrorPodRule.Drainable(%v) = %v, want %v", tc.pod.Name, got, tc.want)
t.Errorf("Rule.Drainable(%v) = %v, want %v", tc.pod.Name, got, tc.want)
}
})
}
58 changes: 58 additions & 0 deletions cluster-autoscaler/simulator/drainability/rules/pdb/pdb.go
@@ -0,0 +1,58 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pdb

import (
"fmt"

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
)

// Rule is a drainability rule on how to handle pods with pdbs.
type Rule struct{}

// New creates a new pdb Rule.
func New() *Rule {
return &Rule{}
}

// Drainable decides how to handle pods with pdbs on node drain.
func (Rule) Drainable(drainCtx *drainability.DrainContext, pod *apiv1.Pod) drainability.Status {
if drainCtx == nil {
return drainability.NewUndefinedStatus()
}

// TODO: Replace this logic with RemainingPdbTracker.CanRemovePods() after
// deprecating legacy scale down. Depending on the implementation, this may
// require adding information to the DrainContext, such as the slice of pods
// and a flag to prevent duplicate checks.
for _, pdb := range drainCtx.Pdbs {
selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
if err != nil {
return drainability.NewBlockedStatus(drain.UnexpectedError, fmt.Errorf("failed to convert label selector"))
}

if pod.Namespace == pdb.Namespace && selector.Matches(labels.Set(pod.Labels)) && pdb.Status.DisruptionsAllowed < 1 {
return drainability.NewBlockedStatus(drain.NotEnoughPdb, fmt.Errorf("not enough pod disruption budget to move %s/%s", pod.Namespace, pod.Name))
}
}
return drainability.NewUndefinedStatus()
}
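
Exercised in isolation, the converted check looks like this sketch (only names from the diff; pdbs stands in for budgets listed from the cluster):

    drainCtx := &drainability.DrainContext{Pdbs: pdbs}
    if status := pdb.New().Drainable(drainCtx, pod); status.Outcome == drainability.BlockDrain {
        // Same signal the removed checkPdbs produced: drain.NotEnoughPdb plus an error.
    }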
