Simplify MAXFILLUP logic to get the expected replicas
Signed-off-by: Pierangelo Di Pilato <[email protected]>
pierDipi committed Aug 31, 2023
1 parent 37bb493 commit 3ddaefc
Showing 5 changed files with 178 additions and 118 deletions.
55 changes: 53 additions & 2 deletions pkg/scheduler/state/state.go
@@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"errors"
"math"
"strconv"
"time"

@@ -95,6 +96,13 @@ type State struct {

// Stores for each vpod, a map of zonename to total number of vreplicas placed on all pods located in that zone currently
ZoneSpread map[types.NamespacedName]map[string]int32

// Pending tracks the number of virtual replicas that haven't been scheduled yet
// because there wasn't enough free capacity.
Pending map[types.NamespacedName]int32

// ExpectedVReplicaByVPod is the expected virtual replicas for each vpod key
ExpectedVReplicaByVPod map[types.NamespacedName]int32
}

// Free safely returns the free capacity at the given ordinal
@@ -190,6 +198,8 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
}

free := make([]int32, 0)
pending := make(map[types.NamespacedName]int32, 4)
expectedVReplicasByVPod := make(map[types.NamespacedName]int32, len(vpods))
schedulablePods := sets.NewInt32()
last := int32(-1)

@@ -255,10 +265,17 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
}
}

for _, p := range schedulablePods.List() {
free, last = s.updateFreeCapacity(free, last, PodNameFromOrdinal(s.statefulSetName, p), 0)
}

// Getting current state from existing placements for all vpods
for _, vpod := range vpods {
ps := vpod.GetPlacements()

pending[vpod.GetKey()] = pendingFromVPod(vpod)
expectedVReplicasByVPod[vpod.GetKey()] = vpod.GetVReplicas()

withPlacement[vpod.GetKey()] = make(map[string]bool)
podSpread[vpod.GetKey()] = make(map[string]int32)
nodeSpread[vpod.GetKey()] = make(map[string]int32)
@@ -321,13 +338,20 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)

state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, NumZones: int32(len(zoneMap)), NumNodes: int32(len(nodeToZoneMap)),
SchedulerPolicy: s.schedulerPolicy, SchedPolicy: s.schedPolicy, DeschedPolicy: s.deschedPolicy, NodeToZoneMap: nodeToZoneMap, StatefulSetName: s.statefulSetName, PodLister: s.podLister,
PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread}
PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod}

s.logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved)))

return state, nil
}

func pendingFromVPod(vpod scheduler.VPod) int32 {
expected := vpod.GetVReplicas()
scheduled := scheduler.GetTotalVReplicas(vpod.GetPlacements())

return int32(math.Max(float64(0), float64(expected-scheduled)))
}

func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) {
ordinal := OrdinalFromPodName(podName)
free = grow(free, ordinal, s.capacity)
@@ -340,13 +364,29 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri
s.logger.Errorw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal]))
}

if ordinal > last && free[ordinal] != s.capacity {
if ordinal > last {
last = ordinal
}

return free, last
}

func (s *State) TotalPending() int32 {
t := int32(0)
for _, p := range s.Pending {
t += p
}
return t
}

func (s *State) TotalExpectedVReplicas() int32 {
t := int32(0)
for _, v := range s.ExpectedVReplicaByVPod {
t += v
}
return t
}

func grow(slice []int32, ordinal int32, def int32) []int32 {
l := int32(len(slice))
diff := ordinal - l + 1
@@ -435,6 +475,7 @@ func (s *State) MarshalJSON() ([]byte, error) {
SchedulerPolicy scheduler.SchedulerPolicyType `json:"schedulerPolicy"`
SchedPolicy *scheduler.SchedulerPolicy `json:"schedPolicy"`
DeschedPolicy *scheduler.SchedulerPolicy `json:"deschedPolicy"`
Pending map[string]int32 `json:"pending"`
}

sj := S{
@@ -453,6 +494,7 @@ func (s *State) MarshalJSON() ([]byte, error) {
SchedulerPolicy: s.SchedulerPolicy,
SchedPolicy: s.SchedPolicy,
DeschedPolicy: s.DeschedPolicy,
Pending: toJSONablePending(s.Pending),
}

return json.Marshal(sj)
@@ -465,3 +507,12 @@ func toJSONable(ps map[types.NamespacedName]map[string]int32) map[string]map[str
}
return r
}

func toJSONablePending(pending map[types.NamespacedName]int32) map[string]int32 {
r := make(map[string]int32, len(pending))
for k, v := range pending {
r[k.String()] = v
}
return r

}
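For illustration, the bookkeeping added to state.go above reduces to a few lines of arithmetic: per vpod, pending is max(0, expected - scheduled) (pendingFromVPod), and the two totals (TotalPending, TotalExpectedVReplicas) are plain sums over the maps. The following standalone sketch uses a hypothetical vpod struct and made-up numbers; it is not part of the scheduler package.

package main

import "fmt"

// vpod is a hypothetical stand-in for scheduler.VPod, carrying only the
// numbers needed here.
type vpod struct {
	key       string
	expected  int32 // desired vreplicas (VPod.GetVReplicas)
	scheduled int32 // vreplicas already placed across pods
}

// pending mirrors pendingFromVPod: it never goes negative, even when a vpod
// is over-placed.
func pending(v vpod) int32 {
	if v.expected > v.scheduled {
		return v.expected - v.scheduled
	}
	return 0
}

func main() {
	vpods := []vpod{
		{key: "ns/trigger-a", expected: 12, scheduled: 9},
		{key: "ns/trigger-b", expected: 5, scheduled: 5},
		{key: "ns/trigger-c", expected: 3, scheduled: 0},
	}

	var totalPending, totalExpected int32
	for _, v := range vpods {
		totalPending += pending(v)  // analogous to State.TotalPending
		totalExpected += v.expected // analogous to State.TotalExpectedVReplicas
	}

	fmt.Println("total pending:", totalPending)   // 3 + 0 + 3 = 6
	fmt.Println("total expected:", totalExpected) // 12 + 5 + 3 = 20
}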
45 changes: 24 additions & 21 deletions pkg/scheduler/statefulset/autoscaler.go
@@ -79,8 +79,6 @@ type autoscaler struct {

// getReserved returns reserved replicas.
getReserved GetReserved
// getPending returns pending replicas.
getPending GetPending
}

var (
@@ -118,7 +116,6 @@ func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAcces
lock: new(sync.Mutex),
isLeader: atomic.Bool{},
getReserved: cfg.getReserved,
getPending: cfg.getPending,
}
}

@@ -141,7 +138,9 @@ func (a *autoscaler) Start(ctx context.Context) {
}

func (a *autoscaler) Autoscale(ctx context.Context) {
a.syncAutoscale(ctx, false)
// We trigger the autoscaler asynchronously by using the channel so that the scale down refresh
// period is reset.
a.trigger <- struct{}{}
}
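The comment above only makes sense together with the run loop that consumes the trigger channel, which is collapsed in this view. The sketch below shows the general pattern under that assumption: because the refresh timer is re-created on every select iteration, a send on the trigger channel runs a sync immediately and also pushes the next scale-down attempt further out. Names such as refresh and trigger are illustrative, not the exact fields.

package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop illustrates the assumed pattern: a periodic timer drives syncs that
// may scale down, while an explicit trigger forces an immediate sync and,
// since the timer restarts each iteration, resets the scale-down wait.
func runLoop(ctx context.Context, trigger <-chan struct{}, refresh time.Duration) {
	for {
		attemptScaleDown := false
		select {
		case <-ctx.Done():
			return
		case <-time.After(refresh): // restarted on every iteration
			attemptScaleDown = true
		case <-trigger: // what Autoscale() feeds
		}
		fmt.Println("sync, attemptScaleDown =", attemptScaleDown)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()

	trigger := make(chan struct{})
	go runLoop(ctx, trigger, 100*time.Millisecond)

	trigger <- struct{}{} // immediate sync; the scale-down timer starts over
	<-ctx.Done()
	time.Sleep(20 * time.Millisecond) // give the goroutine time to exit
}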

func (a *autoscaler) syncAutoscale(ctx context.Context, attemptScaleDown bool) error {
@@ -150,7 +149,7 @@ func (a *autoscaler) syncAutoscale(ctx context.Context, attemptScaleDown bool) e

var lastErr error
wait.Poll(500*time.Millisecond, 5*time.Second, func() (bool, error) {
err := a.doautoscale(ctx, attemptScaleDown, a.getPending().Total())
err := a.doautoscale(ctx, attemptScaleDown)
if err != nil {
logging.FromContext(ctx).Errorw("Failed to autoscale", zap.Error(err))
}
@@ -160,7 +159,7 @@ func (a *autoscaler) syncAutoscale(ctx context.Context, attemptScaleDown bool) e
return lastErr
}

func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pending int32) error {
func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) error {
if !a.isLeader.Load() {
return nil
}
@@ -178,9 +177,8 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pen
}

a.logger.Debugw("checking adapter capacity",
zap.Int32("pending", pending),
zap.Int32("replicas", scale.Spec.Replicas),
zap.Int32("last ordinal", state.LastOrdinal))
zap.Any("state", state))

var scaleUpFactor, newreplicas, minNumPods int32
scaleUpFactor = 1 // Non-HA scaling
@@ -193,21 +191,26 @@

newreplicas = state.LastOrdinal + 1 // Ideal number

// Take into account pending replicas and pods that are already filled (for even pod spread)
if pending > 0 {
// Make sure to allocate enough pods for holding all pending replicas.
if state.SchedPolicy != nil && contains(state.SchedPolicy.Predicates, nil, st.EvenPodSpread) && len(state.FreeCap) > 0 { //HA scaling across pods
leastNonZeroCapacity := a.minNonZeroInt(state.FreeCap)
minNumPods = int32(math.Ceil(float64(pending) / float64(leastNonZeroCapacity)))
} else {
minNumPods = int32(math.Ceil(float64(pending) / float64(a.capacity)))
if state.SchedulerPolicy == scheduler.MAXFILLUP {
newreplicas = int32(math.Ceil(float64(state.TotalExpectedVReplicas()) / float64(state.Capacity)))
} else {
// Take into account pending replicas and pods that are already filled (for even pod spread)
pending := state.TotalPending()
if pending > 0 {
// Make sure to allocate enough pods for holding all pending replicas.
if state.SchedPolicy != nil && contains(state.SchedPolicy.Predicates, nil, st.EvenPodSpread) && len(state.FreeCap) > 0 { //HA scaling across pods
leastNonZeroCapacity := a.minNonZeroInt(state.FreeCap)
minNumPods = int32(math.Ceil(float64(pending) / float64(leastNonZeroCapacity)))
} else {
minNumPods = int32(math.Ceil(float64(pending) / float64(a.capacity)))
}
newreplicas += int32(math.Ceil(float64(minNumPods)/float64(scaleUpFactor)) * float64(scaleUpFactor))
}
newreplicas += int32(math.Ceil(float64(minNumPods)/float64(scaleUpFactor)) * float64(scaleUpFactor))
}

// Make sure to never scale down past the last ordinal
if newreplicas <= state.LastOrdinal {
newreplicas = state.LastOrdinal + scaleUpFactor
if newreplicas <= state.LastOrdinal {
// Make sure to never scale down past the last ordinal
newreplicas = state.LastOrdinal + scaleUpFactor
}
}

// Only scale down if permitted
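As a worked illustration of the arithmetic in the hunk above (all numbers hypothetical): with a capacity of 20 vreplicas per pod, 45 expected vreplicas under MAXFILLUP yield ceil(45/20) = 3 replicas, while the policy-based path keeps lastOrdinal + 1 pods and adds ceil(pending/capacity) extra pods rounded up to the scale-up factor. The simplified standalone sketch below reproduces only that arithmetic, not the autoscaler itself.

package main

import (
	"fmt"
	"math"
)

func main() {
	const capacity = 20.0 // hypothetical vreplicas per pod

	// MAXFILLUP: the replica count comes straight from the total expected vreplicas.
	totalExpected := 45.0
	maxFillUpReplicas := int32(math.Ceil(totalExpected / capacity))
	fmt.Println("MAXFILLUP replicas:", maxFillUpReplicas) // ceil(45/20) = 3

	// Policy-based path: start from the last used ordinal and add enough pods
	// for the pending vreplicas, rounded up to the scale-up factor.
	lastOrdinal := int32(4)
	scaleUpFactor := 3.0 // e.g. HA scaling across 3 zones
	pending := 25.0
	minNumPods := math.Ceil(pending / capacity)                         // ceil(25/20) = 2
	extra := int32(math.Ceil(minNumPods/scaleUpFactor) * scaleUpFactor) // rounded up to a multiple of 3
	fmt.Println("policy-based replicas:", lastOrdinal+1+extra)          // 5 + 3 = 8
}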
