diff --git a/CHANGELOG.md b/CHANGELOG.md index 0371f5bbe68..e2dbb46a4f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ * `http.StatusServiceUnavailable` (503) and `codes.Unknown` are replaced with `codes.Internal`. * [CHANGE] Upgrade Node.js to v20. #6540 * [CHANGE] Querier: `cortex_querier_blocks_consistency_checks_failed_total` is now incremented when a block couldn't be queried from any attempted store-gateway as opposed to incremented after each attempt. Also `cortex_querier_blocks_consistency_checks_total` is incremented once per query as opposed to once per attempt (with 3 attempts). #6590 +* [FEATURE] Distributor: added option `-distributor.retry-after-header.enabled` to include the `Retry-After` header in recoverable error responses. #6608 * [FEATURE] Query-frontend: add experimental support for query blocking. Queries are blocked on a per-tenant basis and is configured via the limit `blocked_queries`. #5609 * [FEATURE] Vault: Added support for new Vault authentication methods: `AppRole`, `Kubernetes`, `UserPass` and `Token`. #6143 * [FEATURE] Add experimental endpoint `/api/v1/cardinality/active_series` to return the set of active series for a given selector. #6536 #6619 #6651 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index f0000f62d0d..7ed62808c12 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -737,6 +737,49 @@ "fieldValue": null, "fieldDefaultValue": null }, + { + "kind": "block", + "name": "retry_after_header", + "required": false, + "desc": "", + "blockEntries": [ + { + "kind": "field", + "name": "enabled", + "required": false, + "desc": "Enabled controls inclusion of the Retry-After header in the response: true includes it for client retry guidance, false omits it.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "distributor.retry-after-header.enabled", + "fieldType": "boolean", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "base_seconds", + "required": false, + "desc": "Base duration in seconds for calculating the Retry-After header in responses to 429/5xx errors.", + "fieldValue": null, + "fieldDefaultValue": 3, + "fieldFlag": "distributor.retry-after-header.base-seconds", + "fieldType": "int", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "max_backoff_exponent", + "required": false, + "desc": "Sets the upper limit on the number of Retry-Attempt considered for calculation. It caps the Retry-Attempt header without rejecting additional attempts, controlling exponential backoff calculations. For example, when the base-seconds is set to 3 and max-backoff-exponent to 5, the maximum retry duration would be 3 * 2^5 = 96 seconds.", + "fieldValue": null, + "fieldDefaultValue": 5, + "fieldFlag": "distributor.retry-after-header.max-backoff-exponent", + "fieldType": "int", + "fieldCategory": "experimental" + } + ], + "fieldValue": null, + "fieldDefaultValue": null + }, { "kind": "block", "name": "ha_tracker", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 955fe26983d..5f67c65f15e 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1151,6 +1151,12 @@ Usage of ./cmd/mimir/mimir: Per-tenant allowed push request burst size. 0 to disable. -distributor.request-rate-limit float Per-tenant push request rate limit in requests per second. 0 to disable. 
+ -distributor.retry-after-header.base-seconds int + [experimental] Base duration in seconds for calculating the Retry-After header in responses to 429/5xx errors. (default 3) + -distributor.retry-after-header.enabled + [experimental] Enabled controls inclusion of the Retry-After header in the response: true includes it for client retry guidance, false omits it. + -distributor.retry-after-header.max-backoff-exponent int + [experimental] Sets the upper limit on the number of Retry-Attempt considered for calculation. It caps the Retry-Attempt header without rejecting additional attempts, controlling exponential backoff calculations. For example, when the base-seconds is set to 3 and max-backoff-exponent to 5, the maximum retry duration would be 3 * 2^5 = 96 seconds. (default 5) -distributor.ring.consul.acl-token string ACL Token used to interact with Consul. -distributor.ring.consul.cas-retry-delay duration diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index 0934e9e507c..5d37824c426 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -62,6 +62,10 @@ The following features are currently experimental: - `-distributor.enable-otlp-metadata-storage` - Using status code 529 instead of 429 upon rate limit exhaustion. - `distributor.service-overload-status-code-on-rate-limit-enabled` + - Set Retry-After header in recoverable error responses + - `-distributor.retry-after-header.enabled` + - `-distributor.retry-after-header.base-seconds` + - `-distributor.retry-after-header.max-backoff-exponent` - Hash ring - Disabling ring heartbeat timeouts - `-distributor.ring.heartbeat-timeout=0` diff --git a/docs/sources/mimir/references/configuration-parameters/index.md b/docs/sources/mimir/references/configuration-parameters/index.md index d904fe5e0cc..e5e69515c19 100644 --- a/docs/sources/mimir/references/configuration-parameters/index.md +++ b/docs/sources/mimir/references/configuration-parameters/index.md @@ -733,6 +733,25 @@ pool: # CLI flag: -distributor.health-check-ingesters [health_check_ingesters: | default = true] +retry_after_header: + # (experimental) Enabled controls inclusion of the Retry-After header in the + # response: true includes it for client retry guidance, false omits it. + # CLI flag: -distributor.retry-after-header.enabled + [enabled: | default = false] + + # (experimental) Base duration in seconds for calculating the Retry-After + # header in responses to 429/5xx errors. + # CLI flag: -distributor.retry-after-header.base-seconds + [base_seconds: | default = 3] + + # (experimental) Sets the upper limit on the number of Retry-Attempt + # considered for calculation. It caps the Retry-Attempt header without + # rejecting additional attempts, controlling exponential backoff calculations. + # For example, when the base-seconds is set to 3 and max-backoff-exponent to + # 5, the maximum retry duration would be 3 * 2^5 = 96 seconds. + # CLI flag: -distributor.retry-after-header.max-backoff-exponent + [max_backoff_exponent: | default = 5] + ha_tracker: # Enable the distributors HA tracker so that it can accept samples from # Prometheus HA replicas gracefully (requires labels). 
diff --git a/pkg/api/api.go b/pkg/api/api.go index 005cfbdb51b..2cbf7c29da3 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -234,8 +234,8 @@ const OTLPPushEndpoint = "/otlp/v1/metrics" func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, reg prometheus.Registerer, limits *validation.Overrides) { distributorpb.RegisterDistributorServer(a.server.GRPC, d) - a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, d.PushWithMiddlewares), true, false, "POST") - a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, reg, d.PushWithMiddlewares), true, false, "POST") + a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares), true, false, "POST") + a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, reg, d.PushWithMiddlewares), true, false, "POST") a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{ {Desc: "Ring status", Path: "/distributor/ring"}, diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index d234a95404a..76532a239ff 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -157,6 +157,7 @@ type Distributor struct { type Config struct { PoolConfig PoolConfig `yaml:"pool"` + RetryConfig RetryConfig `yaml:"retry_after_header"` HATrackerConfig HATrackerConfig `yaml:"ha_tracker"` MaxRecvMsgSize int `yaml:"max_recv_msg_size" category:"advanced"` @@ -200,6 +201,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { cfg.PoolConfig.RegisterFlags(f) cfg.HATrackerConfig.RegisterFlags(f) cfg.DistributorRing.RegisterFlags(f, logger) + cfg.RetryConfig.RegisterFlags(f) f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. 
If exceeded, the request will be rejected.") f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.") @@ -215,7 +217,10 @@ func (cfg *Config) Validate(limits validation.Limits) error { return errInvalidTenantShardSize } - return cfg.HATrackerConfig.Validate() + if err := cfg.HATrackerConfig.Validate(); err != nil { + return err + } + return cfg.RetryConfig.Validate() } const ( diff --git a/pkg/distributor/otel.go b/pkg/distributor/otel.go index e8674e21abd..f06e134e0b5 100644 --- a/pkg/distributor/otel.go +++ b/pkg/distributor/otel.go @@ -46,12 +46,13 @@ func OTLPHandler( allowSkipLabelNameValidation bool, enableOtelMetadataStorage bool, limits *validation.Overrides, + retryCfg RetryConfig, reg prometheus.Registerer, push PushFunc, ) http.Handler { discardedDueToOtelParseError := validation.DiscardedSamplesCounter(reg, otelParseError) - return handler(maxRecvMsgSize, sourceIPs, allowSkipLabelNameValidation, limits, push, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, dst []byte, req *mimirpb.PreallocWriteRequest) ([]byte, error) { + return handler(maxRecvMsgSize, sourceIPs, allowSkipLabelNameValidation, limits, retryCfg, push, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, dst []byte, req *mimirpb.PreallocWriteRequest) ([]byte, error) { var decoderFunc func(buf []byte) (pmetricotlp.ExportRequest, error) logger := log.WithContext(ctx, log.Logger) diff --git a/pkg/distributor/push.go b/pkg/distributor/push.go index 9a4e7bafff9..f587e44a389 100644 --- a/pkg/distributor/push.go +++ b/pkg/distributor/push.go @@ -8,8 +8,11 @@ package distributor import ( "context" "errors" + "flag" "fmt" + "math/rand" "net/http" + "strconv" "sync" "github.com/go-kit/log/level" @@ -17,6 +20,7 @@ import ( "github.com/grafana/dskit/httpgrpc/server" "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/tenant" + "github.com/opentracing/opentracing-go" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/util" @@ -36,24 +40,52 @@ type bufHolder struct { buf []byte } -var bufferPool = sync.Pool{ - New: func() interface{} { return &bufHolder{buf: make([]byte, 256*1024)} }, -} +var ( + bufferPool = sync.Pool{ + New: func() interface{} { return &bufHolder{buf: make([]byte, 256*1024)} }, + } + errRetryBaseLessThanOneSecond = errors.New("retry base duration should not be less than 1 second") + errNonPositiveMaxBackoffExponent = errors.New("max backoff exponent should be a positive value") +) const ( SkipLabelNameValidationHeader = "X-Mimir-SkipLabelNameValidation" statusClientClosedRequest = 499 ) +type RetryConfig struct { + Enabled bool `yaml:"enabled" category:"experimental"` + BaseSeconds int `yaml:"base_seconds" category:"experimental"` + MaxBackoffExponent int `yaml:"max_backoff_exponent" category:"experimental"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *RetryConfig) RegisterFlags(f *flag.FlagSet) { + f.BoolVar(&cfg.Enabled, "distributor.retry-after-header.enabled", false, "Enabled controls inclusion of the Retry-After header in the response: true includes it for client retry guidance, false omits it.") + f.IntVar(&cfg.BaseSeconds, "distributor.retry-after-header.base-seconds", 3, "Base duration in seconds for calculating the Retry-After header in responses to 429/5xx errors.") + f.IntVar(&cfg.MaxBackoffExponent, "distributor.retry-after-header.max-backoff-exponent", 5, "Sets the upper limit on the number of Retry-Attempt considered for calculation. 
It caps the Retry-Attempt header without rejecting additional attempts, controlling exponential backoff calculations. For example, when the base-seconds is set to 3 and max-backoff-exponent to 5, the maximum retry duration would be 3 * 2^5 = 96 seconds.") +} + +func (cfg *RetryConfig) Validate() error { + if cfg.BaseSeconds < 1 { + return errRetryBaseLessThanOneSecond + } + if cfg.MaxBackoffExponent < 1 { + return errNonPositiveMaxBackoffExponent + } + return nil +} + // Handler is a http.Handler which accepts WriteRequests. func Handler( maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, allowSkipLabelNameValidation bool, limits *validation.Overrides, + retryCfg RetryConfig, push PushFunc, ) http.Handler { - return handler(maxRecvMsgSize, sourceIPs, allowSkipLabelNameValidation, limits, push, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, dst []byte, req *mimirpb.PreallocWriteRequest) ([]byte, error) { + return handler(maxRecvMsgSize, sourceIPs, allowSkipLabelNameValidation, limits, retryCfg, push, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, dst []byte, req *mimirpb.PreallocWriteRequest) ([]byte, error) { res, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, dst, req, util.RawSnappy) if errors.Is(err, util.MsgSizeTooLargeErr{}) { err = distributorMaxWriteMessageSizeErr{actual: int(r.ContentLength), limit: maxRecvMsgSize} @@ -79,6 +111,7 @@ func handler( sourceIPs *middleware.SourceIPExtractor, allowSkipLabelNameValidation bool, limits *validation.Overrides, + retryCfg RetryConfig, push PushFunc, parser parserFunc, ) http.Handler { @@ -141,12 +174,30 @@ func handler( if code != 202 { level.Error(logger).Log("msg", "push error", "err", err) } - addHeaders(w, err) + addHeaders(w, err, r, code, retryCfg) http.Error(w, msg, code) } }) } +func calculateRetryAfter(retryAttemptHeader string, baseSeconds int, maxBackoffExponent int) string { + retryAttempt, err := strconv.Atoi(retryAttemptHeader) + // If retry-attempt is not valid, set it to default 1 + if err != nil || retryAttempt < 1 { + retryAttempt = 1 + } + if retryAttempt > maxBackoffExponent { + retryAttempt = maxBackoffExponent + } + var minSeconds, maxSeconds int64 + minSeconds = int64(baseSeconds) << (retryAttempt - 1) + maxSeconds = int64(minSeconds) << 1 + + delaySeconds := minSeconds + rand.Int63n(maxSeconds-minSeconds) + + return strconv.FormatInt(delaySeconds, 10) +} + // toHTTPStatus converts the given error into an appropriate HTTP status corresponding // to that error, if the error is one of the errors from this package. Otherwise, an // http.StatusInternalServerError is returned. 
@@ -185,9 +236,21 @@ func toHTTPStatus(ctx context.Context, pushErr error, limits *validation.Overrid return http.StatusInternalServerError } -func addHeaders(w http.ResponseWriter, err error) { +func addHeaders(w http.ResponseWriter, err error, r *http.Request, responseCode int, retryCfg RetryConfig) { var doNotLogError middleware.DoNotLogError if errors.As(err, &doNotLogError) { w.Header().Set(server.DoNotLogErrorHeaderKey, "true") } + + if responseCode == http.StatusTooManyRequests || responseCode/100 == 5 { + if retryCfg.Enabled { + retryAttemptHeader := r.Header.Get("Retry-Attempt") + retrySeconds := calculateRetryAfter(retryAttemptHeader, retryCfg.BaseSeconds, retryCfg.MaxBackoffExponent) + w.Header().Set("Retry-After", retrySeconds) + if sp := opentracing.SpanFromContext(r.Context()); sp != nil { + sp.SetTag("retry-after", retrySeconds) + sp.SetTag("retry-attempt", retryAttemptHeader) + } + } + } } diff --git a/pkg/distributor/push_test.go b/pkg/distributor/push_test.go index 469eafea2c5..1e859572f08 100644 --- a/pkg/distributor/push_test.go +++ b/pkg/distributor/push_test.go @@ -14,10 +14,12 @@ import ( "io" "net/http" "net/http/httptest" + "strconv" "testing" "time" "github.com/golang/snappy" + "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/httpgrpc/server" "github.com/grafana/dskit/middleware" @@ -41,7 +43,7 @@ import ( func TestHandler_remoteWrite(t *testing.T) { req := createRequest(t, createPrometheusRemoteWriteProtobuf(t)) resp := httptest.NewRecorder() - handler := Handler(100000, nil, false, nil, verifyWritePushFunc(t, mimirpb.API)) + handler := Handler(100000, nil, false, nil, RetryConfig{}, verifyWritePushFunc(t, mimirpb.API)) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) } @@ -265,7 +267,7 @@ func TestHandlerOTLPPush(t *testing.T) { req.Header.Set("Content-Encoding", tt.encoding) } - handler := OTLPHandler(tt.maxMsgSize, nil, false, tt.enableOtelMetadataStorage, nil, nil, tt.verifyFunc) + handler := OTLPHandler(tt.maxMsgSize, nil, false, tt.enableOtelMetadataStorage, nil, RetryConfig{}, nil, tt.verifyFunc) resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) @@ -320,7 +322,7 @@ func TestHandler_otlpDroppedMetricsPanic(t *testing.T) { req := createOTLPRequest(t, pmetricotlp.NewExportRequestFromMetrics(md), false) resp := httptest.NewRecorder() - handler := OTLPHandler(100000, nil, false, true, nil, nil, func(ctx context.Context, pushReq *Request) error { + handler := OTLPHandler(100000, nil, false, true, nil, RetryConfig{}, nil, func(ctx context.Context, pushReq *Request) error { request, err := pushReq.WriteRequest() assert.NoError(t, err) assert.Len(t, request.Timeseries, 3) @@ -360,7 +362,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) { req := createOTLPRequest(t, pmetricotlp.NewExportRequestFromMetrics(md), false) resp := httptest.NewRecorder() - handler := OTLPHandler(100000, nil, false, true, nil, nil, func(ctx context.Context, pushReq *Request) error { + handler := OTLPHandler(100000, nil, false, true, nil, RetryConfig{}, nil, func(ctx context.Context, pushReq *Request) error { request, err := pushReq.WriteRequest() assert.NoError(t, err) assert.Len(t, request.Timeseries, 2) @@ -386,7 +388,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) { req = createOTLPRequest(t, pmetricotlp.NewExportRequestFromMetrics(md), false) resp = httptest.NewRecorder() - handler = OTLPHandler(100000, nil, false, true, nil, nil, func(ctx context.Context, pushReq *Request) error { + handler = 
OTLPHandler(100000, nil, false, true, nil, RetryConfig{}, nil, func(ctx context.Context, pushReq *Request) error { request, err := pushReq.WriteRequest() assert.NoError(t, err) assert.Len(t, request.Timeseries, 10) // 6 buckets (including +Inf) + 2 sum/count + 2 from the first case @@ -415,7 +417,7 @@ func TestHandler_otlpWriteRequestTooBigWithCompression(t *testing.T) { resp := httptest.NewRecorder() - handler := OTLPHandler(140, nil, false, true, nil, nil, readBodyPushFunc(t)) + handler := OTLPHandler(140, nil, false, true, nil, RetryConfig{}, nil, readBodyPushFunc(t)) handler.ServeHTTP(resp, req) assert.Equal(t, http.StatusRequestEntityTooLarge, resp.Code) body, err := io.ReadAll(resp.Body) @@ -427,7 +429,7 @@ func TestHandler_mimirWriteRequest(t *testing.T) { req := createRequest(t, createMimirWriteRequestProtobuf(t, false)) resp := httptest.NewRecorder() sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)") - handler := Handler(100000, sourceIPs, false, nil, verifyWritePushFunc(t, mimirpb.RULE)) + handler := Handler(100000, sourceIPs, false, nil, RetryConfig{}, verifyWritePushFunc(t, mimirpb.RULE)) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) } @@ -436,7 +438,7 @@ func TestHandler_contextCanceledRequest(t *testing.T) { req := createRequest(t, createMimirWriteRequestProtobuf(t, false)) resp := httptest.NewRecorder() sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)") - handler := Handler(100000, sourceIPs, false, nil, func(_ context.Context, req *Request) error { + handler := Handler(100000, sourceIPs, false, nil, RetryConfig{}, func(_ context.Context, req *Request) error { defer req.CleanUp() return fmt.Errorf("the request failed: %w", context.Canceled) }) @@ -545,7 +547,7 @@ func TestHandler_EnsureSkipLabelNameValidationBehaviour(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { resp := httptest.NewRecorder() - handler := Handler(100000, nil, tc.allowSkipLabelNameValidation, nil, tc.verifyReqHandler) + handler := Handler(100000, nil, tc.allowSkipLabelNameValidation, nil, RetryConfig{}, tc.verifyReqHandler) if !tc.includeAllowSkiplabelNameValidationHeader { tc.req.Header.Set(SkipLabelNameValidationHeader, "true") } @@ -702,7 +704,7 @@ func BenchmarkPushHandler(b *testing.B) { pushReq.CleanUp() return nil } - handler := Handler(100000, nil, false, nil, pushFunc) + handler := Handler(100000, nil, false, nil, RetryConfig{}, pushFunc) b.ResetTimer() for iter := 0; iter < b.N; iter++ { req.Body = bufCloser{Buffer: buf} // reset Body so it can be read each time round the loop @@ -758,7 +760,7 @@ func TestHandler_ErrorTranslation(t *testing.T) { return err } - h := handler(10, nil, false, nil, pushFunc, parserFunc) + h := handler(10, nil, false, nil, RetryConfig{}, pushFunc, parserFunc) recorder := httptest.NewRecorder() h.ServeHTTP(recorder, httptest.NewRequest(http.MethodPost, "/push", bufCloser{&bytes.Buffer{}})) @@ -827,9 +829,7 @@ func TestHandler_ErrorTranslation(t *testing.T) { } return tc.err } - - h := handler(10, nil, false, nil, pushFunc, parserFunc) - + h := handler(10, nil, false, nil, RetryConfig{}, pushFunc, parserFunc) recorder := httptest.NewRecorder() h.ServeHTTP(recorder, httptest.NewRequest(http.MethodPost, "/push", bufCloser{&bytes.Buffer{}})) @@ -847,6 +847,127 @@ func TestHandler_ErrorTranslation(t *testing.T) { } } +func TestHandler_HandleRetryAfterHeader(t *testing.T) { + testCases := []struct { + name string + responseCode int + retryAttempt string + retryCfg RetryConfig + expectRetry bool + minRetryAfter 
int + maxRetryAfter int + }{ + { + name: "Request timeout, HTTP 408, no Retry-After", + responseCode: http.StatusRequestTimeout, + retryAttempt: "1", + retryCfg: RetryConfig{Enabled: true, BaseSeconds: 3, MaxBackoffExponent: 2}, + expectRetry: false, + }, + { + name: "Generic error, HTTP 500, no Retry-After", + responseCode: http.StatusInternalServerError, + retryCfg: RetryConfig{Enabled: false, BaseSeconds: 3, MaxBackoffExponent: 4}, + expectRetry: false, + }, + { + name: "Generic error, HTTP 500, Retry-After with no Retry-Attempt set, default Retry-Attempt to 1", + responseCode: http.StatusInternalServerError, + expectRetry: true, + retryCfg: RetryConfig{Enabled: true, BaseSeconds: 5, MaxBackoffExponent: 2}, + minRetryAfter: 5, + maxRetryAfter: 10, + }, + { + name: "Generic error, HTTP 500, Retry-After with a non-integer Retry-Attempt, default Retry-Attempt to 1", + responseCode: http.StatusInternalServerError, + retryAttempt: "not-an-integer", + expectRetry: true, + retryCfg: RetryConfig{Enabled: true, BaseSeconds: 3, MaxBackoffExponent: 2}, + minRetryAfter: 3, + maxRetryAfter: 6, + }, + { + name: "Generic error, HTTP 500, Retry-After with a float Retry-Attempt, default Retry-Attempt to 1", + responseCode: http.StatusInternalServerError, + retryAttempt: "3.50", + expectRetry: true, + retryCfg: RetryConfig{Enabled: true, BaseSeconds: 2, MaxBackoffExponent: 5}, + minRetryAfter: 2, + maxRetryAfter: 4, + }, + { + name: "Generic error, HTTP 500, Retry-After with a list-valued Retry-Attempt, default Retry-Attempt to 1", + responseCode: http.StatusInternalServerError, + retryAttempt: "[1, 2, 3]", + expectRetry: true, + retryCfg: RetryConfig{Enabled: true, BaseSeconds: 1, MaxBackoffExponent: 5}, + minRetryAfter: 1, + maxRetryAfter: 2, + }, + { + name: "Generic error, HTTP 500, Retry-After with a negative Retry-Attempt, default Retry-Attempt to 1", + responseCode: http.StatusInternalServerError, + retryAttempt: "-1", + expectRetry: true, + retryCfg: RetryConfig{Enabled: true, BaseSeconds: 4, MaxBackoffExponent: 3}, + minRetryAfter: 4, + maxRetryAfter: 8, + }, + { + name: "Generic error, HTTP 500, Retry-After with a valid Retry-Attempt set to 2", + responseCode: http.StatusInternalServerError, + expectRetry: true, + retryAttempt: "2", + retryCfg: RetryConfig{Enabled: true, BaseSeconds: 2, MaxBackoffExponent: 5}, + minRetryAfter: 4, + maxRetryAfter: 8, + }, + { + name: "Service overloaded error, Retry-After with a valid Retry-Attempt set to 3", + responseCode: StatusServiceOverloaded, + expectRetry: true, + retryAttempt: "3", + retryCfg: RetryConfig{Enabled: true, BaseSeconds: 2, MaxBackoffExponent: 5}, + minRetryAfter: 8, + maxRetryAfter: 16, + }, + { + name: "Generic error, HTTP 500, Retry-After with Retry-Attempt set higher than max-backoff-exponent", + responseCode: http.StatusInternalServerError, + expectRetry: true, + retryAttempt: "8", + retryCfg: RetryConfig{Enabled: true, BaseSeconds: 3, MaxBackoffExponent: 2}, + minRetryAfter: 6, + maxRetryAfter: 12, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + recorder := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/push", bufCloser{&bytes.Buffer{}}) + + if tc.retryAttempt != "" { + req.Header.Add("Retry-Attempt", tc.retryAttempt) + } + + addHeaders(recorder, nil, req, tc.responseCode, tc.retryCfg) + + retryAfter := recorder.Header().Get("Retry-After") + if !tc.expectRetry { + assert.Empty(t, retryAfter) + } else { + assert.NotEmpty(t, retryAfter) + retryAfterInt, err := 
strconv.Atoi(retryAfter) + assert.NoError(t, err) + assert.GreaterOrEqual(t, retryAfterInt, tc.minRetryAfter) + assert.LessOrEqual(t, retryAfterInt, tc.maxRetryAfter) + } + }) + } +} + func TestHandler_ToHTTPStatus(t *testing.T) { userID := "user" originalMsg := "this is an error" @@ -1026,3 +1147,54 @@ func TestHandler_ToHTTPStatus(t *testing.T) { }) } } + +func TestRetryConfig_Validate(t *testing.T) { + t.Parallel() + tests := map[string]struct { + cfg RetryConfig + expectedErr error + }{ + "should pass with default config": { + cfg: func() RetryConfig { + cfg := RetryConfig{} + flagext.DefaultValues(&cfg) + return cfg + }(), + expectedErr: nil, + }, + "should fail if retry base is less than 1 second": { + cfg: RetryConfig{ + BaseSeconds: 0, + MaxBackoffExponent: 5, + }, + expectedErr: errRetryBaseLessThanOneSecond, + }, + "should fail if retry base is negative": { + cfg: RetryConfig{ + BaseSeconds: -1, + MaxBackoffExponent: 5, + }, + expectedErr: errRetryBaseLessThanOneSecond, + }, + "should fail if max allowed attempts is 0": { + cfg: RetryConfig{ + BaseSeconds: 3, + MaxBackoffExponent: 0, + }, + expectedErr: errNonPositiveMaxBackoffExponent, + }, + "should fail if max allowed attempts is negative": { + cfg: RetryConfig{ + BaseSeconds: 3, + MaxBackoffExponent: -1, + }, + expectedErr: errNonPositiveMaxBackoffExponent, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expectedErr, testData.cfg.Validate()) + }) + } +}
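
The delay calculation is the core of this change, so here it is restated as a self-contained sketch. The clamping and shift logic mirror calculateRetryAfter from the patch; retryWindow and the main driver are illustrative names added for the example, not code from the change. With the defaults (base-seconds=3, max-backoff-exponent=5) the delay is drawn uniformly from the half-open window [base * 2^(attempt-1), base * 2^attempt) seconds, so it tops out just under 3 * 2^5 = 96 seconds.

package main

import (
	"fmt"
	"math/rand"
)

// retryWindow returns the bounds used by calculateRetryAfter in this patch:
// the Retry-After value is drawn uniformly from [base * 2^(attempt-1), base * 2^attempt),
// with the attempt clamped to the range [1, maxBackoffExponent].
func retryWindow(attempt, baseSeconds, maxBackoffExponent int) (minSeconds, maxSeconds int64) {
	if attempt < 1 {
		attempt = 1
	}
	if attempt > maxBackoffExponent {
		attempt = maxBackoffExponent
	}
	minSeconds = int64(baseSeconds) << (attempt - 1)
	maxSeconds = minSeconds << 1
	return minSeconds, maxSeconds
}

func main() {
	// With the default base-seconds=3 and max-backoff-exponent=5 the window grows
	// until attempt 5 and then stays at [48, 96): 3 * 2^5 = 96 seconds.
	for attempt := 1; attempt <= 6; attempt++ {
		lo, hi := retryWindow(attempt, 3, 5)
		delay := lo + rand.Int63n(hi-lo)
		fmt.Printf("attempt=%d window=[%d,%d) sampled delay=%ds\n", attempt, lo, hi, delay)
	}
}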
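The Retry-Attempt request header that addHeaders reads is supplied by the sender, so the feature only scales the backoff when clients echo their attempt count. The sketch below shows one way a hypothetical remote-write client could do that; the Retry-Attempt and Retry-After header names come from this patch, while the package name, endpoint handling, attempt limit, and fallback delay are assumptions for illustration.

package pushretry

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"net/http"
	"strconv"
	"time"
)

// pushWithRetries is a hypothetical client loop: it labels each attempt with the
// Retry-Attempt header and, on 429/5xx responses, waits for the server-provided
// Retry-After duration, falling back to a small local delay if the header is absent.
func pushWithRetries(ctx context.Context, client *http.Client, url string, body []byte) error {
	const maxAttempts = 5 // illustrative client-side cap, not part of the patch
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
		if err != nil {
			return err
		}
		req.Header.Set("Retry-Attempt", strconv.Itoa(attempt))
		resp, err := client.Do(req)
		if err != nil {
			return err
		}
		resp.Body.Close()
		if resp.StatusCode/100 == 2 {
			return nil
		}
		if resp.StatusCode != http.StatusTooManyRequests && resp.StatusCode/100 != 5 {
			return fmt.Errorf("non-retryable status %d", resp.StatusCode)
		}
		delay := time.Duration(attempt) * time.Second // fallback when Retry-After is missing
		if s, err := strconv.Atoi(resp.Header.Get("Retry-After")); err == nil && s > 0 {
			delay = time.Duration(s) * time.Second
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
	}
	return errors.New("retries exhausted")
}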