diff --git a/pkg/scheduler/state/state.go b/pkg/scheduler/state/state.go index fcc758cd394..2d5460cf801 100644 --- a/pkg/scheduler/state/state.go +++ b/pkg/scheduler/state/state.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "errors" + "math" "strconv" "time" @@ -95,6 +96,13 @@ type State struct { // Stores for each vpod, a map of zonename to total number of vreplicas placed on all pods located in that zone currently ZoneSpread map[types.NamespacedName]map[string]int32 + + // Pending tracks the number of virtual replicas that haven't been scheduled yet + // because there wasn't enough free capacity. + Pending map[types.NamespacedName]int32 + + // ExpectedVReplicaByVPod is the expected virtual replicas for each vpod key + ExpectedVReplicaByVPod map[types.NamespacedName]int32 } // Free safely returns the free capacity at the given ordinal @@ -190,6 +198,8 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) } free := make([]int32, 0) + pending := make(map[types.NamespacedName]int32, 4) + expectedVReplicasByVPod := make(map[types.NamespacedName]int32, len(vpods)) schedulablePods := sets.NewInt32() last := int32(-1) @@ -255,10 +265,17 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) } } + for _, p := range schedulablePods.List() { + free, last = s.updateFreeCapacity(free, last, PodNameFromOrdinal(s.statefulSetName, p), 0) + } + // Getting current state from existing placements for all vpods for _, vpod := range vpods { ps := vpod.GetPlacements() + pending[vpod.GetKey()] = pendingFromVPod(vpod) + expectedVReplicasByVPod[vpod.GetKey()] = vpod.GetVReplicas() + withPlacement[vpod.GetKey()] = make(map[string]bool) podSpread[vpod.GetKey()] = make(map[string]int32) nodeSpread[vpod.GetKey()] = make(map[string]int32) @@ -321,13 +338,20 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, NumZones: int32(len(zoneMap)), NumNodes: int32(len(nodeToZoneMap)), SchedulerPolicy: s.schedulerPolicy, SchedPolicy: s.schedPolicy, DeschedPolicy: s.deschedPolicy, NodeToZoneMap: nodeToZoneMap, StatefulSetName: s.statefulSetName, PodLister: s.podLister, - PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread} + PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod} s.logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved))) return state, nil } +func pendingFromVPod(vpod scheduler.VPod) int32 { + expected := vpod.GetVReplicas() + scheduled := scheduler.GetTotalVReplicas(vpod.GetPlacements()) + + return int32(math.Max(float64(0), float64(expected-scheduled))) +} + func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) { ordinal := OrdinalFromPodName(podName) free = grow(free, ordinal, s.capacity) @@ -340,13 +364,29 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri s.logger.Errorw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) } - if ordinal > last && free[ordinal] != s.capacity { + if ordinal > last { last = ordinal } return free, last } +func (s *State) TotalPending() int32 { + t := int32(0) + for _, p := range s.Pending { + t += p + } + return t +} + +func (s *State) TotalExpectedVReplicas() int32 { + t := int32(0) + for _, v := range s.ExpectedVReplicaByVPod { + t += v + } + return t +} + func grow(slice []int32, ordinal int32, def int32) []int32 { l := int32(len(slice)) diff := ordinal - l + 1 @@ -435,6 +475,7 @@ func (s *State) MarshalJSON() ([]byte, error) { SchedulerPolicy scheduler.SchedulerPolicyType `json:"schedulerPolicy"` SchedPolicy *scheduler.SchedulerPolicy `json:"schedPolicy"` DeschedPolicy *scheduler.SchedulerPolicy `json:"deschedPolicy"` + Pending map[string]int32 `json:"pending"` } sj := S{ @@ -453,6 +494,7 @@ func (s *State) MarshalJSON() ([]byte, error) { SchedulerPolicy: s.SchedulerPolicy, SchedPolicy: s.SchedPolicy, DeschedPolicy: s.DeschedPolicy, + Pending: toJSONablePending(s.Pending), } return json.Marshal(sj) @@ -465,3 +507,12 @@ func toJSONable(ps map[types.NamespacedName]map[string]int32) map[string]map[str } return r } + +func toJSONablePending(pending map[types.NamespacedName]int32) map[string]int32 { + r := make(map[string]int32, len(pending)) + for k, v := range pending { + r[k.String()] = v + } + return r + +} diff --git a/pkg/scheduler/state/state_test.go b/pkg/scheduler/state/state_test.go index dce2bb5ef59..60c72b1db1f 100644 --- a/pkg/scheduler/state/state_test.go +++ b/pkg/scheduler/state/state_test.go @@ -62,7 +62,7 @@ func TestStateBuilder(t *testing.T) { name: "no vpods", replicas: int32(0), vpods: [][]duckv1alpha1.Placement{}, - expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, LastOrdinal: -1, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName}, + expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, LastOrdinal: -1, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName, Pending: map[types.NamespacedName]int32{}, ExpectedVReplicaByVPod: map[types.NamespacedName]int32{}}, freec: int32(0), schedulerPolicyType: scheduler.MAXFILLUP, }, @@ -87,6 +87,12 @@ func TestStateBuilder(t *testing.T) { "zone-0": 1, }, }, + Pending: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + }, }, freec: int32(9), schedulerPolicyType: scheduler.MAXFILLUP, @@ -141,6 +147,16 @@ func TestStateBuilder(t *testing.T) { "zone-1": 3, }, }, + Pending: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 0, + types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 0, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, }, freec: int32(18), schedulerPolicyType: scheduler.MAXFILLUP, @@ -190,6 +206,16 @@ func TestStateBuilder(t *testing.T) { "zone-1": 3, }, }, + Pending: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 0, + types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 0, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, }, freec: int32(10), schedulerPolicyType: scheduler.MAXFILLUP, @@ -203,7 +229,7 @@ func TestStateBuilder(t *testing.T) { {{PodName: "statefulset-name-1", VReplicas: 0}}, {{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}}, }, - expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 2, Replicas: 4, NumNodes: 4, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName, + expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 3, Replicas: 4, NumNodes: 4, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName, NodeToZoneMap: map[string]string{"node-0": "zone-0", "node-1": "zone-1", "node-2": "zone-2", "node-3": "zone-0"}, PodSpread: map[types.NamespacedName]map[string]int32{ {Name: vpodName + "-0", Namespace: vpodNs + "-0"}: { @@ -244,6 +270,16 @@ func TestStateBuilder(t *testing.T) { "zone-1": 0, }, }, + Pending: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, }, freec: int32(34), schedulerPolicyType: scheduler.MAXFILLUP, @@ -257,7 +293,7 @@ func TestStateBuilder(t *testing.T) { {{PodName: "statefulset-name-1", VReplicas: 0}}, {{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}}, }, - expected: State{Capacity: 10, FreeCap: []int32{int32(3), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 2, Replicas: 4, NumNodes: 4, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName, + expected: State{Capacity: 10, FreeCap: []int32{int32(3), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 3, Replicas: 4, NumNodes: 4, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName, NodeToZoneMap: map[string]string{"node-0": "zone-0", "node-1": "zone-1", "node-2": "zone-2", "node-3": "zone-0"}, PodSpread: map[types.NamespacedName]map[string]int32{ {Name: vpodName + "-0", Namespace: vpodNs + "-0"}: { @@ -298,6 +334,16 @@ func TestStateBuilder(t *testing.T) { "zone-1": 0, }, }, + Pending: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, }, freec: int32(28), reserved: map[types.NamespacedName]map[string]int32{ @@ -361,6 +407,16 @@ func TestStateBuilder(t *testing.T) { "zone-1": 0, }, }, + Pending: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, }, freec: int32(26), reserved: map[types.NamespacedName]map[string]int32{ @@ -427,6 +483,16 @@ func TestStateBuilder(t *testing.T) { "zone-1": 0, }, }, + Pending: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + types.NamespacedName{Name: "vpod-name-1", Namespace: "vpod-ns-1"}: 1, + types.NamespacedName{Name: "vpod-name-2", Namespace: "vpod-ns-2"}: 1, + }, }, freec: int32(28), reserved: map[types.NamespacedName]map[string]int32{ @@ -462,6 +528,12 @@ func TestStateBuilder(t *testing.T) { "zone-0": 1, }, }, + Pending: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + }, }, freec: int32(9), schedulerPolicyType: scheduler.MAXFILLUP, @@ -488,6 +560,12 @@ func TestStateBuilder(t *testing.T) { "zone-0": 1, }, }, + Pending: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 0, + }, + ExpectedVReplicaByVPod: map[types.NamespacedName]int32{ + types.NamespacedName{Name: "vpod-name-0", Namespace: "vpod-ns-0"}: 1, + }, }, freec: int32(9), schedulerPolicy: &scheduler.SchedulerPolicy{ diff --git a/pkg/scheduler/statefulset/autoscaler.go b/pkg/scheduler/statefulset/autoscaler.go index 5641502e080..53b184e90c2 100644 --- a/pkg/scheduler/statefulset/autoscaler.go +++ b/pkg/scheduler/statefulset/autoscaler.go @@ -52,9 +52,8 @@ type Autoscaler interface { // Start runs the autoscaler until cancelled. Start(ctx context.Context) - // Autoscale is used to immediately trigger the autoscaler with the hint - // that pending number of vreplicas couldn't be scheduled. - Autoscale(ctx context.Context, attemptScaleDown bool, pending int32) + // Autoscale is used to immediately trigger the autoscaler. + Autoscale(ctx context.Context) } type autoscaler struct { @@ -63,7 +62,7 @@ type autoscaler struct { vpodLister scheduler.VPodLister logger *zap.SugaredLogger stateAccessor st.StateAccessor - trigger chan int32 + trigger chan struct{} evictor scheduler.Evictor // capacity is the total number of virtual replicas available per pod. @@ -77,6 +76,11 @@ type autoscaler struct { // The autoscaler is considered the leader when ephemeralLeaderElectionObject is in a // bucket where we've been promoted. isLeader atomic.Bool + + // getReserved returns reserved replicas. + getReserved GetReserved + + lastCompactAttempt time.Time } var ( @@ -108,53 +112,65 @@ func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAcces vpodLister: cfg.VPodLister, stateAccessor: stateAccessor, evictor: cfg.Evictor, - trigger: make(chan int32, 1), + trigger: make(chan struct{}, 1), capacity: cfg.PodCapacity, refreshPeriod: cfg.RefreshPeriod, lock: new(sync.Mutex), isLeader: atomic.Bool{}, + getReserved: cfg.getReserved, + // Anything that is less than now() - refreshPeriod, so that we will try to compact + // as soon as we start. + lastCompactAttempt: time.Now(). + Add(-cfg.RefreshPeriod). + Add(-time.Minute), } } func (a *autoscaler) Start(ctx context.Context) { attemptScaleDown := false - pending := int32(0) for { select { case <-ctx.Done(): return case <-time.After(a.refreshPeriod): attemptScaleDown = true - case pending = <-a.trigger: + case <-a.trigger: attemptScaleDown = false } // Retry a few times, just so that we don't have to wait for the next beat when // a transient error occurs - a.syncAutoscale(ctx, attemptScaleDown, pending) - pending = int32(0) + a.syncAutoscale(ctx, attemptScaleDown) } } -func (a *autoscaler) Autoscale(ctx context.Context, attemptScaleDown bool, pending int32) { - a.syncAutoscale(ctx, attemptScaleDown, pending) +func (a *autoscaler) Autoscale(ctx context.Context) { + // We trigger the autoscaler asynchronously by using the channel so that the scale down refresh + // period is reset. + a.trigger <- struct{}{} } -func (a *autoscaler) syncAutoscale(ctx context.Context, attemptScaleDown bool, pending int32) { +func (a *autoscaler) syncAutoscale(ctx context.Context, attemptScaleDown bool) error { a.lock.Lock() defer a.lock.Unlock() + var lastErr error wait.Poll(500*time.Millisecond, 5*time.Second, func() (bool, error) { - err := a.doautoscale(ctx, attemptScaleDown, pending) + err := a.doautoscale(ctx, attemptScaleDown) + if err != nil { + logging.FromContext(ctx).Errorw("Failed to autoscale", zap.Error(err)) + } + lastErr = err return err == nil, nil }) + return lastErr } -func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pending int32) error { +func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) error { if !a.isLeader.Load() { return nil } - state, err := a.stateAccessor.State(nil) + state, err := a.stateAccessor.State(a.getReserved()) if err != nil { a.logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return err @@ -168,9 +184,8 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pen } a.logger.Debugw("checking adapter capacity", - zap.Int32("pending", pending), zap.Int32("replicas", scale.Spec.Replicas), - zap.Int32("last ordinal", state.LastOrdinal)) + zap.Any("state", state)) var scaleUpFactor, newreplicas, minNumPods int32 scaleUpFactor = 1 // Non-HA scaling @@ -183,21 +198,26 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pen newreplicas = state.LastOrdinal + 1 // Ideal number - // Take into account pending replicas and pods that are already filled (for even pod spread) - if pending > 0 { - // Make sure to allocate enough pods for holding all pending replicas. - if state.SchedPolicy != nil && contains(state.SchedPolicy.Predicates, nil, st.EvenPodSpread) && len(state.FreeCap) > 0 { //HA scaling across pods - leastNonZeroCapacity := a.minNonZeroInt(state.FreeCap) - minNumPods = int32(math.Ceil(float64(pending) / float64(leastNonZeroCapacity))) - } else { - minNumPods = int32(math.Ceil(float64(pending) / float64(a.capacity))) + if state.SchedulerPolicy == scheduler.MAXFILLUP { + newreplicas = int32(math.Ceil(float64(state.TotalExpectedVReplicas()) / float64(state.Capacity))) + } else { + // Take into account pending replicas and pods that are already filled (for even pod spread) + pending := state.TotalPending() + if pending > 0 { + // Make sure to allocate enough pods for holding all pending replicas. + if state.SchedPolicy != nil && contains(state.SchedPolicy.Predicates, nil, st.EvenPodSpread) && len(state.FreeCap) > 0 { //HA scaling across pods + leastNonZeroCapacity := a.minNonZeroInt(state.FreeCap) + minNumPods = int32(math.Ceil(float64(pending) / float64(leastNonZeroCapacity))) + } else { + minNumPods = int32(math.Ceil(float64(pending) / float64(a.capacity))) + } + newreplicas += int32(math.Ceil(float64(minNumPods)/float64(scaleUpFactor)) * float64(scaleUpFactor)) } - newreplicas += int32(math.Ceil(float64(minNumPods)/float64(scaleUpFactor)) * float64(scaleUpFactor)) - } - // Make sure to never scale down past the last ordinal - if newreplicas <= state.LastOrdinal { - newreplicas = state.LastOrdinal + scaleUpFactor + if newreplicas <= state.LastOrdinal { + // Make sure to never scale down past the last ordinal + newreplicas = state.LastOrdinal + scaleUpFactor + } } // Only scale down if permitted @@ -223,6 +243,24 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pen } func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { + + // This avoids a too aggressive scale down by adding a "grace period" based on the refresh + // period + nextAttempt := a.lastCompactAttempt.Add(a.refreshPeriod) + if time.Now().Before(nextAttempt) { + a.logger.Debugw("Compact was retried before refresh period", + zap.Time("lastCompactAttempt", a.lastCompactAttempt), + zap.Time("nextAttempt", nextAttempt), + zap.String("refreshPeriod", a.refreshPeriod.String()), + ) + return + } + + a.logger.Debugw("Trying to compact and scale down", + zap.Int32("scaleUpFactor", scaleUpFactor), + zap.Any("state", s), + ) + // when there is only one pod there is nothing to move or number of pods is just enough! if s.LastOrdinal < 1 || len(s.SchedulablePods) <= int(scaleUpFactor) { return @@ -235,6 +273,7 @@ func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { usedInLastPod := s.Capacity - s.Free(s.LastOrdinal) if freeCapacity >= usedInLastPod { + a.lastCompactAttempt = time.Now() err := a.compact(s, scaleUpFactor) if err != nil { a.logger.Errorw("vreplicas compaction failed", zap.Error(err)) @@ -254,6 +293,7 @@ func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { if (freeCapacity >= usedInLastXPods) && //remaining pods can hold all vreps from evicted pods (s.Replicas-scaleUpFactor >= scaleUpFactor) { //remaining # of pods is enough for HA scaling + a.lastCompactAttempt = time.Now() err := a.compact(s, scaleUpFactor) if err != nil { a.logger.Errorw("vreplicas compaction failed", zap.Error(err)) diff --git a/pkg/scheduler/statefulset/autoscaler_test.go b/pkg/scheduler/statefulset/autoscaler_test.go index 976a2e1cf8a..48c56379e28 100644 --- a/pkg/scheduler/statefulset/autoscaler_test.go +++ b/pkg/scheduler/statefulset/autoscaler_test.go @@ -53,12 +53,12 @@ func TestAutoscaler(t *testing.T) { name string replicas int32 vpods []scheduler.VPod - pendings int32 scaleDown bool wantReplicas int32 schedulerPolicyType scheduler.SchedulerPolicyType schedulerPolicy *scheduler.SchedulerPolicy deschedulerPolicy *scheduler.SchedulerPolicy + reserved map[types.NamespacedName]map[string]int32 }{ { name: "no replicas, no placements, no pending", @@ -66,7 +66,6 @@ func TestAutoscaler(t *testing.T) { vpods: []scheduler.VPod{ tscheduler.NewVPod(testNs, "vpod-1", 0, nil), }, - pendings: int32(0), wantReplicas: int32(0), schedulerPolicyType: scheduler.MAXFILLUP, }, @@ -76,7 +75,6 @@ func TestAutoscaler(t *testing.T) { vpods: []scheduler.VPod{ tscheduler.NewVPod(testNs, "vpod-1", 5, nil), }, - pendings: int32(5), wantReplicas: int32(1), schedulerPolicyType: scheduler.MAXFILLUP, }, @@ -88,7 +86,6 @@ func TestAutoscaler(t *testing.T) { {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(0), wantReplicas: int32(2), schedulerPolicyType: scheduler.MAXFILLUP, }, @@ -96,23 +93,21 @@ func TestAutoscaler(t *testing.T) { name: "no replicas, with placements, with pending, enough capacity", replicas: int32(0), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 18, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(3), - wantReplicas: int32(3), + wantReplicas: int32(2), schedulerPolicyType: scheduler.MAXFILLUP, }, { name: "no replicas, with placements, with pending, not enough capacity", replicas: int32(0), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 23, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(8), wantReplicas: int32(3), schedulerPolicyType: scheduler.MAXFILLUP, }, @@ -122,7 +117,14 @@ func TestAutoscaler(t *testing.T) { vpods: []scheduler.VPod{ tscheduler.NewVPod(testNs, "vpod-1", 0, nil), }, - pendings: int32(0), + scaleDown: true, + wantReplicas: int32(0), + schedulerPolicyType: scheduler.MAXFILLUP, + }, + { + name: "with replicas, no placements, no pending, scale down (no vpods)", + replicas: int32(3), + vpods: []scheduler.VPod{}, scaleDown: true, wantReplicas: int32(0), schedulerPolicyType: scheduler.MAXFILLUP, @@ -133,7 +135,6 @@ func TestAutoscaler(t *testing.T) { vpods: []scheduler.VPod{ tscheduler.NewVPod(testNs, "vpod-1", 5, nil), }, - pendings: int32(5), scaleDown: true, wantReplicas: int32(1), schedulerPolicyType: scheduler.MAXFILLUP, @@ -144,7 +145,6 @@ func TestAutoscaler(t *testing.T) { vpods: []scheduler.VPod{ tscheduler.NewVPod(testNs, "vpod-1", 5, nil), }, - pendings: int32(5), scaleDown: false, wantReplicas: int32(3), schedulerPolicyType: scheduler.MAXFILLUP, @@ -153,10 +153,18 @@ func TestAutoscaler(t *testing.T) { name: "with replicas, no placements, with pending, scale up", replicas: int32(3), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 5, nil), + tscheduler.NewVPod(testNs, "vpod-1", 45, nil), }, - pendings: int32(40), - wantReplicas: int32(4), + wantReplicas: int32(5), + schedulerPolicyType: scheduler.MAXFILLUP, + }, + { + name: "with replicas, no placements, with pending, no change", + replicas: int32(3), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 25, nil), + }, + wantReplicas: int32(3), schedulerPolicyType: scheduler.MAXFILLUP, }, { @@ -167,10 +175,92 @@ func TestAutoscaler(t *testing.T) { {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(0), wantReplicas: int32(2), schedulerPolicyType: scheduler.MAXFILLUP, }, + { + name: "with replicas, with placements, with reserved", + replicas: int32(2), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 12, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(5)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + }, + wantReplicas: int32(2), + schedulerPolicyType: scheduler.MAXFILLUP, + reserved: map[types.NamespacedName]map[string]int32{ + {Namespace: testNs, Name: "vpod-1"}: { + "statefulset-name-0": 8, + }, + }, + }, + { + name: "with replicas, with placements, with reserved (scale up)", + replicas: int32(2), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 22, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(2)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + }, + wantReplicas: int32(3), + schedulerPolicyType: scheduler.MAXFILLUP, + reserved: map[types.NamespacedName]map[string]int32{ + {Namespace: testNs, Name: "vpod-1"}: { + "statefulset-name-0": 9, + }, + }, + }, + { + name: "with replicas, with placements, with pending (scale up)", + replicas: int32(2), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 21, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(5)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + }, + wantReplicas: int32(3), + schedulerPolicyType: scheduler.MAXFILLUP, + }, + { + name: "with replicas, with placements, with pending (scale up)", + replicas: int32(2), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 21, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(5)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + tscheduler.NewVPod(testNs, "vpod-2", 19, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(5)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + }, + wantReplicas: int32(4), + schedulerPolicyType: scheduler.MAXFILLUP, + }, + { + name: "with replicas, with placements, with pending (scale up), 1 over capacity", + replicas: int32(2), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 21, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(5)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + tscheduler.NewVPod(testNs, "vpod-2", 20, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(5)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + }, + wantReplicas: int32(5), + schedulerPolicyType: scheduler.MAXFILLUP, + }, + { + name: "with replicas, with placements, with pending, attempt scale down", + replicas: int32(3), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 21, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(5)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + }, + wantReplicas: int32(3), + scaleDown: true, + schedulerPolicyType: scheduler.MAXFILLUP, + }, { name: "with replicas, with placements, no pending, scale down", replicas: int32(5), @@ -179,7 +269,6 @@ func TestAutoscaler(t *testing.T) { {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(0), scaleDown: true, wantReplicas: int32(2), schedulerPolicyType: scheduler.MAXFILLUP, @@ -188,23 +277,21 @@ func TestAutoscaler(t *testing.T) { name: "with replicas, with placements, with pending, enough capacity", replicas: int32(2), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 18, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(3), - wantReplicas: int32(3), + wantReplicas: int32(2), schedulerPolicyType: scheduler.MAXFILLUP, }, { name: "with replicas, with placements, with pending, not enough capacity", replicas: int32(2), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 23, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(8), wantReplicas: int32(3), schedulerPolicyType: scheduler.MAXFILLUP, }, @@ -212,14 +299,13 @@ func TestAutoscaler(t *testing.T) { name: "with replicas, with placements, no pending, round up capacity", replicas: int32(5), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 20, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}, {PodName: "statefulset-name-2", VReplicas: int32(1)}, {PodName: "statefulset-name-3", VReplicas: int32(1)}, {PodName: "statefulset-name-4", VReplicas: int32(1)}}), }, - pendings: int32(0), wantReplicas: int32(5), schedulerPolicyType: scheduler.MAXFILLUP, }, @@ -227,11 +313,10 @@ func TestAutoscaler(t *testing.T) { name: "with replicas, with placements, with pending, enough capacity, with Predicates and Zone Priorities", replicas: int32(2), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 18, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(3), wantReplicas: int32(5), schedulerPolicy: &scheduler.SchedulerPolicy{ Predicates: []scheduler.PredicatePolicy{ @@ -247,11 +332,10 @@ func TestAutoscaler(t *testing.T) { name: "with replicas, with placements, with pending, enough capacity, with Predicates and Node Priorities", replicas: int32(2), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 18, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(3), wantReplicas: int32(8), schedulerPolicy: &scheduler.SchedulerPolicy{ Predicates: []scheduler.PredicatePolicy{ @@ -267,11 +351,10 @@ func TestAutoscaler(t *testing.T) { name: "with replicas, with placements, with pending, enough capacity, with Pod Predicates and Priorities", replicas: int32(2), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 18, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(3), wantReplicas: int32(4), schedulerPolicy: &scheduler.SchedulerPolicy{ Predicates: []scheduler.PredicatePolicy{ @@ -287,11 +370,10 @@ func TestAutoscaler(t *testing.T) { name: "with replicas, with placements, with pending, enough capacity, with Pod Predicates and Zone Priorities", replicas: int32(2), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 18, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(3), wantReplicas: int32(5), schedulerPolicy: &scheduler.SchedulerPolicy{ Predicates: []scheduler.PredicatePolicy{ @@ -308,11 +390,10 @@ func TestAutoscaler(t *testing.T) { name: "with replicas, with placements, with pending, enough capacity, with Pod Predicates and Node Priorities", replicas: int32(2), vpods: []scheduler.VPod{ - tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{ + tscheduler.NewVPod(testNs, "vpod-1", 18, []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(7)}}), }, - pendings: int32(3), wantReplicas: int32(8), schedulerPolicy: &scheduler.SchedulerPolicy{ Predicates: []scheduler.PredicatePolicy{ @@ -387,6 +468,9 @@ func TestAutoscaler(t *testing.T) { Evictor: noopEvictor, RefreshPeriod: 10 * time.Second, PodCapacity: 10, + getReserved: func() map[types.NamespacedName]map[string]int32 { + return tc.reserved + }, } autoscaler := newAutoscaler(ctx, cfg, stateAccessor) _ = autoscaler.Promote(reconciler.UniversalBucket(), nil) @@ -395,7 +479,7 @@ func TestAutoscaler(t *testing.T) { vpodClient.Append(vpod) } - err = autoscaler.doautoscale(ctx, tc.scaleDown, tc.pendings) + err = autoscaler.syncAutoscale(ctx, tc.scaleDown) if err != nil { t.Fatal("unexpected error", err) } @@ -444,6 +528,9 @@ func TestAutoscalerScaleDownToZero(t *testing.T) { Evictor: noopEvictor, RefreshPeriod: 2 * time.Second, PodCapacity: 10, + getReserved: func() map[types.NamespacedName]map[string]int32 { + return nil + }, } autoscaler := newAutoscaler(ctx, cfg, stateAccessor) _ = autoscaler.Promote(reconciler.UniversalBucket(), nil) diff --git a/pkg/scheduler/statefulset/scheduler.go b/pkg/scheduler/statefulset/scheduler.go index da8db64f623..4f5890f9222 100644 --- a/pkg/scheduler/statefulset/scheduler.go +++ b/pkg/scheduler/statefulset/scheduler.go @@ -58,6 +58,8 @@ import ( _ "knative.dev/eventing/pkg/scheduler/plugins/kafka/nomaxresourcecount" ) +type GetReserved func() map[types.NamespacedName]map[string]int32 + type Config struct { StatefulSetNamespace string `json:"statefulSetNamespace"` StatefulSetName string `json:"statefulSetName"` @@ -75,6 +77,9 @@ type Config struct { VPodLister scheduler.VPodLister `json:"-"` NodeLister corev1listers.NodeLister `json:"-"` + + // getReserved returns reserved replicas + getReserved GetReserved } func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) { @@ -83,11 +88,26 @@ func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) { podLister := podInformer.Lister().Pods(cfg.StatefulSetNamespace) stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, podLister, cfg.NodeLister) + + var getReserved GetReserved + cfg.getReserved = func() map[types.NamespacedName]map[string]int32 { + return getReserved() + } + autoscaler := newAutoscaler(ctx, cfg, stateAccessor) - go autoscaler.Start(ctx) + var wg sync.WaitGroup + wg.Add(1) + go func() { + wg.Wait() + autoscaler.Start(ctx) + }() + + s := newStatefulSetScheduler(ctx, cfg, stateAccessor, autoscaler, podLister) + getReserved = s.Reserved + wg.Done() - return newStatefulSetScheduler(ctx, cfg, stateAccessor, autoscaler, podLister), nil + return s, nil } // NewScheduler creates a new scheduler with pod autoscaling enabled. @@ -120,6 +140,16 @@ func NewScheduler(ctx context.Context, return s } +type Pending map[types.NamespacedName]int32 + +func (p Pending) Total() int32 { + t := int32(0) + for _, vr := range p { + t += vr + } + return t +} + // StatefulSetScheduler is a scheduler placing VPod into statefulset-managed set of pods type StatefulSetScheduler struct { ctx context.Context @@ -136,14 +166,10 @@ type StatefulSetScheduler struct { // replicas is the (cached) number of statefulset replicas. replicas int32 - // pending tracks the number of virtual replicas that haven't been scheduled yet - // because there wasn't enough free capacity. - // The autoscaler uses - pending map[types.NamespacedName]int32 - // reserved tracks vreplicas that have been placed (ie. scheduled) but haven't been // committed yet (ie. not appearing in vpodLister) - reserved map[types.NamespacedName]map[string]int32 + reserved map[types.NamespacedName]map[string]int32 + reservedMu sync.Mutex } var ( @@ -180,7 +206,6 @@ func newStatefulSetScheduler(ctx context.Context, statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), podLister: podlister, vpodLister: cfg.VPodLister, - pending: make(map[types.NamespacedName]int32), lock: new(sync.Mutex), stateAccessor: stateAccessor, reserved: make(map[types.NamespacedName]map[string]int32), @@ -200,6 +225,8 @@ func newStatefulSetScheduler(ctx context.Context, func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { s.lock.Lock() defer s.lock.Unlock() + s.reservedMu.Lock() + defer s.reservedMu.Unlock() vpods, err := s.vpodLister() if err != nil { @@ -227,8 +254,6 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { logger := s.logger.With("key", vpod.GetKey()) - logger.Debugw("scheduling", zap.Any("pending", toJSONable(s.pending))) - // Get the current placements state // Quite an expensive operation but safe and simple. state, err := s.stateAccessor.State(s.reserved) @@ -237,6 +262,8 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 return nil, err } + logger.Debugw("scheduling", zap.Any("state", state)) + existingPlacements := vpod.GetPlacements() var left int32 @@ -260,7 +287,6 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 tr := scheduler.GetTotalVReplicas(placements) if tr == vpod.GetVReplicas() { logger.Debug("scheduling succeeded (already scheduled)") - delete(s.pending, vpod.GetKey()) // Fully placed. Nothing to do return placements, nil @@ -308,17 +334,14 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 // Give time for the autoscaler to do its job logger.Info("not enough pod replicas to schedule. Awaiting autoscaler", zap.Any("placement", placements), zap.Int32("left", left)) - s.pending[vpod.GetKey()] = left - // Trigger the autoscaler if s.autoscaler != nil { - s.autoscaler.Autoscale(s.ctx, false, s.pendingVReplicas()) + s.autoscaler.Autoscale(s.ctx) } if state.SchedPolicy != nil { logger.Info("reverting to previous placements") s.reservePlacements(vpod, existingPlacements) // rebalancing doesn't care about new placements since all vreps will be re-placed - delete(s.pending, vpod.GetKey()) // rebalancing doesn't care about pending since all vreps will be re-placed return existingPlacements, s.notEnoughPodReplicas(left) // requeue to wait for the autoscaler to do its job } @@ -326,7 +349,6 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 } logger.Infow("scheduling successful", zap.Any("placement", placements)) - delete(s.pending, vpod.GetKey()) return placements, nil } @@ -735,16 +757,6 @@ func (s *StatefulSetScheduler) addReplicas(states *st.State, diff int32, placeme return newPlacements, diff } -// pendingReplicas returns the total number of vreplicas -// that haven't been scheduled yet -func (s *StatefulSetScheduler) pendingVReplicas() int32 { - t := int32(0) - for _, v := range s.pending { - t += v - } - return t -} - func (s *StatefulSetScheduler) updateStatefulset(obj interface{}) { statefulset, ok := obj.(*appsv1.StatefulSet) if !ok { @@ -800,3 +812,18 @@ func (s *StatefulSetScheduler) notEnoughPodReplicas(left int32) error { controller.NewRequeueAfter(5*time.Second), ) } + +func (s *StatefulSetScheduler) Reserved() map[types.NamespacedName]map[string]int32 { + s.reservedMu.Lock() + defer s.reservedMu.Unlock() + + r := make(map[types.NamespacedName]map[string]int32, len(s.reserved)) + for k1, v1 := range s.reserved { + r[k1] = make(map[string]int32, len(v1)) + for k2, v2 := range v1 { + r[k1][k2] = v2 + } + } + + return r +} diff --git a/pkg/scheduler/statefulset/scheduler_test.go b/pkg/scheduler/statefulset/scheduler_test.go index f9ac3a25669..456c9d4e56c 100644 --- a/pkg/scheduler/statefulset/scheduler_test.go +++ b/pkg/scheduler/statefulset/scheduler_test.go @@ -784,9 +784,6 @@ func TestStatefulsetScheduler(t *testing.T) { VPodLister: vpodClient.List, } s := newStatefulSetScheduler(ctx, cfg, sa, nil, lsp.GetPodLister().Pods(testNs)) - if tc.pending != nil { - s.pending = tc.pending - } // Give some time for the informer to notify the scheduler and set the number of replicas err = wait.PollImmediate(200*time.Millisecond, time.Second, func() (bool, error) { @@ -907,7 +904,7 @@ type fakeAutoscaler struct { func (f *fakeAutoscaler) Start(ctx context.Context) { } -func (f *fakeAutoscaler) Autoscale(ctx context.Context, attemptScaleDown bool, pending int32) { +func (f *fakeAutoscaler) Autoscale(ctx context.Context) { } func newFakeAutoscaler() *fakeAutoscaler {