Skip to content

Commit

Permalink
rename + make cooldownuntil a normal int64 and lock with observedMtx
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Dec 24, 2024
1 parent f697e6f commit fe8a1e5
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pkg/costattribution/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (m *Manager) purgeInactiveAttributionsUntil(deadline int64) error {
t.cleanupTrackerAttribution(key)
}

if t.shouldDelete(deadline) {
if t.recoverFromOverflow(deadline) {
m.deleteTracker(userID)
}
}
Expand Down
25 changes: 19 additions & 6 deletions pkg/costattribution/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ type Tracker struct {
discardedSampleAttribution *prometheus.Desc
failedActiveSeriesDecrement *prometheus.Desc
overflowLabels []string
observedMtx sync.RWMutex
observed map[string]*observation
observedMtx sync.RWMutex
cooldownUntil int64
hashBuffer []byte
state TrackerState
overflowCounter *observation
cooldownUntil *atomic.Int64
totalFailedActiveSeries *atomic.Float64
cooldownDuration int64
logger log.Logger
Expand Down Expand Up @@ -293,7 +293,7 @@ func (t *Tracker) updateState(ts int64, activeSeriesIncrement, receivedSampleInc
t.overflowCounter.activeSerie.Add(o.activeSerie.Load())
}
}
t.cooldownUntil = atomic.NewInt64(ts + t.cooldownDuration)
t.cooldownUntil = ts + t.cooldownDuration
}

if t.state == Overflow {
Expand Down Expand Up @@ -326,12 +326,25 @@ func (t *Tracker) createNewObservation(key []byte, ts int64, activeSeriesIncreme
}
}

func (t *Tracker) shouldDelete(deadline int64) bool {
if t.cooldownUntil != nil && t.cooldownUntil.Load() < deadline {
func (t *Tracker) recoverFromOverflow(deadline int64) bool {
t.observedMtx.RLock()
if t.cooldownUntil != 0 && t.cooldownUntil < deadline {
if len(t.observed) <= t.maxCardinality {
t.observedMtx.RUnlock()
return true
}
t.observedMtx.RUnlock()

// Increase the cooldown duration if the number of observations is still above the max cardinality
t.observedMtx.Lock()
if len(t.observed) <= t.maxCardinality {
t.observedMtx.Unlock()
return true
}
t.cooldownUntil.Store(deadline + t.cooldownDuration)
t.cooldownUntil = deadline + t.cooldownDuration
t.observedMtx.Unlock()
} else {
t.observedMtx.RUnlock()
}
return false
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/costattribution/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestTracker_updateCounters(t *testing.T) {
tracker.updateCounters(lbls3, 4, 1, 0, 0, nil, true)
assert.Equal(t, Overflow, tracker.state, "Fourth observation, should stay overflow")

assert.Equal(t, int64(3+tracker.cooldownDuration), tracker.cooldownUntil.Load(), "CooldownUntil should be updated correctly")
assert.Equal(t, int64(3+tracker.cooldownDuration), tracker.cooldownUntil, "CooldownUntil should be updated correctly")
}

func TestTracker_inactiveObservations(t *testing.T) {
Expand Down

0 comments on commit fe8a1e5

Please sign in to comment.