Skip to content

Commit

Permalink
Fixing review findings
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic committed May 27, 2024
1 parent 26fd19e commit 2a6c937
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 79 deletions.
125 changes: 78 additions & 47 deletions pkg/ingester/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/middleware"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc/codes"

"github.com/grafana/mimir/pkg/mimirpb"
Expand All @@ -29,6 +31,34 @@ const (
testDelayKey testCtxKey = "test-delay"
)

type circuitBreakerMetrics struct {
circuitBreakerCurrentState *prometheus.GaugeVec

circuitBreakerTransitions *prometheus.CounterVec
circuitBreakerResults *prometheus.CounterVec
}

type startPushRequestFn func(context.Context, int64) (context.Context, bool, error)

type pushRequestFn func(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error)

func newCircuitBreakerMetrics(r prometheus.Registerer) *circuitBreakerMetrics {
return &circuitBreakerMetrics{
circuitBreakerCurrentState: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_ingester_circuit_breaker_current_state",
Help: "Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name.",
}, []string{"state"}),
circuitBreakerTransitions: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_circuit_breaker_transitions_total",
Help: "Number of times the circuit breaker has entered a state.",
}, []string{"ingester", "state"}),
circuitBreakerResults: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_circuit_breaker_results_total",
Help: "Results of executing requests via the circuit breaker.",
}, []string{"ingester", "result"}),
}
}

type CircuitBreakerConfig struct {
Enabled bool `yaml:"enabled" category:"experimental"`
FailureThreshold uint `yaml:"failure_threshold" category:"experimental"`
Expand Down Expand Up @@ -56,22 +86,26 @@ func (cfg *CircuitBreakerConfig) Validate() error {
}

type circuitBreaker struct {
cfg CircuitBreakerConfig
circuitbreaker.CircuitBreaker[any]
ingester *Ingester
executor failsafe.Executor[any]
startTime time.Time
executor failsafe.Executor[any]
ingesterID string
logger log.Logger
metrics *circuitBreakerMetrics
startTime time.Time
}

func newCircuitBreaker(ingester *Ingester) *circuitBreaker {
ingesterID := ingester.cfg.IngesterRing.InstanceID
cfg := ingester.cfg.CircuitBreakerConfig
metrics := ingester.metrics
func newCircuitBreaker(cfg CircuitBreakerConfig, ingesterID string, logger log.Logger, registerer prometheus.Registerer) *circuitBreaker {
metrics := newCircuitBreakerMetrics(registerer)
// Initialize each of the known labels for circuit breaker metrics for this particular ingester.
transitionOpen := metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.OpenState.String())
transitionHalfOpen := metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.HalfOpenState.String())
transitionClosed := metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.ClosedState.String())
countSuccess := metrics.circuitBreakerResults.WithLabelValues(ingesterID, resultSuccess)
countError := metrics.circuitBreakerResults.WithLabelValues(ingesterID, resultError)
gaugeOpen := metrics.circuitBreakerCurrentState.WithLabelValues(circuitbreaker.OpenState.String())
gaugeHalfOpen := metrics.circuitBreakerCurrentState.WithLabelValues(circuitbreaker.HalfOpenState.String())
gaugeClosed := metrics.circuitBreakerCurrentState.WithLabelValues(circuitbreaker.ClosedState.String())

cbBuilder := circuitbreaker.Builder[any]().
WithFailureThreshold(cfg.FailureThreshold).
Expand All @@ -84,15 +118,27 @@ func newCircuitBreaker(ingester *Ingester) *circuitBreaker {
}).
OnClose(func(event circuitbreaker.StateChangedEvent) {
transitionClosed.Inc()
level.Info(ingester.logger).Log("msg", "circuit breaker is closed", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState)
gaugeOpen.Set(0)
gaugeHalfOpen.Set(0)
gaugeClosed.Set(1)
level.Info(logger).Log("msg", "circuit breaker is closed", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState)
}).
OnOpen(func(event circuitbreaker.StateChangedEvent) {
transitionOpen.Inc()
level.Info(ingester.logger).Log("msg", "circuit breaker is open", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState)
gaugeOpen.Set(1)
gaugeHalfOpen.Set(0)
gaugeClosed.Set(0)
level.Warn(logger).Log("msg", "circuit breaker is open", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState)
}).
OnHalfOpen(func(event circuitbreaker.StateChangedEvent) {
transitionHalfOpen.Inc()
level.Info(ingester.logger).Log("msg", "circuit breaker is half-open", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState)
gaugeOpen.Set(0)
gaugeHalfOpen.Set(1)
gaugeClosed.Set(0)
level.Info(logger).Log("msg", "circuit breaker is half-open", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState)
}).
OnFailure(func(e failsafe.ExecutionEvent[any]) {
fmt.Println("OnFailure", e.LastError())
}).
HandleIf(func(_ any, err error) bool { return isFailure(err) })

Expand All @@ -107,9 +153,12 @@ func newCircuitBreaker(ingester *Ingester) *circuitBreaker {

cb := cbBuilder.Build()
return &circuitBreaker{
cfg: cfg,
CircuitBreaker: cb,
ingester: ingester,
executor: failsafe.NewExecutor[any](cb),
ingesterID: ingesterID,
logger: logger,
metrics: metrics,
startTime: time.Now().Add(cfg.InitialDelay),
}
}
Expand All @@ -136,47 +185,31 @@ func isFailure(err error) bool {
}

func (cb *circuitBreaker) isActive() bool {
if cb == nil {
return false
}
return cb.startTime.Before(time.Now())
}

func (cb *circuitBreaker) ingesterID() string {
return cb.ingester.cfg.IngesterRing.InstanceID
}

func (cb *circuitBreaker) logger() log.Logger {
return cb.ingester.logger
}

func (cb *circuitBreaker) metrics() *ingesterMetrics {
return cb.ingester.metrics
}

func (cb *circuitBreaker) config() CircuitBreakerConfig {
return cb.ingester.cfg.CircuitBreakerConfig
}

func (cb *circuitBreaker) get(ctx context.Context, callbackFnName string, callbackFn func() (any, error)) (any, error) {
res, err := cb.executor.Get(callbackFn)
func (cb *circuitBreaker) get(ctx context.Context, op func() (any, error)) (any, error) {
res, err := cb.executor.Get(op)
if err != nil && errors.Is(err, circuitbreaker.ErrOpen) {
cb.metrics().circuitBreakerResults.WithLabelValues(cb.ingesterID(), resultOpen).Inc()
cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultOpen).Inc()
cbOpenErr := middleware.DoNotLogError{Err: newCircuitBreakerOpenError(cb.RemainingDelay())}
return res, newErrorWithStatus(cbOpenErr, codes.Unavailable)
}
return res, cb.processError(ctx, err, callbackFnName)
return res, cb.processError(ctx, err)
}

func (cb *circuitBreaker) processError(ctx context.Context, err error, callbackFnName string) error {
func (cb *circuitBreaker) processError(ctx context.Context, err error) error {
if err == nil {
return nil
}
if errors.Is(err, ctx.Err()) {
level.Error(cb.logger()).Log("msg", fmt.Sprintf("callback function %s completed with an error found in the context", callbackFnName), "ingester", cb.ingesterID(), "ctxErr", ctx.Err())

// ctx.Err() was registered with the circuit breaker's executor, but we don't propagate it
return nil
}

level.Error(cb.logger()).Log("msg", fmt.Sprintf("callback function %s completed with an error", callbackFnName), "ingester", cb.ingesterID(), "err", err)
return err
}

Expand All @@ -185,19 +218,17 @@ func (cb *circuitBreaker) contextWithTimeout(parent context.Context, timeout tim
timeout = defaultTimeout
}
ctx, cancel := context.WithTimeout(context.WithoutCancel(parent), timeout)
if cb.config().testModeEnabled {
if initialDelay, ok := parent.Value(testDelayKey).(string); ok {
if d, err := time.ParseDuration(initialDelay); err == nil {
time.Sleep(d)
}
if cb.cfg.testModeEnabled {
if initialDelay, ok := parent.Value(testDelayKey).(time.Duration); ok {
time.Sleep(initialDelay)
}
}
return ctx, cancel
}

func (cb *circuitBreaker) StartPushRequest(ctx context.Context, reqSize int64) (context.Context, error) {
callbackCtx, callbackErr := cb.get(ctx, "ingester.startPushRequest", func() (any, error) {
callbackCtx, _, callbackErr := cb.ingester.startPushRequest(ctx, reqSize)
func (cb *circuitBreaker) StartPushRequest(ctx context.Context, reqSize int64, startPushRequest startPushRequestFn) (context.Context, error) {
callbackCtx, callbackErr := cb.get(ctx, func() (any, error) {
callbackCtx, _, callbackErr := startPushRequest(ctx, reqSize)
return callbackCtx, callbackErr
})
if callbackErr == nil {
Expand All @@ -206,12 +237,12 @@ func (cb *circuitBreaker) StartPushRequest(ctx context.Context, reqSize int64) (
return nil, callbackErr
}

func (cb *circuitBreaker) Push(parent context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
ctx, cancel := cb.contextWithTimeout(parent, cb.config().PushTimeout)
func (cb *circuitBreaker) Push(parent context.Context, req *mimirpb.WriteRequest, push pushRequestFn) (*mimirpb.WriteResponse, error) {
ctx, cancel := cb.contextWithTimeout(parent, cb.cfg.PushTimeout)
defer cancel()

callbackResult, callbackErr := cb.get(ctx, "ingester.push", func() (any, error) {
callbackResult, callbackErr := cb.ingester.push(ctx, req)
callbackResult, callbackErr := cb.get(ctx, func() (any, error) {
callbackResult, callbackErr := push(ctx, req)
if callbackErr != nil {
return callbackResult, callbackErr
}
Expand Down
21 changes: 16 additions & 5 deletions pkg/ingester/circuitbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestIngester_Push_CircuitBreaker(t *testing.T) {
expectedErrorWhenCircuitBreakerClosed: nil,
limits: InstanceLimits{MaxInMemoryTenants: 3},
ctx: func(ctx context.Context) context.Context {
return context.WithValue(ctx, testDelayKey, (2 * pushTimeout).String())
return context.WithValue(ctx, testDelayKey, 2*pushTimeout)
},
},
"instance limit hit": {
Expand All @@ -51,6 +51,7 @@ func TestIngester_Push_CircuitBreaker(t *testing.T) {
metricNames := []string{
"cortex_ingester_circuit_breaker_results_total",
"cortex_ingester_circuit_breaker_transitions_total",
"cortex_ingester_circuit_breaker_current_state",
}

registry := prometheus.NewRegistry()
Expand Down Expand Up @@ -147,28 +148,38 @@ func TestIngester_Push_CircuitBreaker(t *testing.T) {
var expectedMetrics string
if initialDelayEnabled {
expectedMetrics = `
# HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker
# HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker.
# TYPE cortex_ingester_circuit_breaker_results_total counter
cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="error"} 0
cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="success"} 0
# HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state
# HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state.
# TYPE cortex_ingester_circuit_breaker_transitions_total counter
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="closed"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="half-open"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="open"} 0
# HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name.
# TYPE cortex_ingester_circuit_breaker_current_state gauge
cortex_ingester_circuit_breaker_current_state{state="open"} 0
cortex_ingester_circuit_breaker_current_state{state="half-open"} 0
cortex_ingester_circuit_breaker_current_state{state="closed"} 0
`
} else {
expectedMetrics = `
# HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker
# HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker.
# TYPE cortex_ingester_circuit_breaker_results_total counter
cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="circuit_breaker_open"} 2
cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="error"} 2
cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="success"} 1
# HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state
# HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state.
# TYPE cortex_ingester_circuit_breaker_transitions_total counter
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="closed"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="half-open"} 0
cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="open"} 1
# HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name.
# TYPE cortex_ingester_circuit_breaker_current_state gauge
cortex_ingester_circuit_breaker_current_state{state="open"} 1
cortex_ingester_circuit_breaker_current_state{state="half-open"} 0
cortex_ingester_circuit_breaker_current_state{state="closed"} 0
`
}
assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...))
Expand Down
23 changes: 8 additions & 15 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,11 @@ func (cfg *Config) Validate(logger log.Logger) error {

err := cfg.IngesterRing.Validate()

if err == nil {
return cfg.CircuitBreakerConfig.Validate()
if err != nil {
return err
}

return err
return cfg.CircuitBreakerConfig.Validate()
}

func (cfg *Config) getIgnoreSeriesLimitForMetricNamesMap() map[string]struct{} {
Expand Down Expand Up @@ -405,7 +405,7 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing,
i.activeGroups = activeGroupsCleanupService

if cfg.CircuitBreakerConfig.Enabled {
i.circuitBreaker = newCircuitBreaker(i)
i.circuitBreaker = newCircuitBreaker(cfg.CircuitBreakerConfig, cfg.IngesterRing.InstanceID, logger, registerer)
}

if registerer != nil {
Expand Down Expand Up @@ -974,8 +974,8 @@ type pushRequestState struct {
// StartPushRequest checks if ingester can start push request, and increments relevant counters.
// If new push request cannot be started, errors convertible to gRPC status code are returned, and metrics are updated.
func (i *Ingester) StartPushRequest(ctx context.Context, reqSize int64) (context.Context, error) {
if i.isCircuitBreakerActive() {
return i.circuitBreaker.StartPushRequest(ctx, reqSize)
if i.circuitBreaker.isActive() {
return i.circuitBreaker.StartPushRequest(ctx, reqSize, i.startPushRequest)
}
ctx, _, err := i.startPushRequest(ctx, reqSize)
return ctx, err
Expand Down Expand Up @@ -3766,17 +3766,10 @@ func (i *Ingester) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest)
return nil
}

func (i *Ingester) isCircuitBreakerActive() bool {
if i.circuitBreaker == nil {
return false
}
return i.circuitBreaker.isActive()
}

// Push implements client.IngesterServer, which is registered into gRPC server.
func (i *Ingester) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
if i.isCircuitBreakerActive() {
return i.circuitBreaker.Push(ctx, req)
if i.circuitBreaker.isActive() {
return i.circuitBreaker.Push(ctx, req, i.push)
}
return i.push(ctx, req)
}
Expand Down
12 changes: 0 additions & 12 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ type ingesterMetrics struct {

// Count number of requests rejected due to utilization based limiting.
utilizationLimitedRequests *prometheus.CounterVec

circuitBreakerTransitions *prometheus.CounterVec
circuitBreakerResults *prometheus.CounterVec
}

func newIngesterMetrics(
Expand Down Expand Up @@ -378,15 +375,6 @@ func newIngesterMetrics(
Name: "cortex_ingester_prepare_shutdown_requested",
Help: "If the ingester has been requested to prepare for shutdown via endpoint or marker file.",
}),

circuitBreakerTransitions: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_circuit_breaker_transitions_total",
Help: "Number of times the circuit breaker has entered a state",
}, []string{"ingester", "state"}),
circuitBreakerResults: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_circuit_breaker_results_total",
Help: "Results of executing requests via the circuit breaker",
}, []string{"ingester", "result"}),
}

// Initialize expected rejected request labels
Expand Down

0 comments on commit 2a6c937

Please sign in to comment.