From fe8a1e5565b1cc7b506fe31a64db5ec01b7a31e4 Mon Sep 17 00:00:00 2001 From: Ying WANG Date: Tue, 24 Dec 2024 14:09:47 +0100 Subject: [PATCH] rename shouldDelete to recoverFromOverflow + make cooldownUntil a plain int64 guarded by observedMtx --- pkg/costattribution/manager.go | 2 +- pkg/costattribution/tracker.go | 25 +++++++++++++++++++------ pkg/costattribution/tracker_test.go | 2 +- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/pkg/costattribution/manager.go b/pkg/costattribution/manager.go index 2c74e8f8e9..8ba8c26422 100644 --- a/pkg/costattribution/manager.go +++ b/pkg/costattribution/manager.go @@ -154,7 +154,7 @@ func (m *Manager) purgeInactiveAttributionsUntil(deadline int64) error { t.cleanupTrackerAttribution(key) } - if t.shouldDelete(deadline) { + if t.recoverFromOverflow(deadline) { m.deleteTracker(userID) } } diff --git a/pkg/costattribution/tracker.go b/pkg/costattribution/tracker.go index 5057c26ad3..0975779bba 100644 --- a/pkg/costattribution/tracker.go +++ b/pkg/costattribution/tracker.go @@ -45,12 +45,12 @@ type Tracker struct { discardedSampleAttribution *prometheus.Desc failedActiveSeriesDecrement *prometheus.Desc overflowLabels []string - observedMtx sync.RWMutex observed map[string]*observation + observedMtx sync.RWMutex + cooldownUntil int64 hashBuffer []byte state TrackerState overflowCounter *observation - cooldownUntil *atomic.Int64 totalFailedActiveSeries *atomic.Float64 cooldownDuration int64 logger log.Logger @@ -293,7 +293,7 @@ func (t *Tracker) updateState(ts int64, activeSeriesIncrement, receivedSampleInc t.overflowCounter.activeSerie.Add(o.activeSerie.Load()) } } - t.cooldownUntil = atomic.NewInt64(ts + t.cooldownDuration) + t.cooldownUntil = ts + t.cooldownDuration } if t.state == Overflow { @@ -326,12 +326,25 @@ func (t *Tracker) createNewObservation(key []byte, ts int64, activeSeriesIncreme } } -func (t *Tracker) shouldDelete(deadline int64) bool { - if t.cooldownUntil != nil && t.cooldownUntil.Load() < deadline { +func (t *Tracker) 
recoverFromOverflow(deadline int64) bool { + t.observedMtx.RLock() + if t.cooldownUntil != 0 && t.cooldownUntil < deadline { + if len(t.observed) <= t.maxCardinality { + t.observedMtx.RUnlock() + return true + } + t.observedMtx.RUnlock() + + // Extend the cooldown deadline if the number of observations is still above the max cardinality + t.observedMtx.Lock() if len(t.observed) <= t.maxCardinality { + t.observedMtx.Unlock() return true } - t.cooldownUntil.Store(deadline + t.cooldownDuration) + t.cooldownUntil = deadline + t.cooldownDuration + t.observedMtx.Unlock() + } else { + t.observedMtx.RUnlock() } return false } diff --git a/pkg/costattribution/tracker_test.go b/pkg/costattribution/tracker_test.go index 08ad7d6644..bd5360f555 100644 --- a/pkg/costattribution/tracker_test.go +++ b/pkg/costattribution/tracker_test.go @@ -94,7 +94,7 @@ func TestTracker_updateCounters(t *testing.T) { tracker.updateCounters(lbls3, 4, 1, 0, 0, nil, true) assert.Equal(t, Overflow, tracker.state, "Fourth observation, should stay overflow") - assert.Equal(t, int64(3+tracker.cooldownDuration), tracker.cooldownUntil.Load(), "CooldownUntil should be updated correctly") + assert.Equal(t, int64(3+tracker.cooldownDuration), tracker.cooldownUntil, "CooldownUntil should be updated correctly") } func TestTracker_inactiveObservations(t *testing.T) {