diff --git a/pkg/ingester/circuitbreaker.go b/pkg/ingester/circuitbreaker.go index 852dacca54e..ec9186040ad 100644 --- a/pkg/ingester/circuitbreaker.go +++ b/pkg/ingester/circuitbreaker.go @@ -14,6 +14,8 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/middleware" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/grpc/codes" "github.com/grafana/mimir/pkg/mimirpb" @@ -29,6 +31,34 @@ const ( testDelayKey testCtxKey = "test-delay" ) +type circuitBreakerMetrics struct { + circuitBreakerCurrentState *prometheus.GaugeVec + + circuitBreakerTransitions *prometheus.CounterVec + circuitBreakerResults *prometheus.CounterVec +} + +type startPushRequestFn func(context.Context, int64) (context.Context, bool, error) + +type pushRequestFn func(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) + +func newCircuitBreakerMetrics(r prometheus.Registerer) *circuitBreakerMetrics { + return &circuitBreakerMetrics{ + circuitBreakerCurrentState: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_circuit_breaker_current_state", + Help: "Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name.", + }, []string{"state"}), + circuitBreakerTransitions: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_circuit_breaker_transitions_total", + Help: "Number of times the circuit breaker has entered a state.", + }, []string{"ingester", "state"}), + circuitBreakerResults: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_circuit_breaker_results_total", + Help: "Results of executing requests via the circuit breaker.", + }, []string{"ingester", "result"}), + } +} + type CircuitBreakerConfig struct { Enabled bool `yaml:"enabled" category:"experimental"` FailureThreshold uint `yaml:"failure_threshold" category:"experimental"` @@ -56,22 +86,26 @@ func (cfg *CircuitBreakerConfig) Validate() error { } type circuitBreaker struct { + cfg CircuitBreakerConfig circuitbreaker.CircuitBreaker[any] - ingester *Ingester - executor failsafe.Executor[any] - startTime time.Time + executor failsafe.Executor[any] + ingesterID string + logger log.Logger + metrics *circuitBreakerMetrics + startTime time.Time } -func newCircuitBreaker(ingester *Ingester) *circuitBreaker { - ingesterID := ingester.cfg.IngesterRing.InstanceID - cfg := ingester.cfg.CircuitBreakerConfig - metrics := ingester.metrics +func newCircuitBreaker(cfg CircuitBreakerConfig, ingesterID string, logger log.Logger, registerer prometheus.Registerer) *circuitBreaker { + metrics := newCircuitBreakerMetrics(registerer) // Initialize each of the known labels for circuit breaker metrics for this particular ingester. transitionOpen := metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.OpenState.String()) transitionHalfOpen := metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.HalfOpenState.String()) transitionClosed := metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.ClosedState.String()) countSuccess := metrics.circuitBreakerResults.WithLabelValues(ingesterID, resultSuccess) countError := metrics.circuitBreakerResults.WithLabelValues(ingesterID, resultError) + gaugeOpen := metrics.circuitBreakerCurrentState.WithLabelValues(circuitbreaker.OpenState.String()) + gaugeHalfOpen := metrics.circuitBreakerCurrentState.WithLabelValues(circuitbreaker.HalfOpenState.String()) + gaugeClosed := metrics.circuitBreakerCurrentState.WithLabelValues(circuitbreaker.ClosedState.String()) cbBuilder := circuitbreaker.Builder[any](). WithFailureThreshold(cfg.FailureThreshold). @@ -84,15 +118,27 @@ func newCircuitBreaker(ingester *Ingester) *circuitBreaker { }). OnClose(func(event circuitbreaker.StateChangedEvent) { transitionClosed.Inc() - level.Info(ingester.logger).Log("msg", "circuit breaker is closed", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState) + gaugeOpen.Set(0) + gaugeHalfOpen.Set(0) + gaugeClosed.Set(1) + level.Info(logger).Log("msg", "circuit breaker is closed", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState) }). OnOpen(func(event circuitbreaker.StateChangedEvent) { transitionOpen.Inc() - level.Info(ingester.logger).Log("msg", "circuit breaker is open", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState) + gaugeOpen.Set(1) + gaugeHalfOpen.Set(0) + gaugeClosed.Set(0) + level.Warn(logger).Log("msg", "circuit breaker is open", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState) }). OnHalfOpen(func(event circuitbreaker.StateChangedEvent) { transitionHalfOpen.Inc() - level.Info(ingester.logger).Log("msg", "circuit breaker is half-open", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState) + gaugeOpen.Set(0) + gaugeHalfOpen.Set(1) + gaugeClosed.Set(0) + level.Info(logger).Log("msg", "circuit breaker is half-open", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState) + }). + OnFailure(func(e failsafe.ExecutionEvent[any]) { + fmt.Println("OnFailure", e.LastError()) }). HandleIf(func(_ any, err error) bool { return isFailure(err) }) @@ -107,9 +153,12 @@ func newCircuitBreaker(ingester *Ingester) *circuitBreaker { cb := cbBuilder.Build() return &circuitBreaker{ + cfg: cfg, CircuitBreaker: cb, - ingester: ingester, executor: failsafe.NewExecutor[any](cb), + ingesterID: ingesterID, + logger: logger, + metrics: metrics, startTime: time.Now().Add(cfg.InitialDelay), } } @@ -136,47 +185,31 @@ func isFailure(err error) bool { } func (cb *circuitBreaker) isActive() bool { + if cb == nil { + return false + } return cb.startTime.Before(time.Now()) } -func (cb *circuitBreaker) ingesterID() string { - return cb.ingester.cfg.IngesterRing.InstanceID -} - -func (cb *circuitBreaker) logger() log.Logger { - return cb.ingester.logger -} - -func (cb *circuitBreaker) metrics() *ingesterMetrics { - return cb.ingester.metrics -} - -func (cb *circuitBreaker) config() CircuitBreakerConfig { - return cb.ingester.cfg.CircuitBreakerConfig -} - -func (cb *circuitBreaker) get(ctx context.Context, callbackFnName string, callbackFn func() (any, error)) (any, error) { - res, err := cb.executor.Get(callbackFn) +func (cb *circuitBreaker) get(ctx context.Context, op func() (any, error)) (any, error) { + res, err := cb.executor.Get(op) if err != nil && errors.Is(err, circuitbreaker.ErrOpen) { - cb.metrics().circuitBreakerResults.WithLabelValues(cb.ingesterID(), resultOpen).Inc() + cb.metrics.circuitBreakerResults.WithLabelValues(cb.ingesterID, resultOpen).Inc() cbOpenErr := middleware.DoNotLogError{Err: newCircuitBreakerOpenError(cb.RemainingDelay())} return res, newErrorWithStatus(cbOpenErr, codes.Unavailable) } - return res, cb.processError(ctx, err, callbackFnName) + return res, cb.processError(ctx, err) } -func (cb *circuitBreaker) processError(ctx context.Context, err error, callbackFnName string) error { +func (cb *circuitBreaker) processError(ctx context.Context, err error) error { if err == nil { return nil } if errors.Is(err, ctx.Err()) { - level.Error(cb.logger()).Log("msg", fmt.Sprintf("callback function %s completed with an error found in the context", callbackFnName), "ingester", cb.ingesterID(), "ctxErr", ctx.Err()) - // ctx.Err() was registered with the circuit breaker's executor, but we don't propagate it return nil } - level.Error(cb.logger()).Log("msg", fmt.Sprintf("callback function %s completed with an error", callbackFnName), "ingester", cb.ingesterID(), "err", err) return err } @@ -185,19 +218,17 @@ func (cb *circuitBreaker) contextWithTimeout(parent context.Context, timeout tim timeout = defaultTimeout } ctx, cancel := context.WithTimeout(context.WithoutCancel(parent), timeout) - if cb.config().testModeEnabled { - if initialDelay, ok := parent.Value(testDelayKey).(string); ok { - if d, err := time.ParseDuration(initialDelay); err == nil { - time.Sleep(d) - } + if cb.cfg.testModeEnabled { + if initialDelay, ok := parent.Value(testDelayKey).(time.Duration); ok { + time.Sleep(initialDelay) } } return ctx, cancel } -func (cb *circuitBreaker) StartPushRequest(ctx context.Context, reqSize int64) (context.Context, error) { - callbackCtx, callbackErr := cb.get(ctx, "ingester.startPushRequest", func() (any, error) { - callbackCtx, _, callbackErr := cb.ingester.startPushRequest(ctx, reqSize) +func (cb *circuitBreaker) StartPushRequest(ctx context.Context, reqSize int64, startPushRequest startPushRequestFn) (context.Context, error) { + callbackCtx, callbackErr := cb.get(ctx, func() (any, error) { + callbackCtx, _, callbackErr := startPushRequest(ctx, reqSize) return callbackCtx, callbackErr }) if callbackErr == nil { @@ -206,12 +237,12 @@ func (cb *circuitBreaker) StartPushRequest(ctx context.Context, reqSize int64) ( return nil, callbackErr } -func (cb *circuitBreaker) Push(parent context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) { - ctx, cancel := cb.contextWithTimeout(parent, cb.config().PushTimeout) +func (cb *circuitBreaker) Push(parent context.Context, req *mimirpb.WriteRequest, push pushRequestFn) (*mimirpb.WriteResponse, error) { + ctx, cancel := cb.contextWithTimeout(parent, cb.cfg.PushTimeout) defer cancel() - callbackResult, callbackErr := cb.get(ctx, "ingester.push", func() (any, error) { - callbackResult, callbackErr := cb.ingester.push(ctx, req) + callbackResult, callbackErr := cb.get(ctx, func() (any, error) { + callbackResult, callbackErr := push(ctx, req) if callbackErr != nil { return callbackResult, callbackErr } diff --git a/pkg/ingester/circuitbreaker_test.go b/pkg/ingester/circuitbreaker_test.go index aec3b3acdf3..58cf861ce93 100644 --- a/pkg/ingester/circuitbreaker_test.go +++ b/pkg/ingester/circuitbreaker_test.go @@ -35,7 +35,7 @@ func TestIngester_Push_CircuitBreaker(t *testing.T) { expectedErrorWhenCircuitBreakerClosed: nil, limits: InstanceLimits{MaxInMemoryTenants: 3}, ctx: func(ctx context.Context) context.Context { - return context.WithValue(ctx, testDelayKey, (2 * pushTimeout).String()) + return context.WithValue(ctx, testDelayKey, 2*pushTimeout) }, }, "instance limit hit": { @@ -51,6 +51,7 @@ func TestIngester_Push_CircuitBreaker(t *testing.T) { metricNames := []string{ "cortex_ingester_circuit_breaker_results_total", "cortex_ingester_circuit_breaker_transitions_total", + "cortex_ingester_circuit_breaker_current_state", } registry := prometheus.NewRegistry() @@ -147,28 +148,38 @@ func TestIngester_Push_CircuitBreaker(t *testing.T) { var expectedMetrics string if initialDelayEnabled { expectedMetrics = ` - # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="error"} 0 cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="success"} 0 - # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. # TYPE cortex_ingester_circuit_breaker_transitions_total counter cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="closed"} 0 cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="half-open"} 0 cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. + # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{state="open"} 0 + cortex_ingester_circuit_breaker_current_state{state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{state="closed"} 0 ` } else { expectedMetrics = ` - # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="circuit_breaker_open"} 2 cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="error"} 2 cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="success"} 1 - # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. # TYPE cortex_ingester_circuit_breaker_transitions_total counter cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="closed"} 0 cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="half-open"} 0 cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="open"} 1 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. + # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{state="open"} 1 + cortex_ingester_circuit_breaker_current_state{state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{state="closed"} 0 ` } assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...)) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index d62269b8323..d376eafcfce 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -263,11 +263,11 @@ func (cfg *Config) Validate(logger log.Logger) error { err := cfg.IngesterRing.Validate() - if err == nil { - return cfg.CircuitBreakerConfig.Validate() + if err != nil { + return err } - return err + return cfg.CircuitBreakerConfig.Validate() } func (cfg *Config) getIgnoreSeriesLimitForMetricNamesMap() map[string]struct{} { @@ -405,7 +405,7 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, i.activeGroups = activeGroupsCleanupService if cfg.CircuitBreakerConfig.Enabled { - i.circuitBreaker = newCircuitBreaker(i) + i.circuitBreaker = newCircuitBreaker(cfg.CircuitBreakerConfig, cfg.IngesterRing.InstanceID, logger, registerer) } if registerer != nil { @@ -974,8 +974,8 @@ type pushRequestState struct { // StartPushRequest checks if ingester can start push request, and increments relevant counters. // If new push request cannot be started, errors convertible to gRPC status code are returned, and metrics are updated. func (i *Ingester) StartPushRequest(ctx context.Context, reqSize int64) (context.Context, error) { - if i.isCircuitBreakerActive() { - return i.circuitBreaker.StartPushRequest(ctx, reqSize) + if i.circuitBreaker.isActive() { + return i.circuitBreaker.StartPushRequest(ctx, reqSize, i.startPushRequest) } ctx, _, err := i.startPushRequest(ctx, reqSize) return ctx, err @@ -3766,17 +3766,10 @@ func (i *Ingester) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) return nil } -func (i *Ingester) isCircuitBreakerActive() bool { - if i.circuitBreaker == nil { - return false - } - return i.circuitBreaker.isActive() -} - // Push implements client.IngesterServer, which is registered into gRPC server. func (i *Ingester) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) { - if i.isCircuitBreakerActive() { - return i.circuitBreaker.Push(ctx, req) + if i.circuitBreaker.isActive() { + return i.circuitBreaker.Push(ctx, req, i.push) } return i.push(ctx, req) } diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index d756d4c86e9..833a802fb04 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -83,9 +83,6 @@ type ingesterMetrics struct { // Count number of requests rejected due to utilization based limiting. utilizationLimitedRequests *prometheus.CounterVec - - circuitBreakerTransitions *prometheus.CounterVec - circuitBreakerResults *prometheus.CounterVec } func newIngesterMetrics( @@ -378,15 +375,6 @@ func newIngesterMetrics( Name: "cortex_ingester_prepare_shutdown_requested", Help: "If the ingester has been requested to prepare for shutdown via endpoint or marker file.", }), - - circuitBreakerTransitions: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ingester_circuit_breaker_transitions_total", - Help: "Number of times the circuit breaker has entered a state", - }, []string{"ingester", "state"}), - circuitBreakerResults: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ingester_circuit_breaker_results_total", - Help: "Results of executing requests via the circuit breaker", - }, []string{"ingester", "result"}), } // Initialize expected rejected request labels