Skip to content

Commit

Permalink
Autoscaler considers reserved and pending replicas (knative#7027) (#329)
Browse files Browse the repository at this point in the history
* Autoscaler considers reserved and pending replicas



* Format / lint fix



* Simplify MAXFILLUP logic to get the expected replicas



* Fix state tests



* Log scheduling state



* Add grace period to scale down attempts



* Format Go code



---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored Aug 31, 2023
1 parent 476df5c commit cf89b7e
Show file tree
Hide file tree
Showing 6 changed files with 381 additions and 101 deletions.
55 changes: 53 additions & 2 deletions pkg/scheduler/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"errors"
"math"
"strconv"
"time"

Expand Down Expand Up @@ -95,6 +96,13 @@ type State struct {

// Stores for each vpod, a map of zonename to total number of vreplicas placed on all pods located in that zone currently
ZoneSpread map[types.NamespacedName]map[string]int32

// Pending tracks the number of virtual replicas that haven't been scheduled yet
// because there wasn't enough free capacity.
Pending map[types.NamespacedName]int32

// ExpectedVReplicaByVPod is the expected virtual replicas for each vpod key
ExpectedVReplicaByVPod map[types.NamespacedName]int32
}

// Free safely returns the free capacity at the given ordinal
Expand Down Expand Up @@ -190,6 +198,8 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
}

free := make([]int32, 0)
pending := make(map[types.NamespacedName]int32, 4)
expectedVReplicasByVPod := make(map[types.NamespacedName]int32, len(vpods))
schedulablePods := sets.NewInt32()
last := int32(-1)

Expand Down Expand Up @@ -255,10 +265,17 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
}
}

for _, p := range schedulablePods.List() {
free, last = s.updateFreeCapacity(free, last, PodNameFromOrdinal(s.statefulSetName, p), 0)
}

// Getting current state from existing placements for all vpods
for _, vpod := range vpods {
ps := vpod.GetPlacements()

pending[vpod.GetKey()] = pendingFromVPod(vpod)
expectedVReplicasByVPod[vpod.GetKey()] = vpod.GetVReplicas()

withPlacement[vpod.GetKey()] = make(map[string]bool)
podSpread[vpod.GetKey()] = make(map[string]int32)
nodeSpread[vpod.GetKey()] = make(map[string]int32)
Expand Down Expand Up @@ -321,13 +338,20 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)

state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, NumZones: int32(len(zoneMap)), NumNodes: int32(len(nodeToZoneMap)),
SchedulerPolicy: s.schedulerPolicy, SchedPolicy: s.schedPolicy, DeschedPolicy: s.deschedPolicy, NodeToZoneMap: nodeToZoneMap, StatefulSetName: s.statefulSetName, PodLister: s.podLister,
PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread}
PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod}

s.logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved)))

return state, nil
}

func pendingFromVPod(vpod scheduler.VPod) int32 {
expected := vpod.GetVReplicas()
scheduled := scheduler.GetTotalVReplicas(vpod.GetPlacements())

return int32(math.Max(float64(0), float64(expected-scheduled)))
}

func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) {
ordinal := OrdinalFromPodName(podName)
free = grow(free, ordinal, s.capacity)
Expand All @@ -340,13 +364,29 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri
s.logger.Errorw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal]))
}

if ordinal > last && free[ordinal] != s.capacity {
if ordinal > last {
last = ordinal
}

return free, last
}

func (s *State) TotalPending() int32 {
t := int32(0)
for _, p := range s.Pending {
t += p
}
return t
}

func (s *State) TotalExpectedVReplicas() int32 {
t := int32(0)
for _, v := range s.ExpectedVReplicaByVPod {
t += v
}
return t
}

func grow(slice []int32, ordinal int32, def int32) []int32 {
l := int32(len(slice))
diff := ordinal - l + 1
Expand Down Expand Up @@ -435,6 +475,7 @@ func (s *State) MarshalJSON() ([]byte, error) {
SchedulerPolicy scheduler.SchedulerPolicyType `json:"schedulerPolicy"`
SchedPolicy *scheduler.SchedulerPolicy `json:"schedPolicy"`
DeschedPolicy *scheduler.SchedulerPolicy `json:"deschedPolicy"`
Pending map[string]int32 `json:"pending"`
}

sj := S{
Expand All @@ -453,6 +494,7 @@ func (s *State) MarshalJSON() ([]byte, error) {
SchedulerPolicy: s.SchedulerPolicy,
SchedPolicy: s.SchedPolicy,
DeschedPolicy: s.DeschedPolicy,
Pending: toJSONablePending(s.Pending),
}

return json.Marshal(sj)
Expand All @@ -465,3 +507,12 @@ func toJSONable(ps map[types.NamespacedName]map[string]int32) map[string]map[str
}
return r
}

func toJSONablePending(pending map[types.NamespacedName]int32) map[string]int32 {
r := make(map[string]int32, len(pending))
for k, v := range pending {
r[k.String()] = v
}
return r

}
84 changes: 81 additions & 3 deletions pkg/scheduler/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestStateBuilder(t *testing.T) {
name: "no vpods",
replicas: int32(0),
vpods: [][]duckv1alpha1.Placement{},
expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, LastOrdinal: -1, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName},
expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, LastOrdinal: -1, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName, Pending: map[types.NamespacedName]int32{}, ExpectedVReplicaByVPod: map[types.NamespacedName]int32{}},
freec: int32(0),
schedulerPolicyType: scheduler.MAXFILLUP,
},
Expand All @@ -87,6 +87,12 @@ func TestStateBuilder(t *testing.T) {
"zone-0": 1,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
},
},
freec: int32(9),
schedulerPolicyType: scheduler.MAXFILLUP,
Expand Down Expand Up @@ -141,6 +147,16 @@ func TestStateBuilder(t *testing.T) {
"zone-1": 3,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 0,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 0,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
},
freec: int32(18),
schedulerPolicyType: scheduler.MAXFILLUP,
Expand Down Expand Up @@ -190,6 +206,16 @@ func TestStateBuilder(t *testing.T) {
"zone-1": 3,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 0,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 0,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
},
freec: int32(10),
schedulerPolicyType: scheduler.MAXFILLUP,
Expand All @@ -203,7 +229,7 @@ func TestStateBuilder(t *testing.T) {
{{PodName: "statefulset-name-1", VReplicas: 0}},
{{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}},
},
expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 2, Replicas: 4, NumNodes: 4, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 3, Replicas: 4, NumNodes: 4, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName,
NodeToZoneMap: map[string]string{"node-0": "zone-0", "node-1": "zone-1", "node-2": "zone-2", "node-3": "zone-0"},
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
Expand Down Expand Up @@ -244,6 +270,16 @@ func TestStateBuilder(t *testing.T) {
"zone-1": 0,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
},
freec: int32(34),
schedulerPolicyType: scheduler.MAXFILLUP,
Expand All @@ -257,7 +293,7 @@ func TestStateBuilder(t *testing.T) {
{{PodName: "statefulset-name-1", VReplicas: 0}},
{{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}},
},
expected: State{Capacity: 10, FreeCap: []int32{int32(3), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 2, Replicas: 4, NumNodes: 4, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(3), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 3, Replicas: 4, NumNodes: 4, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName,
NodeToZoneMap: map[string]string{"node-0": "zone-0", "node-1": "zone-1", "node-2": "zone-2", "node-3": "zone-0"},
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
Expand Down Expand Up @@ -298,6 +334,16 @@ func TestStateBuilder(t *testing.T) {
"zone-1": 0,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
},
freec: int32(28),
reserved: map[types.NamespacedName]map[string]int32{
Expand Down Expand Up @@ -361,6 +407,16 @@ func TestStateBuilder(t *testing.T) {
"zone-1": 0,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
},
freec: int32(26),
reserved: map[types.NamespacedName]map[string]int32{
Expand Down Expand Up @@ -427,6 +483,16 @@ func TestStateBuilder(t *testing.T) {
"zone-1": 0,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
},
freec: int32(28),
reserved: map[types.NamespacedName]map[string]int32{
Expand Down Expand Up @@ -462,6 +528,12 @@ func TestStateBuilder(t *testing.T) {
"zone-0": 1,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
},
},
freec: int32(9),
schedulerPolicyType: scheduler.MAXFILLUP,
Expand All @@ -488,6 +560,12 @@ func TestStateBuilder(t *testing.T) {
"zone-0": 1,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
},
},
freec: int32(9),
schedulerPolicy: &scheduler.SchedulerPolicy{
Expand Down
Loading

0 comments on commit cf89b7e

Please sign in to comment.