From 26fd19e2620f3903654b7eb2d904012a7b5e074b Mon Sep 17 00:00:00 2001 From: Yuri Nikolic Date: Fri, 24 May 2024 13:41:30 +0200 Subject: [PATCH] Adding circuit breakers on ingester server side for write path Signed-off-by: Yuri Nikolic --- CHANGELOG.md | 1 + cmd/mimir/config-descriptor.json | 87 +++++++ cmd/mimir/help-all.txt.tmpl | 14 ++ cmd/mimir/help.txt.tmpl | 2 + .../mimir/configure/about-versioning.md | 10 +- .../configuration-parameters/index.md | 38 +++ pkg/ingester/circuitbreaker.go | 227 ++++++++++++++++++ pkg/ingester/circuitbreaker_test.go | 193 +++++++++++++++ pkg/ingester/errors.go | 19 ++ pkg/ingester/errors_test.go | 18 ++ pkg/ingester/ingester.go | 34 ++- pkg/ingester/metrics.go | 12 + 12 files changed, 653 insertions(+), 2 deletions(-) create mode 100644 pkg/ingester/circuitbreaker.go create mode 100644 pkg/ingester/circuitbreaker_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 901b19705f3..e9a9b4c0e56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ * [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698 * [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123 * [FEATURE] Query-frontend, querier: new experimental `/cardinality/active_native_histogram_metrics` API to get active native histogram metric names with statistics about active native histogram buckets. #7982 #7986 #8008 +* [FEATURE] Ingester: add experimental support for the server-side circuit breakers when writing to ingesters via `-ingester.circuit-breaker.enabled`, `-ingester.circuit-breaker.failure-threshold`, or `-ingester.circuit-breaker.cooldown-period` or their corresponding YAML. 
Added metrics `cortex_ingester_circuit_breaker_results_total` and `cortex_ingester_circuit_breaker_transitions_total`. #8180 * [ENHANCEMENT] Reduced memory allocations in functions used to propagate contextual information between gRPC calls. #7529 * [ENHANCEMENT] Distributor: add experimental limit for exemplars per series per request, enabled with `-distributor.max-exemplars-per-series-per-request`, the number of discarded exemplars are tracked with `cortex_discarded_exemplars_total{reason="too_many_exemplars_per_series_per_request"}` #7989 #8010 * [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index fa2d5ee4b0e..30dbee9410d 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -3129,6 +3129,93 @@ "fieldFlag": "ingester.owned-series-update-interval", "fieldType": "duration", "fieldCategory": "experimental" + }, + { + "kind": "block", + "name": "circuit_breaker", + "required": false, + "desc": "", + "blockEntries": [ + { + "kind": "field", + "name": "enabled", + "required": false, + "desc": "Enable circuit breaking when making requests to ingesters", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "ingester.circuit-breaker.enabled", + "fieldType": "boolean", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "failure_threshold", + "required": false, + "desc": "Max percentage of requests that can fail over period before the circuit breaker opens", + "fieldValue": null, + "fieldDefaultValue": 10, + "fieldFlag": "ingester.circuit-breaker.failure-threshold", + "fieldType": "int", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "failure_execution_threshold", + "required": false, + "desc": "How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures", + "fieldValue": null, + "fieldDefaultValue": 
100, + "fieldFlag": "ingester.circuit-breaker.failure-execution-threshold", + "fieldType": "int", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "thresholding_period", + "required": false, + "desc": "Moving window of time that the percentage of failed requests is computed over", + "fieldValue": null, + "fieldDefaultValue": 60000000000, + "fieldFlag": "ingester.circuit-breaker.thresholding-period", + "fieldType": "duration", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "cooldown_period", + "required": false, + "desc": "How long the circuit breaker will stay in the open state before allowing some requests", + "fieldValue": null, + "fieldDefaultValue": 10000000000, + "fieldFlag": "ingester.circuit-breaker.cooldown-period", + "fieldType": "duration", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "initial_delay", + "required": false, + "desc": "How long the circuit breaker should wait between creation and starting up. During that time both failures and successes will not be counted.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "ingester.circuit-breaker.initial-delay", + "fieldType": "duration", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "push_timeout", + "required": false, + "desc": "How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. 
This configuration is used for circuit breakers only, and timeout expirations are not reported as errors", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "ingester.circuit-breaker.push-timeout", + "fieldType": "duration", + "fieldCategory": "experiment" + } + ], + "fieldValue": null, + "fieldDefaultValue": null } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index deddd2ed984..2a41ff88e0b 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1301,6 +1301,20 @@ Usage of ./cmd/mimir/mimir: After what time a series is considered to be inactive. (default 10m0s) -ingester.active-series-metrics-update-period duration How often to update active series metrics. (default 1m0s) + -ingester.circuit-breaker.cooldown-period duration + [experimental] How long the circuit breaker will stay in the open state before allowing some requests (default 10s) + -ingester.circuit-breaker.enabled + [experimental] Enable circuit breaking when making requests to ingesters + -ingester.circuit-breaker.failure-execution-threshold uint + [experimental] How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures (default 100) + -ingester.circuit-breaker.failure-threshold uint + [experimental] Max percentage of requests that can fail over period before the circuit breaker opens (default 10) + -ingester.circuit-breaker.initial-delay duration + [experimental] How long the circuit breaker should wait between creation and starting up. During that time both failures and successes will not be counted. + -ingester.circuit-breaker.push-timeout duration + How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. 
This configuration is used for circuit breakers only, and timeout expirations are not reported as errors + -ingester.circuit-breaker.thresholding-period duration + [experimental] Moving window of time that the percentage of failed requests is computed over (default 1m0s) -ingester.client.backoff-max-period duration Maximum delay when backing off. (default 10s) -ingester.client.backoff-min-period duration diff --git a/cmd/mimir/help.txt.tmpl b/cmd/mimir/help.txt.tmpl index f5c460fca00..b9ee5965b1d 100644 --- a/cmd/mimir/help.txt.tmpl +++ b/cmd/mimir/help.txt.tmpl @@ -385,6 +385,8 @@ Usage of ./cmd/mimir/mimir: Print basic help. -help-all Print help, also including advanced and experimental parameters. + -ingester.circuit-breaker.push-timeout duration + How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors -ingester.max-global-metadata-per-metric int The maximum number of metadata per metric, across the cluster. 0 to disable. 
-ingester.max-global-metadata-per-user int diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index 409e1e14ac8..e2b5b386a62 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -115,12 +115,20 @@ The following features are currently experimental: - `-ingester.track-ingester-owned-series` - `-ingester.use-ingester-owned-series-for-limits` - `-ingester.owned-series-update-interval` + - Per-ingester circuit breaking based on requests timing out or hitting per-instance limits + - `-ingester.circuit-breaker.enabled` + - `-ingester.circuit-breaker.failure-threshold` + - `-ingester.circuit-breaker.failure-execution-threshold` + - `-ingester.circuit-breaker.thresholding-period` + - `-ingester.circuit-breaker.cooldown-period` + - `-ingester.circuit-breaker.initial-delay` + - `-ingester.circuit-breaker.push-timeout` - Ingester client - Per-ingester circuit breaking based on requests timing out or hitting per-instance limits - `-ingester.client.circuit-breaker.enabled` - `-ingester.client.circuit-breaker.failure-threshold` - `-ingester.client.circuit-breaker.failure-execution-threshold` - - `-ingester.client.circuit-breaker.period` + - `-ingester.client.circuit-breaker.thresholding-period` - `-ingester.client.circuit-breaker.cooldown-period` - Querier - Use of Redis cache backend (`-blocks-storage.bucket-store.metadata-cache.backend=redis`) diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 03c2e14cb60..5674155ceb9 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -1212,6 +1212,44 @@ instance_limits: # owned series as a result of detected change. 
# CLI flag: -ingester.owned-series-update-interval [owned_series_update_interval: | default = 15s] + +circuit_breaker: + # (experimental) Enable circuit breaking when making requests to ingesters + # CLI flag: -ingester.circuit-breaker.enabled + [enabled: | default = false] + + # (experimental) Max percentage of requests that can fail over period before + # the circuit breaker opens + # CLI flag: -ingester.circuit-breaker.failure-threshold + [failure_threshold: | default = 10] + + # (experimental) How many requests must have been executed in period for the + # circuit breaker to be eligible to open for the rate of failures + # CLI flag: -ingester.circuit-breaker.failure-execution-threshold + [failure_execution_threshold: | default = 100] + + # (experimental) Moving window of time that the percentage of failed requests + # is computed over + # CLI flag: -ingester.circuit-breaker.thresholding-period + [thresholding_period: | default = 1m] + + # (experimental) How long the circuit breaker will stay in the open state + # before allowing some requests + # CLI flag: -ingester.circuit-breaker.cooldown-period + [cooldown_period: | default = 10s] + + # (experimental) How long the circuit breaker should wait between creation and + # starting up. During that time both failures and successes will not be + # counted. + # CLI flag: -ingester.circuit-breaker.initial-delay + [initial_delay: | default = 0s] + + # (experiment) How long is execution of ingester's Push supposed to last + # before it is reported as timeout in a circuit breaker. 
// testCtxKey is the type of the context keys through which tests pass
// settings to the circuit breaker.
type testCtxKey string

const (
	// Values of the "result" label of the circuit breaker results metric.
	resultSuccess = "success"
	resultError   = "error"
	resultOpen    = "circuit_breaker_open"

	// defaultTimeout is the Push timeout applied when PushTimeout is 0.
	defaultTimeout = 2 * time.Second

	// testDelayKey is the context key under which tests store, as a duration
	// string, an artificial delay to be applied while in test mode.
	testDelayKey testCtxKey = "test-delay"
)

// CircuitBreakerConfig holds the configuration of the ingester's server-side
// circuit breaker on the write path. All the exposed options are experimental.
type CircuitBreakerConfig struct {
	// Enabled turns the circuit breaker on.
	Enabled bool `yaml:"enabled" category:"experimental"`
	// FailureThreshold is the maximum percentage of failed requests tolerated
	// over ThresholdingPeriod before the circuit breaker opens.
	FailureThreshold uint `yaml:"failure_threshold" category:"experimental"`
	// FailureExecutionThreshold is the minimum number of requests that must be
	// executed in ThresholdingPeriod before the circuit breaker may open.
	FailureExecutionThreshold uint `yaml:"failure_execution_threshold" category:"experimental"`
	// ThresholdingPeriod is the moving time window over which the percentage
	// of failed requests is computed.
	ThresholdingPeriod time.Duration `yaml:"thresholding_period" category:"experimental"`
	// CooldownPeriod is how long the circuit breaker stays open before it
	// starts allowing some requests again.
	CooldownPeriod time.Duration `yaml:"cooldown_period" category:"experimental"`
	// InitialDelay is how long after its creation the circuit breaker starts
	// being used.
	InitialDelay time.Duration `yaml:"initial_delay" category:"experimental"`
	// PushTimeout is how long a Push may last before it is reported to the
	// circuit breaker as a timeout.
	PushTimeout time.Duration `yaml:"push_timeout" category:"experimental"`
	// testModeEnabled is set by tests only and is never read from YAML.
	testModeEnabled bool `yaml:"-"`
}

// RegisterFlags registers the circuit breaker's CLI flags, all prefixed with
// "ingester.circuit-breaker.", together with their default values on f.
func (cfg *CircuitBreakerConfig) RegisterFlags(f *flag.FlagSet) {
	prefix := "ingester.circuit-breaker."
	f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Enable circuit breaking when making requests to ingesters")
	f.UintVar(&cfg.FailureThreshold, prefix+"failure-threshold", 10, "Max percentage of requests that can fail over period before the circuit breaker opens")
	f.UintVar(&cfg.FailureExecutionThreshold, prefix+"failure-execution-threshold", 100, "How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures")
	f.DurationVar(&cfg.ThresholdingPeriod, prefix+"thresholding-period", time.Minute, "Moving window of time that the percentage of failed requests is computed over")
	f.DurationVar(&cfg.CooldownPeriod, prefix+"cooldown-period", 10*time.Second, "How long the circuit breaker will stay in the open state before allowing some requests")
	f.DurationVar(&cfg.InitialDelay, prefix+"initial-delay", 0, "How long the circuit breaker should wait between creation and starting up. During that time both failures and successes will not be counted.")
	f.DurationVar(&cfg.PushTimeout, prefix+"push-timeout", 0, "How long is execution of ingester's Push supposed to last before it is reported as timeout in a circuit breaker. This configuration is used for circuit breakers only, and timeout expirations are not reported as errors")
}

// Validate checks the configuration. There are currently no constraints to
// enforce, so it always returns nil.
func (cfg *CircuitBreakerConfig) Validate() error {
	return nil
}
+ transitionOpen := metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.OpenState.String()) + transitionHalfOpen := metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.HalfOpenState.String()) + transitionClosed := metrics.circuitBreakerTransitions.WithLabelValues(ingesterID, circuitbreaker.ClosedState.String()) + countSuccess := metrics.circuitBreakerResults.WithLabelValues(ingesterID, resultSuccess) + countError := metrics.circuitBreakerResults.WithLabelValues(ingesterID, resultError) + + cbBuilder := circuitbreaker.Builder[any](). + WithFailureThreshold(cfg.FailureThreshold). + WithDelay(cfg.CooldownPeriod). + OnFailure(func(failsafe.ExecutionEvent[any]) { + countError.Inc() + }). + OnSuccess(func(failsafe.ExecutionEvent[any]) { + countSuccess.Inc() + }). + OnClose(func(event circuitbreaker.StateChangedEvent) { + transitionClosed.Inc() + level.Info(ingester.logger).Log("msg", "circuit breaker is closed", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState) + }). + OnOpen(func(event circuitbreaker.StateChangedEvent) { + transitionOpen.Inc() + level.Info(ingester.logger).Log("msg", "circuit breaker is open", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState) + }). + OnHalfOpen(func(event circuitbreaker.StateChangedEvent) { + transitionHalfOpen.Inc() + level.Info(ingester.logger).Log("msg", "circuit breaker is half-open", "ingester", ingesterID, "previous", event.OldState, "current", event.NewState) + }). + HandleIf(func(_ any, err error) bool { return isFailure(err) }) + + if cfg.testModeEnabled { + // In case of testing purposes, we initialize the circuit breaker with count based failure thresholding, + // since it is more deterministic, and therefore it is easier to predict the outcome. + cbBuilder = cbBuilder.WithFailureThreshold(cfg.FailureThreshold) + } else { + // In case of production code, we prefer time based failure thresholding. 
+ cbBuilder = cbBuilder.WithFailureRateThreshold(cfg.FailureThreshold, cfg.FailureExecutionThreshold, cfg.ThresholdingPeriod) + } + + cb := cbBuilder.Build() + return &circuitBreaker{ + CircuitBreaker: cb, + ingester: ingester, + executor: failsafe.NewExecutor[any](cb), + startTime: time.Now().Add(cfg.InitialDelay), + } +} + +func isFailure(err error) bool { + if err == nil { + return false + } + + // We only consider timeouts or ingester hitting a per-instance limit + // to be errors worthy of tripping the circuit breaker since these + // are specific to a particular ingester, not a user or request. + + if errors.Is(err, context.DeadlineExceeded) { + return true + } + + var ingesterErr ingesterError + if errors.As(err, &ingesterErr) { + return ingesterErr.errorCause() == mimirpb.INSTANCE_LIMIT + } + + return false +} + +func (cb *circuitBreaker) isActive() bool { + return cb.startTime.Before(time.Now()) +} + +func (cb *circuitBreaker) ingesterID() string { + return cb.ingester.cfg.IngesterRing.InstanceID +} + +func (cb *circuitBreaker) logger() log.Logger { + return cb.ingester.logger +} + +func (cb *circuitBreaker) metrics() *ingesterMetrics { + return cb.ingester.metrics +} + +func (cb *circuitBreaker) config() CircuitBreakerConfig { + return cb.ingester.cfg.CircuitBreakerConfig +} + +func (cb *circuitBreaker) get(ctx context.Context, callbackFnName string, callbackFn func() (any, error)) (any, error) { + res, err := cb.executor.Get(callbackFn) + if err != nil && errors.Is(err, circuitbreaker.ErrOpen) { + cb.metrics().circuitBreakerResults.WithLabelValues(cb.ingesterID(), resultOpen).Inc() + cbOpenErr := middleware.DoNotLogError{Err: newCircuitBreakerOpenError(cb.RemainingDelay())} + return res, newErrorWithStatus(cbOpenErr, codes.Unavailable) + } + return res, cb.processError(ctx, err, callbackFnName) +} + +func (cb *circuitBreaker) processError(ctx context.Context, err error, callbackFnName string) error { + if err == nil { + return nil + } + if errors.Is(err, 
ctx.Err()) { + level.Error(cb.logger()).Log("msg", fmt.Sprintf("callback function %s completed with an error found in the context", callbackFnName), "ingester", cb.ingesterID(), "ctxErr", ctx.Err()) + + // ctx.Err() was registered with the circuit breaker's executor, but we don't propagate it + return nil + } + + level.Error(cb.logger()).Log("msg", fmt.Sprintf("callback function %s completed with an error", callbackFnName), "ingester", cb.ingesterID(), "err", err) + return err +} + +func (cb *circuitBreaker) contextWithTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + if timeout == 0 { + timeout = defaultTimeout + } + ctx, cancel := context.WithTimeout(context.WithoutCancel(parent), timeout) + if cb.config().testModeEnabled { + if initialDelay, ok := parent.Value(testDelayKey).(string); ok { + if d, err := time.ParseDuration(initialDelay); err == nil { + time.Sleep(d) + } + } + } + return ctx, cancel +} + +func (cb *circuitBreaker) StartPushRequest(ctx context.Context, reqSize int64) (context.Context, error) { + callbackCtx, callbackErr := cb.get(ctx, "ingester.startPushRequest", func() (any, error) { + callbackCtx, _, callbackErr := cb.ingester.startPushRequest(ctx, reqSize) + return callbackCtx, callbackErr + }) + if callbackErr == nil { + return callbackCtx.(context.Context), nil + } + return nil, callbackErr +} + +func (cb *circuitBreaker) Push(parent context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) { + ctx, cancel := cb.contextWithTimeout(parent, cb.config().PushTimeout) + defer cancel() + + callbackResult, callbackErr := cb.get(ctx, "ingester.push", func() (any, error) { + callbackResult, callbackErr := cb.ingester.push(ctx, req) + if callbackErr != nil { + return callbackResult, callbackErr + } + + // We return ctx.Err() in order to register it with the circuit breaker's executor. 
+ return callbackResult, ctx.Err() + }) + + if callbackResult == nil { + return nil, callbackErr + } + return callbackResult.(*mimirpb.WriteResponse), callbackErr +} diff --git a/pkg/ingester/circuitbreaker_test.go b/pkg/ingester/circuitbreaker_test.go new file mode 100644 index 00000000000..aec3b3acdf3 --- /dev/null +++ b/pkg/ingester/circuitbreaker_test.go @@ -0,0 +1,193 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package ingester + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/grafana/dskit/grpcutil" + "github.com/grafana/dskit/middleware" + "github.com/grafana/dskit/services" + "github.com/grafana/dskit/test" + "github.com/grafana/dskit/user" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + + "github.com/grafana/mimir/pkg/mimirpb" +) + +func TestIngester_Push_CircuitBreaker(t *testing.T) { + pushTimeout := 100 * time.Millisecond + tests := map[string]struct { + expectedErrorWhenCircuitBreakerClosed error + ctx func(context.Context) context.Context + limits InstanceLimits + }{ + "deadline exceeded": { + expectedErrorWhenCircuitBreakerClosed: nil, + limits: InstanceLimits{MaxInMemoryTenants: 3}, + ctx: func(ctx context.Context) context.Context { + return context.WithValue(ctx, testDelayKey, (2 * pushTimeout).String()) + }, + }, + "instance limit hit": { + expectedErrorWhenCircuitBreakerClosed: instanceLimitReachedError{}, + limits: InstanceLimits{MaxInMemoryTenants: 1}, + }, + } + + for initialDelayEnabled, initialDelayStatus := range map[bool]string{false: "disabled", true: "enabled"} { + for testName, testCase := range tests { + t.Run(fmt.Sprintf("%s with initial delay %s", testName, initialDelayStatus), func(t *testing.T) { + metricLabelAdapters := [][]mimirpb.LabelAdapter{{{Name: labels.MetricName, 
Value: "test"}}} + metricNames := []string{ + "cortex_ingester_circuit_breaker_results_total", + "cortex_ingester_circuit_breaker_transitions_total", + } + + registry := prometheus.NewRegistry() + + // Create a mocked ingester + cfg := defaultIngesterTestConfig(t) + cfg.ActiveSeriesMetrics.IdleTimeout = 100 * time.Millisecond + cfg.InstanceLimitsFn = func() *InstanceLimits { + return &testCase.limits + } + failureThreshold := 2 + var initialDelay time.Duration + if initialDelayEnabled { + initialDelay = 200 * time.Millisecond + } + cfg.CircuitBreakerConfig = CircuitBreakerConfig{ + Enabled: true, + FailureThreshold: uint(failureThreshold), + CooldownPeriod: 10 * time.Second, + InitialDelay: initialDelay, + PushTimeout: pushTimeout, + testModeEnabled: true, + } + + i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, registry) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until the ingester is healthy + test.Poll(t, 100*time.Millisecond, 1, func() interface{} { + return i.lifecycler.HealthyInstancesCount() + }) + + // the first request is successful + ctx := user.InjectOrgID(context.Background(), "test-0") + req := mimirpb.ToWriteRequest( + metricLabelAdapters, + []mimirpb.Sample{{Value: 1, TimestampMs: 8}}, + nil, + nil, + mimirpb.API, + ) + _, err = i.Push(ctx, req) + require.NoError(t, err) + + count := 0 + + // Push timeseries for each user + for _, userID := range []string{"test-1", "test-2"} { + reqs := []*mimirpb.WriteRequest{ + mimirpb.ToWriteRequest( + metricLabelAdapters, + []mimirpb.Sample{{Value: 1, TimestampMs: 9}}, + nil, + nil, + mimirpb.API, + ), + mimirpb.ToWriteRequest( + metricLabelAdapters, + []mimirpb.Sample{{Value: 2, TimestampMs: 10}}, + nil, + nil, + mimirpb.API, + ), + } + + for _, req := range reqs { + ctx := user.InjectOrgID(context.Background(), userID) + count++ + if testCase.ctx != 
nil { + ctx = testCase.ctx(ctx) + } + _, err = i.Push(ctx, req) + if initialDelayEnabled { + if testCase.expectedErrorWhenCircuitBreakerClosed != nil { + require.ErrorAs(t, err, &testCase.expectedErrorWhenCircuitBreakerClosed) + } else { + require.NoError(t, err) + } + } else { + if count <= failureThreshold { + if testCase.expectedErrorWhenCircuitBreakerClosed != nil { + require.ErrorAs(t, err, &testCase.expectedErrorWhenCircuitBreakerClosed) + } + } else { + checkCircuitBreakerOpenErr(ctx, err, t) + } + } + } + } + + // Check tracked Prometheus metrics + var expectedMetrics string + if initialDelayEnabled { + expectedMetrics = ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="success"} 0 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state + # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="open"} 0 + ` + } else { + expectedMetrics = ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="circuit_breaker_open"} 2 + cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="error"} 2 + cortex_ingester_circuit_breaker_results_total{ingester="localhost",result="success"} 1 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of 
times the circuit breaker has entered a state + # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{ingester="localhost",state="open"} 1 + ` + } + assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...)) + }) + } + } +} + +func checkCircuitBreakerOpenErr(ctx context.Context, err error, t *testing.T) { + var cbOpenErr circuitBreakerOpenError + require.ErrorAs(t, err, &cbOpenErr) + + var optional middleware.OptionalLogging + require.ErrorAs(t, err, &optional) + + shouldLog, _ := optional.ShouldLog(ctx) + require.False(t, shouldLog, "expected not to log via .ShouldLog()") + + s, ok := grpcutil.ErrorToStatus(err) + require.True(t, ok, "expected to be able to convert to gRPC status") + require.Equal(t, codes.Unavailable, s.Code()) +} diff --git a/pkg/ingester/errors.go b/pkg/ingester/errors.go index 56186be67ae..c39b00d0c2a 100644 --- a/pkg/ingester/errors.go +++ b/pkg/ingester/errors.go @@ -11,6 +11,7 @@ import ( "net/http" "time" + "github.com/failsafe-go/failsafe-go/circuitbreaker" "github.com/grafana/dskit/grpcutil" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/middleware" @@ -489,6 +490,24 @@ func (e ingesterPushGrpcDisabledError) errorCause() mimirpb.ErrorCause { // Ensure that ingesterPushGrpcDisabledError is an ingesterError. 
var _ ingesterError = ingesterPushGrpcDisabledError{} +type circuitBreakerOpenError struct { + remainingDelay time.Duration +} + +func newCircuitBreakerOpenError(remainingDelay time.Duration) circuitBreakerOpenError { + return circuitBreakerOpenError{remainingDelay: remainingDelay} +} + +func (e circuitBreakerOpenError) Error() string { + return fmt.Sprintf("%s with remaining delay %s", circuitbreaker.ErrOpen.Error(), e.remainingDelay.String()) +} + +func (e circuitBreakerOpenError) errorCause() mimirpb.ErrorCause { + return mimirpb.CIRCUIT_BREAKER_OPEN +} + +var _ ingesterError = circuitBreakerOpenError{} + type ingesterErrSamplers struct { sampleTimestampTooOld *log.Sampler sampleTimestampTooOldOOOEnabled *log.Sampler diff --git a/pkg/ingester/errors_test.go b/pkg/ingester/errors_test.go index 6029e2e2734..b56f565ee75 100644 --- a/pkg/ingester/errors_test.go +++ b/pkg/ingester/errors_test.go @@ -246,6 +246,24 @@ func TestTooBusyError(t *testing.T) { checkIngesterError(t, wrappedErr, mimirpb.TOO_BUSY, false) } +func TestNewCircuitBreakerOpenError(t *testing.T) { + remainingDelay := 1 * time.Second + expectedMsg := fmt.Sprintf("circuit breaker open with remaining delay %s", remainingDelay.String()) + err := newCircuitBreakerOpenError(remainingDelay) + require.Error(t, err) + require.EqualError(t, err, expectedMsg) + checkIngesterError(t, err, mimirpb.CIRCUIT_BREAKER_OPEN, false) + + wrappedErr := fmt.Errorf("wrapped: %w", err) + require.ErrorIs(t, wrappedErr, err) + require.ErrorAs(t, wrappedErr, &circuitBreakerOpenError{}) + + wrappedWithUserErr := wrapOrAnnotateWithUser(err, userID) + require.ErrorIs(t, wrappedWithUserErr, err) + require.ErrorAs(t, wrappedWithUserErr, &circuitBreakerOpenError{}) + checkIngesterError(t, wrappedErr, mimirpb.CIRCUIT_BREAKER_OPEN, false) +} + func TestNewErrorWithStatus(t *testing.T) { errMsg := "this is an error" ingesterErr := mockIngesterErr(errMsg) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 
bb3185d8c50..d62269b8323 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -215,6 +215,8 @@ type Config struct { // This config can be overridden in tests. limitMetricsUpdatePeriod time.Duration `yaml:"-"` + + CircuitBreakerConfig CircuitBreakerConfig `yaml:"circuit_breaker" category:"experimental"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -223,6 +225,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { cfg.IngesterPartitionRing.RegisterFlags(f) cfg.DefaultLimits.RegisterFlags(f) cfg.ActiveSeriesMetrics.RegisterFlags(f) + cfg.CircuitBreakerConfig.RegisterFlags(f) f.DurationVar(&cfg.MetadataRetainPeriod, "ingester.metadata-retain-period", 10*time.Minute, "Period at which metadata we have not seen will remain in memory before being deleted.") f.DurationVar(&cfg.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-tenant ingestion rates.") @@ -258,7 +261,13 @@ func (cfg *Config) Validate(logger log.Logger) error { util.WarnDeprecatedConfig(deprecatedReturnOnlyGRPCErrorsFlag, logger) } - return cfg.IngesterRing.Validate() + err := cfg.IngesterRing.Validate() + + if err == nil { + return cfg.CircuitBreakerConfig.Validate() + } + + return err } func (cfg *Config) getIgnoreSeriesLimitForMetricNamesMap() map[string]struct{} { @@ -349,6 +358,8 @@ type Ingester struct { ingestReader *ingest.PartitionReader ingestPartitionID int32 ingestPartitionLifecycler *ring.PartitionInstanceLifecycler + + circuitBreaker *circuitBreaker } func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { @@ -393,6 +404,10 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, i.metrics = newIngesterMetrics(registerer, cfg.ActiveSeriesMetrics.Enabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests, &i.inflightPushRequestsBytes) i.activeGroups = 
activeGroupsCleanupService + if cfg.CircuitBreakerConfig.Enabled { + i.circuitBreaker = newCircuitBreaker(i) + } + if registerer != nil { promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ Name: "cortex_ingester_oldest_unshipped_block_timestamp_seconds", @@ -959,6 +974,9 @@ type pushRequestState struct { // StartPushRequest checks if ingester can start push request, and increments relevant counters. // If new push request cannot be started, errors convertible to gRPC status code are returned, and metrics are updated. func (i *Ingester) StartPushRequest(ctx context.Context, reqSize int64) (context.Context, error) { + if i.isCircuitBreakerActive() { + return i.circuitBreaker.StartPushRequest(ctx, reqSize) + } ctx, _, err := i.startPushRequest(ctx, reqSize) return ctx, err } @@ -3748,8 +3766,22 @@ func (i *Ingester) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) return nil } +func (i *Ingester) isCircuitBreakerActive() bool { + if i.circuitBreaker == nil { + return false + } + return i.circuitBreaker.isActive() +} + // Push implements client.IngesterServer, which is registered into gRPC server. func (i *Ingester) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) { + if i.isCircuitBreakerActive() { + return i.circuitBreaker.Push(ctx, req) + } + return i.push(ctx, req) +} + +func (i *Ingester) push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) { if !i.cfg.PushGrpcMethodEnabled { return nil, errPushGrpcDisabled } diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 833a802fb04..d756d4c86e9 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -83,6 +83,9 @@ type ingesterMetrics struct { // Count number of requests rejected due to utilization based limiting. 
utilizationLimitedRequests *prometheus.CounterVec + + circuitBreakerTransitions *prometheus.CounterVec + circuitBreakerResults *prometheus.CounterVec } func newIngesterMetrics( @@ -375,6 +378,15 @@ func newIngesterMetrics( Name: "cortex_ingester_prepare_shutdown_requested", Help: "If the ingester has been requested to prepare for shutdown via endpoint or marker file.", }), + + circuitBreakerTransitions: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_circuit_breaker_transitions_total", + Help: "Number of times the circuit breaker has entered a state", + }, []string{"ingester", "state"}), + circuitBreakerResults: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_circuit_breaker_results_total", + Help: "Results of executing requests via the circuit breaker", + }, []string{"ingester", "result"}), } // Initialize expected rejected request labels