Skip to content

Commit

Permalink
Distributor: Includes the Retry-After header in recoverable error res…
Browse files Browse the repository at this point in the history
…ponses (#6608)

* Distributor: Add Retry-After header for 429 response

* remove development changes

* implement selected algorithm

* address comment

* Update pkg/distributor/push_test.go

Co-authored-by: Dimitar Dimitrov <[email protected]>

* address comments in PR

* add traces instead of log to save us some money

* Update CHANGELOG.md

Co-authored-by: Peter Štibraný <[email protected]>

* change the base to base second which is integer

* fix comments

---------

Co-authored-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: Peter Štibraný <[email protected]>
  • Loading branch information
3 people authored Nov 15, 2023
1 parent 34678fd commit f4872c6
Show file tree
Hide file tree
Showing 10 changed files with 338 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* `http.StatusServiceUnavailable` (503) and `codes.Unknown` are replaced with `codes.Internal`.
* [CHANGE] Upgrade Node.js to v20. #6540
* [CHANGE] Querier: `cortex_querier_blocks_consistency_checks_failed_total` is now incremented when a block couldn't be queried from any attempted store-gateway as opposed to incremented after each attempt. Also `cortex_querier_blocks_consistency_checks_total` is incremented once per query as opposed to once per attempt (with 3 attempts). #6590
* [FEATURE] Distributor: added option `-distributor.retry-after-header.enabled` to include the `Retry-After` header in recoverable error responses. #6608
* [FEATURE] Query-frontend: add experimental support for query blocking. Queries are blocked on a per-tenant basis and is configured via the limit `blocked_queries`. #5609
* [FEATURE] Vault: Added support for new Vault authentication methods: `AppRole`, `Kubernetes`, `UserPass` and `Token`. #6143
* [FEATURE] Add experimental endpoint `/api/v1/cardinality/active_series` to return the set of active series for a given selector. #6536 #6619 #6651
Expand Down
43 changes: 43 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,49 @@
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "block",
"name": "retry_after_header",
"required": false,
"desc": "",
"blockEntries": [
{
"kind": "field",
"name": "enabled",
"required": false,
"desc": "Enabled controls inclusion of the Retry-After header in the response: true includes it for client retry guidance, false omits it.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "distributor.retry-after-header.enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "base_seconds",
"required": false,
"desc": "Base duration in seconds for calculating the Retry-After header in responses to 429/5xx errors.",
"fieldValue": null,
"fieldDefaultValue": 3,
"fieldFlag": "distributor.retry-after-header.base-seconds",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_backoff_exponent",
"required": false,
"desc": "Sets the upper limit on the number of Retry-Attempt considered for calculation. It caps the Retry-Attempt header without rejecting additional attempts, controlling exponential backoff calculations. For example, when the base-seconds is set to 3 and max-backoff-exponent to 5, the maximum retry duration would be 3 * 2^5 = 96 seconds.",
"fieldValue": null,
"fieldDefaultValue": 5,
"fieldFlag": "distributor.retry-after-header.max-backoff-exponent",
"fieldType": "int",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "block",
"name": "ha_tracker",
Expand Down
6 changes: 6 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,12 @@ Usage of ./cmd/mimir/mimir:
Per-tenant allowed push request burst size. 0 to disable.
-distributor.request-rate-limit float
Per-tenant push request rate limit in requests per second. 0 to disable.
-distributor.retry-after-header.base-seconds int
[experimental] Base duration in seconds for calculating the Retry-After header in responses to 429/5xx errors. (default 3)
-distributor.retry-after-header.enabled
[experimental] Enabled controls inclusion of the Retry-After header in the response: true includes it for client retry guidance, false omits it.
-distributor.retry-after-header.max-backoff-exponent int
[experimental] Sets the upper limit on the number of Retry-Attempt considered for calculation. It caps the Retry-Attempt header without rejecting additional attempts, controlling exponential backoff calculations. For example, when the base-seconds is set to 3 and max-backoff-exponent to 5, the maximum retry duration would be 3 * 2^5 = 96 seconds. (default 5)
-distributor.ring.consul.acl-token string
ACL Token used to interact with Consul.
-distributor.ring.consul.cas-retry-delay duration
Expand Down
4 changes: 4 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ The following features are currently experimental:
- `-distributor.enable-otlp-metadata-storage`
- Using status code 529 instead of 429 upon rate limit exhaustion.
- `distributor.service-overload-status-code-on-rate-limit-enabled`
- Set Retry-After header in recoverable error responses
- `-distributor.retry-after-header.enabled`
- `-distributor.retry-after-header.base-seconds`
- `-distributor.retry-after-header.max-backoff-exponent`
- Hash ring
- Disabling ring heartbeat timeouts
- `-distributor.ring.heartbeat-timeout=0`
Expand Down
19 changes: 19 additions & 0 deletions docs/sources/mimir/references/configuration-parameters/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,25 @@ pool:
# CLI flag: -distributor.health-check-ingesters
[health_check_ingesters: <boolean> | default = true]
retry_after_header:
# (experimental) Enabled controls inclusion of the Retry-After header in the
# response: true includes it for client retry guidance, false omits it.
# CLI flag: -distributor.retry-after-header.enabled
[enabled: <boolean> | default = false]
# (experimental) Base duration in seconds for calculating the Retry-After
# header in responses to 429/5xx errors.
# CLI flag: -distributor.retry-after-header.base-seconds
[base_seconds: <int> | default = 3]
# (experimental) Sets the upper limit on the number of Retry-Attempt
# considered for calculation. It caps the Retry-Attempt header without
# rejecting additional attempts, controlling exponential backoff calculations.
# For example, when the base-seconds is set to 3 and max-backoff-exponent to
# 5, the maximum retry duration would be 3 * 2^5 = 96 seconds.
# CLI flag: -distributor.retry-after-header.max-backoff-exponent
[max_backoff_exponent: <int> | default = 5]
ha_tracker:
# Enable the distributors HA tracker so that it can accept samples from
# Prometheus HA replicas gracefully (requires labels).
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ const OTLPPushEndpoint = "/otlp/v1/metrics"
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, reg prometheus.Registerer, limits *validation.Overrides) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, d.PushWithMiddlewares), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, reg, d.PushWithMiddlewares), true, false, "POST")
a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, reg, d.PushWithMiddlewares), true, false, "POST")

a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
{Desc: "Ring status", Path: "/distributor/ring"},
Expand Down
7 changes: 6 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ type Distributor struct {
type Config struct {
PoolConfig PoolConfig `yaml:"pool"`

RetryConfig RetryConfig `yaml:"retry_after_header"`
HATrackerConfig HATrackerConfig `yaml:"ha_tracker"`

MaxRecvMsgSize int `yaml:"max_recv_msg_size" category:"advanced"`
Expand Down Expand Up @@ -200,6 +201,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
cfg.PoolConfig.RegisterFlags(f)
cfg.HATrackerConfig.RegisterFlags(f)
cfg.DistributorRing.RegisterFlags(f, logger)
cfg.RetryConfig.RegisterFlags(f)

f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected.")
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
Expand All @@ -215,7 +217,10 @@ func (cfg *Config) Validate(limits validation.Limits) error {
return errInvalidTenantShardSize
}

return cfg.HATrackerConfig.Validate()
if err := cfg.HATrackerConfig.Validate(); err != nil {
return err
}
return cfg.RetryConfig.Validate()
}

const (
Expand Down
3 changes: 2 additions & 1 deletion pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ func OTLPHandler(
allowSkipLabelNameValidation bool,
enableOtelMetadataStorage bool,
limits *validation.Overrides,
retryCfg RetryConfig,
reg prometheus.Registerer,
push PushFunc,
) http.Handler {
discardedDueToOtelParseError := validation.DiscardedSamplesCounter(reg, otelParseError)

return handler(maxRecvMsgSize, sourceIPs, allowSkipLabelNameValidation, limits, push, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, dst []byte, req *mimirpb.PreallocWriteRequest) ([]byte, error) {
return handler(maxRecvMsgSize, sourceIPs, allowSkipLabelNameValidation, limits, retryCfg, push, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, dst []byte, req *mimirpb.PreallocWriteRequest) ([]byte, error) {
var decoderFunc func(buf []byte) (pmetricotlp.ExportRequest, error)

logger := log.WithContext(ctx, log.Logger)
Expand Down
75 changes: 69 additions & 6 deletions pkg/distributor/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ package distributor
import (
"context"
"errors"
"flag"
"fmt"
"math/rand"
"net/http"
"strconv"
"sync"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/httpgrpc/server"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
Expand All @@ -36,24 +40,52 @@ type bufHolder struct {
buf []byte
}

var bufferPool = sync.Pool{
New: func() interface{} { return &bufHolder{buf: make([]byte, 256*1024)} },
}
var (
bufferPool = sync.Pool{
New: func() interface{} { return &bufHolder{buf: make([]byte, 256*1024)} },
}
errRetryBaseLessThanOneSecond = errors.New("retry base duration should not be less than 1 second")
errNonPositiveMaxBackoffExponent = errors.New("max backoff exponent should be a positive value")
)

const (
SkipLabelNameValidationHeader = "X-Mimir-SkipLabelNameValidation"
statusClientClosedRequest = 499
)

// RetryConfig configures the Retry-After header the distributor attaches to
// recoverable (429 / 5xx) push error responses, guiding client backoff.
type RetryConfig struct {
	// Enabled toggles inclusion of the Retry-After header in error responses.
	Enabled bool `yaml:"enabled" category:"experimental"`
	// BaseSeconds is the backoff base, in seconds, for the computed delay.
	// Validate rejects values below 1.
	BaseSeconds int `yaml:"base_seconds" category:"experimental"`
	// MaxBackoffExponent caps the retry-attempt exponent used in the backoff
	// calculation (delay range tops out at BaseSeconds * 2^MaxBackoffExponent).
	MaxBackoffExponent int `yaml:"max_backoff_exponent" category:"experimental"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
// The description strings are surfaced verbatim in the generated configuration
// reference and help output, so they double as user-facing documentation.
func (cfg *RetryConfig) RegisterFlags(f *flag.FlagSet) {
	// Defaults: disabled, 3s base, exponent cap of 5 (max delay 3 * 2^5 = 96s).
	f.BoolVar(&cfg.Enabled, "distributor.retry-after-header.enabled", false, "Enabled controls inclusion of the Retry-After header in the response: true includes it for client retry guidance, false omits it.")
	f.IntVar(&cfg.BaseSeconds, "distributor.retry-after-header.base-seconds", 3, "Base duration in seconds for calculating the Retry-After header in responses to 429/5xx errors.")
	f.IntVar(&cfg.MaxBackoffExponent, "distributor.retry-after-header.max-backoff-exponent", 5, "Sets the upper limit on the number of Retry-Attempt considered for calculation. It caps the Retry-Attempt header without rejecting additional attempts, controlling exponential backoff calculations. For example, when the base-seconds is set to 3 and max-backoff-exponent to 5, the maximum retry duration would be 3 * 2^5 = 96 seconds.")
}

// Validate checks that the retry configuration is usable: both the backoff
// base and the exponent cap must be at least 1.
func (cfg *RetryConfig) Validate() error {
	switch {
	case cfg.BaseSeconds < 1:
		return errRetryBaseLessThanOneSecond
	case cfg.MaxBackoffExponent < 1:
		return errNonPositiveMaxBackoffExponent
	default:
		return nil
	}
}

// Handler is a http.Handler which accepts WriteRequests.
func Handler(
maxRecvMsgSize int,
sourceIPs *middleware.SourceIPExtractor,
allowSkipLabelNameValidation bool,
limits *validation.Overrides,
retryCfg RetryConfig,
push PushFunc,
) http.Handler {
return handler(maxRecvMsgSize, sourceIPs, allowSkipLabelNameValidation, limits, push, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, dst []byte, req *mimirpb.PreallocWriteRequest) ([]byte, error) {
return handler(maxRecvMsgSize, sourceIPs, allowSkipLabelNameValidation, limits, retryCfg, push, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, dst []byte, req *mimirpb.PreallocWriteRequest) ([]byte, error) {
res, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, dst, req, util.RawSnappy)
if errors.Is(err, util.MsgSizeTooLargeErr{}) {
err = distributorMaxWriteMessageSizeErr{actual: int(r.ContentLength), limit: maxRecvMsgSize}
Expand All @@ -79,6 +111,7 @@ func handler(
sourceIPs *middleware.SourceIPExtractor,
allowSkipLabelNameValidation bool,
limits *validation.Overrides,
retryCfg RetryConfig,
push PushFunc,
parser parserFunc,
) http.Handler {
Expand Down Expand Up @@ -141,12 +174,30 @@ func handler(
if code != 202 {
level.Error(logger).Log("msg", "push error", "err", err)
}
addHeaders(w, err)
addHeaders(w, err, r, code, retryCfg)
http.Error(w, msg, code)
}
})
}

// calculateRetryAfter computes the Retry-After header value, in seconds, as a
// decimal string. It applies jittered exponential backoff driven by the
// client-supplied Retry-Attempt header: the delay is drawn uniformly from
// [baseSeconds * 2^(attempt-1), baseSeconds * 2^attempt), with the attempt
// clamped to the range [1, maxBackoffExponent].
func calculateRetryAfter(retryAttemptHeader string, baseSeconds int, maxBackoffExponent int) string {
	retryAttempt, err := strconv.Atoi(retryAttemptHeader)
	// A missing, malformed, or non-positive Retry-Attempt header counts as
	// the first attempt.
	if err != nil || retryAttempt < 1 {
		retryAttempt = 1
	}
	// Cap the exponent so the suggested delay cannot grow without bound,
	// while still accepting any number of attempts from the client.
	if retryAttempt > maxBackoffExponent {
		retryAttempt = maxBackoffExponent
	}

	minSeconds := int64(baseSeconds) << (retryAttempt - 1)
	maxSeconds := minSeconds << 1

	// Jitter uniformly within [minSeconds, maxSeconds) so that many clients
	// rejected at once do not retry in lockstep.
	delaySeconds := minSeconds + rand.Int63n(maxSeconds-minSeconds)

	return strconv.FormatInt(delaySeconds, 10)
}

// toHTTPStatus converts the given error into an appropriate HTTP status corresponding
// to that error, if the error is one of the errors from this package. Otherwise, an
// http.StatusInternalServerError is returned.
Expand Down Expand Up @@ -185,9 +236,21 @@ func toHTTPStatus(ctx context.Context, pushErr error, limits *validation.Overrid
return http.StatusInternalServerError
}

func addHeaders(w http.ResponseWriter, err error) {
func addHeaders(w http.ResponseWriter, err error, r *http.Request, responseCode int, retryCfg RetryConfig) {
var doNotLogError middleware.DoNotLogError
if errors.As(err, &doNotLogError) {
w.Header().Set(server.DoNotLogErrorHeaderKey, "true")
}

if responseCode == http.StatusTooManyRequests || responseCode/100 == 5 {
if retryCfg.Enabled {
retryAttemptHeader := r.Header.Get("Retry-Attempt")
retrySeconds := calculateRetryAfter(retryAttemptHeader, retryCfg.BaseSeconds, retryCfg.MaxBackoffExponent)
w.Header().Set("Retry-After", retrySeconds)
if sp := opentracing.SpanFromContext(r.Context()); sp != nil {
sp.SetTag("retry-after", retrySeconds)
sp.SetTag("retry-attempt", retryAttemptHeader)
}
}
}
}
Loading

0 comments on commit f4872c6

Please sign in to comment.