Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Autoscaler considers reserved and pending replicas #7027

Merged
merged 7 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 53 additions & 2 deletions pkg/scheduler/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"errors"
"math"
"strconv"
"time"

Expand Down Expand Up @@ -95,6 +96,13 @@ type State struct {

// ZoneSpread stores, for each vpod, a map from zone name to the total number of vreplicas currently placed on all pods located in that zone
ZoneSpread map[types.NamespacedName]map[string]int32

// Pending tracks the number of virtual replicas that haven't been scheduled yet
// because there wasn't enough free capacity.
Pending map[types.NamespacedName]int32

// ExpectedVReplicaByVPod stores the expected number of virtual replicas for each vpod key
ExpectedVReplicaByVPod map[types.NamespacedName]int32
Comment on lines +102 to +105
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the naming is a bit inconsistent, but we can fix that afterwards (e.g. PendingVReplica...)

}

// Free safely returns the free capacity at the given ordinal
Expand Down Expand Up @@ -190,6 +198,8 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
}

free := make([]int32, 0)
pending := make(map[types.NamespacedName]int32, 4)
expectedVReplicasByVPod := make(map[types.NamespacedName]int32, len(vpods))
schedulablePods := sets.NewInt32()
last := int32(-1)

Expand Down Expand Up @@ -255,10 +265,17 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
}
}

for _, p := range schedulablePods.List() {
free, last = s.updateFreeCapacity(free, last, PodNameFromOrdinal(s.statefulSetName, p), 0)
}

// Getting current state from existing placements for all vpods
for _, vpod := range vpods {
ps := vpod.GetPlacements()

pending[vpod.GetKey()] = pendingFromVPod(vpod)
expectedVReplicasByVPod[vpod.GetKey()] = vpod.GetVReplicas()

withPlacement[vpod.GetKey()] = make(map[string]bool)
podSpread[vpod.GetKey()] = make(map[string]int32)
nodeSpread[vpod.GetKey()] = make(map[string]int32)
Expand Down Expand Up @@ -321,13 +338,20 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)

state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, NumZones: int32(len(zoneMap)), NumNodes: int32(len(nodeToZoneMap)),
SchedulerPolicy: s.schedulerPolicy, SchedPolicy: s.schedPolicy, DeschedPolicy: s.deschedPolicy, NodeToZoneMap: nodeToZoneMap, StatefulSetName: s.statefulSetName, PodLister: s.podLister,
PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread}
PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod}

s.logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved)))

return state, nil
}

// pendingFromVPod returns the number of virtual replicas of vpod that are not
// yet covered by its placements: vpod.GetVReplicas() minus the total reported
// by scheduler.GetTotalVReplicas for vpod.GetPlacements(), never less than zero.
func pendingFromVPod(vpod scheduler.VPod) int32 {
	// The difference is negative when the placements hold more vreplicas than
	// requested; nothing is pending in that case, so clamp to zero.
	unplaced := vpod.GetVReplicas() - scheduler.GetTotalVReplicas(vpod.GetPlacements())
	return int32(math.Max(0, float64(unplaced)))
}

func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) {
ordinal := OrdinalFromPodName(podName)
free = grow(free, ordinal, s.capacity)
Expand All @@ -340,13 +364,29 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri
s.logger.Errorw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal]))
}

if ordinal > last && free[ordinal] != s.capacity {
if ordinal > last {
last = ordinal
}

return free, last
}

// TotalPending returns the sum of all per-vpod values stored in s.Pending.
// It returns 0 when the map is nil or empty.
func (s *State) TotalPending() int32 {
	var total int32
	for _, count := range s.Pending {
		total += count
	}
	return total
}

// TotalExpectedVReplicas returns the sum of all per-vpod values stored in
// s.ExpectedVReplicaByVPod. It returns 0 when the map is nil or empty.
func (s *State) TotalExpectedVReplicas() int32 {
	var total int32
	for _, expected := range s.ExpectedVReplicaByVPod {
		total += expected
	}
	return total
}

func grow(slice []int32, ordinal int32, def int32) []int32 {
l := int32(len(slice))
diff := ordinal - l + 1
Expand Down Expand Up @@ -435,6 +475,7 @@ func (s *State) MarshalJSON() ([]byte, error) {
SchedulerPolicy scheduler.SchedulerPolicyType `json:"schedulerPolicy"`
SchedPolicy *scheduler.SchedulerPolicy `json:"schedPolicy"`
DeschedPolicy *scheduler.SchedulerPolicy `json:"deschedPolicy"`
Pending map[string]int32 `json:"pending"`
}

sj := S{
Expand All @@ -453,6 +494,7 @@ func (s *State) MarshalJSON() ([]byte, error) {
SchedulerPolicy: s.SchedulerPolicy,
SchedPolicy: s.SchedPolicy,
DeschedPolicy: s.DeschedPolicy,
Pending: toJSONablePending(s.Pending),
}

return json.Marshal(sj)
Expand All @@ -465,3 +507,12 @@ func toJSONable(ps map[types.NamespacedName]map[string]int32) map[string]map[str
}
return r
}

// toJSONablePending returns a copy of pending whose keys are the String() form
// of each types.NamespacedName, giving a plain map[string]int32. The result is
// always non-nil, even for a nil or empty input.
func toJSONablePending(pending map[types.NamespacedName]int32) map[string]int32 {
	out := make(map[string]int32, len(pending))
	for name, count := range pending {
		out[name.String()] = count
	}
	return out
}
84 changes: 81 additions & 3 deletions pkg/scheduler/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestStateBuilder(t *testing.T) {
name: "no vpods",
replicas: int32(0),
vpods: [][]duckv1alpha1.Placement{},
expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, LastOrdinal: -1, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName},
expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, LastOrdinal: -1, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName, Pending: map[types.NamespacedName]int32{}, ExpectedVReplicaByVPod: map[types.NamespacedName]int32{}},
freec: int32(0),
schedulerPolicyType: scheduler.MAXFILLUP,
},
Expand All @@ -87,6 +87,12 @@ func TestStateBuilder(t *testing.T) {
"zone-0": 1,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
},
},
freec: int32(9),
schedulerPolicyType: scheduler.MAXFILLUP,
Expand Down Expand Up @@ -141,6 +147,16 @@ func TestStateBuilder(t *testing.T) {
"zone-1": 3,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 0,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 0,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
},
freec: int32(18),
schedulerPolicyType: scheduler.MAXFILLUP,
Expand Down Expand Up @@ -190,6 +206,16 @@ func TestStateBuilder(t *testing.T) {
"zone-1": 3,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 0,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 0,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
},
freec: int32(10),
schedulerPolicyType: scheduler.MAXFILLUP,
Expand All @@ -203,7 +229,7 @@ func TestStateBuilder(t *testing.T) {
{{PodName: "statefulset-name-1", VReplicas: 0}},
{{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}},
},
expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 2, Replicas: 4, NumNodes: 4, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 3, Replicas: 4, NumNodes: 4, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName,
NodeToZoneMap: map[string]string{"node-0": "zone-0", "node-1": "zone-1", "node-2": "zone-2", "node-3": "zone-0"},
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
Expand Down Expand Up @@ -244,6 +270,16 @@ func TestStateBuilder(t *testing.T) {
"zone-1": 0,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
},
freec: int32(34),
schedulerPolicyType: scheduler.MAXFILLUP,
Expand All @@ -257,7 +293,7 @@ func TestStateBuilder(t *testing.T) {
{{PodName: "statefulset-name-1", VReplicas: 0}},
{{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}},
},
expected: State{Capacity: 10, FreeCap: []int32{int32(3), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 2, Replicas: 4, NumNodes: 4, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(3), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 3, Replicas: 4, NumNodes: 4, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName,
NodeToZoneMap: map[string]string{"node-0": "zone-0", "node-1": "zone-1", "node-2": "zone-2", "node-3": "zone-0"},
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
Expand Down Expand Up @@ -298,6 +334,16 @@ func TestStateBuilder(t *testing.T) {
"zone-1": 0,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
},
freec: int32(28),
reserved: map[types.NamespacedName]map[string]int32{
Expand Down Expand Up @@ -361,6 +407,16 @@ func TestStateBuilder(t *testing.T) {
"zone-1": 0,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
},
freec: int32(26),
reserved: map[types.NamespacedName]map[string]int32{
Expand Down Expand Up @@ -427,6 +483,16 @@ func TestStateBuilder(t *testing.T) {
"zone-1": 0,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1,
types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1,
},
},
freec: int32(28),
reserved: map[types.NamespacedName]map[string]int32{
Expand Down Expand Up @@ -462,6 +528,12 @@ func TestStateBuilder(t *testing.T) {
"zone-0": 1,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
},
},
freec: int32(9),
schedulerPolicyType: scheduler.MAXFILLUP,
Expand All @@ -488,6 +560,12 @@ func TestStateBuilder(t *testing.T) {
"zone-0": 1,
},
},
Pending: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0,
},
ExpectedVReplicaByVPod: map[types.NamespacedName]int32{
types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1,
},
},
freec: int32(9),
schedulerPolicy: &scheduler.SchedulerPolicy{
Expand Down
Loading