Skip to content

Commit

Permalink
Ingester circuit breaker deactivate() function (#9743) (#9761)
Browse files Browse the repository at this point in the history
* Log request type in circuit breaker state change messages

* Test blocking reads if push circuit breaker is open

* Add circuit breaker deactivate()

* Don't set active state twice when initial delay is 0

* Log when breakers are activated/deactivated

* Only log when actually changing circuit breaker state

(cherry picked from commit 219f278)

Co-authored-by: Patryk Prus <[email protected]>
  • Loading branch information
grafanabot and pr00se authored Oct 28, 2024
1 parent 97a7589 commit 97a1a9b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 17 deletions.
19 changes: 15 additions & 4 deletions pkg/ingester/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,15 @@ func newCircuitBreaker(cfg CircuitBreakerConfig, registerer prometheus.Registere
WithDelay(cfg.CooldownPeriod).
OnClose(func(event circuitbreaker.StateChangedEvent) {
circuitBreakerTransitionsCounter(cb.metrics, circuitbreaker.ClosedState).Inc()
level.Info(logger).Log("msg", "circuit breaker is closed", "previous", event.OldState, "current", event.NewState)
level.Info(logger).Log("msg", "circuit breaker is closed", "previous", event.OldState, "current", event.NewState, "requestType", requestType)
}).
OnOpen(func(event circuitbreaker.StateChangedEvent) {
circuitBreakerTransitionsCounter(cb.metrics, circuitbreaker.OpenState).Inc()
level.Warn(logger).Log("msg", "circuit breaker is open", "previous", event.OldState, "current", event.NewState)
level.Warn(logger).Log("msg", "circuit breaker is open", "previous", event.OldState, "current", event.NewState, "requestType", requestType)
}).
OnHalfOpen(func(event circuitbreaker.StateChangedEvent) {
circuitBreakerTransitionsCounter(cb.metrics, circuitbreaker.HalfOpenState).Inc()
level.Info(logger).Log("msg", "circuit breaker is half-open", "previous", event.OldState, "current", event.NewState)
level.Info(logger).Log("msg", "circuit breaker is half-open", "previous", event.OldState, "current", event.NewState, "requestType", requestType)
})

if cfg.testModeEnabled {
Expand Down Expand Up @@ -188,17 +188,28 @@ func (cb *circuitBreaker) isActive() bool {
}

func (cb *circuitBreaker) activate() {
if cb == nil {
if cb == nil || cb.active.Load() {
return
}
if cb.cfg.InitialDelay == 0 {
level.Info(cb.logger).Log("msg", "activating circuit breaker", "requestType", cb.requestType)
cb.active.Store(true)
return
}
time.AfterFunc(cb.cfg.InitialDelay, func() {
level.Info(cb.logger).Log("msg", "activating circuit breaker", "requestType", cb.requestType)
cb.active.Store(true)
})
}

func (cb *circuitBreaker) deactivate() {
if cb == nil || !cb.active.Load() {
return
}
level.Info(cb.logger).Log("msg", "deactivating circuit breaker", "requestType", cb.requestType)
cb.active.Store(false)
}

func (cb *circuitBreaker) isOpen() bool {
if !cb.isActive() {
return false
Expand Down
24 changes: 14 additions & 10 deletions pkg/ingester/circuitbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func TestCircuitBreaker_IsActive(t *testing.T) {
require.Eventually(t, func() bool {
return cb.isActive()
}, time.Second, 10*time.Millisecond)

// deactivate() makes the circuit breaker inactive again.
cb.deactivate()
require.False(t, cb.isActive())
}

func TestCircuitBreaker_TryAcquirePermit(t *testing.T) {
Expand All @@ -107,7 +111,7 @@ func TestCircuitBreaker_TryAcquirePermit(t *testing.T) {
"if circuit breaker is not active, finish function and no error are returned": {
initialDelay: 1 * time.Minute,
circuitBreakerSetup: func(cb *circuitBreaker) {
cb.active.Store(false)
cb.deactivate()
},
expectedCircuitBreakerError: false,
},
Expand Down Expand Up @@ -1029,7 +1033,7 @@ func TestPRCircuitBreaker_TryPushAcquirePermit(t *testing.T) {
}{
"if push circuit breaker is not active, finish function and no error are returned": {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.push.active.Store(false)
cb.push.deactivate()
},
expectedCircuitBreakerError: false,
expectedMetrics: `
Expand Down Expand Up @@ -1184,8 +1188,8 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
}{
"if read circuit breaker is not active and push circuit breaker is not active, finish function and no error are returned": {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.active.Store(false)
cb.push.active.Store(false)
cb.read.deactivate()
cb.push.deactivate()
},
expectedCircuitBreakerError: false,
expectedMetrics: `
Expand Down Expand Up @@ -1217,7 +1221,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
},
"if read circuit breaker is not active and push circuit breaker is closed, finish function and no error are returned": {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.active.Store(false)
cb.read.deactivate()
cb.push.activate()
cb.push.cb.Close()
},
Expand Down Expand Up @@ -1251,7 +1255,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
},
"if read circuit breaker is not active and push circuit breaker is open, finish function and no error are returned": {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.active.Store(false)
cb.read.deactivate()
cb.push.activate()
cb.push.cb.Open()
},
Expand Down Expand Up @@ -1285,7 +1289,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
},
"if read circuit breaker is not active and push circuit breaker is half-open, finish function and no error are returned": {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.active.Store(false)
cb.read.deactivate()
cb.push.activate()
cb.push.cb.HalfOpen()
},
Expand Down Expand Up @@ -1321,7 +1325,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.activate()
cb.read.cb.Close()
cb.push.active.Store(false)
cb.push.deactivate()
},
expectedCircuitBreakerError: false,
expectedMetrics: `
Expand Down Expand Up @@ -1460,7 +1464,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.activate()
cb.read.cb.Open()
cb.push.active.Store(false)
cb.push.deactivate()
},
expectedCircuitBreakerError: true,
expectedMetrics: `
Expand Down Expand Up @@ -1599,7 +1603,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.activate()
cb.read.cb.HalfOpen()
cb.push.active.Store(false)
cb.push.deactivate()
},
expectedCircuitBreakerError: false,
expectedMetrics: `
Expand Down
20 changes: 17 additions & 3 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,11 @@ func TestIngester_StartReadRequest(t *testing.T) {
CooldownPeriod: 10 * time.Second,
RequestTimeout: 30 * time.Second,
}
cfg.PushCircuitBreaker = CircuitBreakerConfig{
Enabled: true,
CooldownPeriod: 10 * time.Second,
RequestTimeout: 2 * time.Second,
}
failingIng := newFailingIngester(t, cfg, nil, nil)
failingIng.startWaitAndCheck(context.Background(), t)
require.Equal(t, services.Running, failingIng.lifecycler.State())
Expand Down Expand Up @@ -286,7 +291,7 @@ func TestIngester_StartReadRequest(t *testing.T) {
require.ErrorIs(t, err, errTooBusy)
},
},
"fail if circuit breaker is open, and do not acquire a permit": {
"fail if read circuit breaker is open, and do not acquire a permit": {
setup: func(failingIng *failingIngester) {
failingIng.circuitBreaker.read.cb.Open()
},
Expand All @@ -295,9 +300,18 @@ func TestIngester_StartReadRequest(t *testing.T) {
require.ErrorAs(t, err, &circuitBreakerOpenError{})
},
},
"do not fail if circuit breaker is not active, and do not acquire a permit": {
"fail if push circuit breaker is open, and do not acquire a permit": {
setup: func(failingIng *failingIngester) {
failingIng.circuitBreaker.push.cb.Open()
},
expectedAcquiredPermitCount: 0,
verifyErr: func(err error) {
require.ErrorAs(t, err, &circuitBreakerOpenError{})
},
},
"do not fail if read circuit breaker is not active, and do not acquire a permit": {
setup: func(failingIng *failingIngester) {
failingIng.circuitBreaker.read.active.Store(false)
failingIng.circuitBreaker.read.deactivate()
},
expectedAcquiredPermitCount: 0,
},
Expand Down

0 comments on commit 97a1a9b

Please sign in to comment.