From d47a8a602d5bc9e07aaf361290ce24b2760203ad Mon Sep 17 00:00:00 2001 From: James Jesudason Date: Thu, 3 Oct 2024 19:55:27 +0100 Subject: [PATCH] Add warmup counter to EWMA rate --- pkg/util/limiter/utilization.go | 14 ++++---------- pkg/util/math/rate.go | 15 +++++++++++++++ pkg/util/math/rate_test.go | 21 ++++++++++++++++----- 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/pkg/util/limiter/utilization.go b/pkg/util/limiter/utilization.go index 98de134a306..99cf74080bb 100644 --- a/pkg/util/limiter/utilization.go +++ b/pkg/util/limiter/utilization.go @@ -73,8 +73,6 @@ type UtilizationBasedLimiter struct { cpuLimit float64 // Last CPU utilization time counter. lastCPUTime float64 - // The time of the first CPU update. - firstCPUUpdate time.Time // The time of the last CPU update. lastCPUUpdate time.Time cpuMovingAvg *math.EwmaRate @@ -186,14 +184,10 @@ func (l *UtilizationBasedLimiter) compute(nowFn func() time.Time) (currCPUUtil f } // The CPU utilization moving average requires a warmup period before getting - // stable results. In this implementation we use a warmup period equal to the - // sliding window. During the warmup, the reported CPU utilization will be 0. - if l.firstCPUUpdate.IsZero() { - l.firstCPUUpdate = now - } else if now.Sub(l.firstCPUUpdate) >= resourceUtilizationSlidingWindow { - currCPUUtil = l.cpuMovingAvg.Rate() / 100 - l.currCPUUtil.Store(currCPUUtil) - } + // stable results. The EWMA rate assumes a warmup period of N ticks (currently N=60). + // During the warmup, the reported CPU utilization will be 0. + currCPUUtil = l.cpuMovingAvg.Rate() / 100 + l.currCPUUtil.Store(currCPUUtil) var reason string if l.memoryLimit > 0 && currHeapSize >= l.memoryLimit { diff --git a/pkg/util/math/rate.go b/pkg/util/math/rate.go index 0ed8c60f7f9..ca464a0859e 100644 --- a/pkg/util/math/rate.go +++ b/pkg/util/math/rate.go @@ -12,6 +12,10 @@ import ( "go.uber.org/atomic" ) +const ( + warmupSamples uint8 = 60 +) + // EwmaRate tracks an exponentially weighted moving average of a per-second rate. type EwmaRate struct { newEvents atomic.Int64 @@ -22,6 +26,7 @@ type EwmaRate struct { mutex sync.RWMutex lastRate float64 init bool + count uint8 } func NewEWMARate(alpha float64, interval time.Duration) *EwmaRate { @@ -35,6 +40,12 @@ func NewEWMARate(alpha float64, interval time.Duration) *EwmaRate { func (r *EwmaRate) Rate() float64 { r.mutex.RLock() defer r.mutex.RUnlock() + + // until the first `warmupSamples` have been seen, the moving average is "not ready" to be queried + if r.count < warmupSamples { + return 0.0 + } + return r.lastRate } @@ -46,6 +57,10 @@ func (r *EwmaRate) Tick() { r.mutex.Lock() defer r.mutex.Unlock() + if r.count < warmupSamples { + r.count++ + } + if r.init { r.lastRate += r.alpha * (instantRate - r.lastRate) } else { diff --git a/pkg/util/math/rate_test.go b/pkg/util/math/rate_test.go index 99ead2452ef..094d668c0ac 100644 --- a/pkg/util/math/rate_test.go +++ b/pkg/util/math/rate_test.go @@ -12,11 +12,14 @@ import ( "github.com/stretchr/testify/require" ) +type tickTest struct { + events int + want float64 +} + func TestRate(t *testing.T) { - ticks := []struct { - events int - want float64 - }{ + ticks := populateWarmupSamples() + ticks = append(ticks, []tickTest{ {60, 1}, {30, 0.9}, {0, 0.72}, @@ -28,7 +31,7 @@ func TestRate(t *testing.T) { {0, 0.25427968}, {0, 0.203423744}, {0, 0.1627389952}, - } + }...) r := NewEWMARate(0.2, time.Minute) for _, tick := range ticks { @@ -50,3 +53,11 @@ func TestRate(t *testing.T) { require.InDelta(t, tick.want, r.Rate(), 0.0000000001, "unexpected rate") } } + +func populateWarmupSamples() []tickTest { + samples := make([]tickTest, 0, warmupSamples-1) + for range warmupSamples - 1 { + samples = append(samples, tickTest{60, 0}) + } + return samples +}