diff --git a/CHANGELOG.md b/CHANGELOG.md index 2168368806..ac9f3fa8e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,7 @@ * [ENHANCEMENT] BasicLifeCycler: allow final-sleep during shutdown #5517 * [ENHANCEMENT] All: Handling CMK Access Denied errors. #5420 #5542 * [ENHANCEMENT] Querier: Retry store gateway client connection closing gRPC error. #5558 +* [ENHANCEMENT] QueryFrontend: Add generic retry for all APIs. #5561. * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286 * [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293 diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 02817b6468..64cce59870 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -498,7 +498,8 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro } func (t *Cortex) initQueryFrontend() (serv services.Service, err error) { - roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(t.Cfg.Frontend, t.Overrides, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer) + retry := transport.NewRetry(t.Cfg.QueryRange.MaxRetries, prometheus.DefaultRegisterer) + roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(t.Cfg.Frontend, t.Overrides, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer, retry) if err != nil { return nil, err } diff --git a/pkg/frontend/config.go b/pkg/frontend/config.go index feba795492..8ef8fa3603 100644 --- a/pkg/frontend/config.go +++ b/pkg/frontend/config.go @@ -37,7 +37,7 @@ func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) { // Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered // into HTTP server using the Handler from this package. Returned RoundTripper is always non-nil // (if there are no errors), and it uses the returned frontend (if any). -func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort int, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, *v2.Frontend, error) { +func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort int, log log.Logger, reg prometheus.Registerer, retry *transport.Retry) (http.RoundTripper, *v1.Frontend, *v2.Frontend, error) { switch { case cfg.DownstreamURL != "": // If the user has specified a downstream Prometheus, then we should use that. @@ -59,12 +59,12 @@ func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort i cfg.FrontendV2.Port = grpcListenPort } - fr, err := v2.NewFrontend(cfg.FrontendV2, log, reg) + fr, err := v2.NewFrontend(cfg.FrontendV2, log, reg, retry) return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err default: // No scheduler = use original frontend. - fr, err := v1.New(cfg.FrontendV1, limits, log, reg) + fr, err := v1.New(cfg.FrontendV1, limits, log, reg, retry) if err != nil { return nil, nil, nil, err } diff --git a/pkg/frontend/frontend_test.go b/pkg/frontend/frontend_test.go index 2ca1d62890..77694689bd 100644 --- a/pkg/frontend/frontend_test.go +++ b/pkg/frontend/frontend_test.go @@ -254,7 +254,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand httpListen, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) - rt, v1, v2, err := InitFrontend(config, frontendv1.MockLimits{}, 0, logger, nil) + rt, v1, v2, err := InitFrontend(config, frontendv1.MockLimits{}, 0, logger, nil, transport.NewRetry(0, nil)) require.NoError(t, err) require.NotNil(t, rt) // v1 will be nil if DownstreamURL is defined. diff --git a/pkg/frontend/transport/retry.go b/pkg/frontend/transport/retry.go new file mode 100644 index 0000000000..bf010745ac --- /dev/null +++ b/pkg/frontend/transport/retry.go @@ -0,0 +1,56 @@ +package transport + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/httpgrpc" +) + +type Retry struct { + maxRetries int + retriesCount prometheus.Histogram +} + +func NewRetry(maxRetries int, reg prometheus.Registerer) *Retry { + return &Retry{ + maxRetries: maxRetries, + retriesCount: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "query_frontend_retries", + Help: "Number of times a request is retried.", + Buckets: []float64{0, 1, 2, 3, 4, 5}, + }), + } +} + +func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error)) (*httpgrpc.HTTPResponse, error) { + if r.maxRetries == 0 { + // Retries are disabled. Try only once. + return f() + } + + tries := 0 + defer func() { r.retriesCount.Observe(float64(tries)) }() + + var ( + resp *httpgrpc.HTTPResponse + err error + ) + for ; tries < r.maxRetries; tries++ { + if ctx.Err() != nil { + return nil, ctx.Err() + } + + resp, err = f() + if err != nil && err != context.Canceled { + continue // Retryable + } else if resp != nil && resp.Code/100 == 5 { + continue // Retryable + } else { + break + } + } + return resp, err +} diff --git a/pkg/frontend/transport/retry_test.go b/pkg/frontend/transport/retry_test.go new file mode 100644 index 0000000000..a79c083640 --- /dev/null +++ b/pkg/frontend/transport/retry_test.go @@ -0,0 +1,31 @@ +package transport + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc" + "go.uber.org/atomic" +) + +func TestRetry(t *testing.T) { + tries := atomic.NewInt64(3) + r := NewRetry(3, nil) + ctx := context.Background() + res, err := r.Do(ctx, func() (*httpgrpc.HTTPResponse, error) { + try := tries.Dec() + if try > 1 { + return &httpgrpc.HTTPResponse{ + Code: 500, + }, nil + } + return &httpgrpc.HTTPResponse{ + Code: 200, + }, nil + + }) + + require.NoError(t, err) + require.Equal(t, int32(200), res.Code) +} diff --git a/pkg/frontend/v1/frontend.go b/pkg/frontend/v1/frontend.go index 3f8aa8d19c..ac5074dd1c 100644 --- a/pkg/frontend/v1/frontend.go +++ b/pkg/frontend/v1/frontend.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/httpgrpc" + "github.com/cortexproject/cortex/pkg/frontend/transport" "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/scheduler/queue" @@ -66,6 +67,7 @@ type Frontend struct { cfg Config log log.Logger limits Limits + retry *transport.Retry requestQueue *queue.RequestQueue activeUsers *util.ActiveUsersCleanupService @@ -92,11 +94,12 @@ type request struct { } // New creates a new frontend. Frontend implements service, and must be started and stopped. -func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Frontend, error) { +func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer, retry *transport.Retry) (*Frontend, error) { f := &Frontend{ cfg: cfg, log: log, limits: limits, + retry: retry, queueLength: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_query_frontend_queue_length", Help: "Number of queries in the queue.", @@ -173,31 +176,33 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) } } - request := request{ - request: req, - originalCtx: ctx, + return f.retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) { + request := request{ + request: req, + originalCtx: ctx, - // Buffer of 1 to ensure response can be written by the server side - // of the Process stream, even if this goroutine goes away due to - // client context cancellation. - err: make(chan error, 1), - response: make(chan *httpgrpc.HTTPResponse, 1), - } + // Buffer of 1 to ensure response can be written by the server side + // of the Process stream, even if this goroutine goes away due to + // client context cancellation. + err: make(chan error, 1), + response: make(chan *httpgrpc.HTTPResponse, 1), + } - if err := f.queueRequest(ctx, &request); err != nil { - return nil, err - } + if err := f.queueRequest(ctx, &request); err != nil { + return nil, err + } - select { - case <-ctx.Done(): - return nil, ctx.Err() + select { + case <-ctx.Done(): + return nil, ctx.Err() - case resp := <-request.response: - return resp, nil + case resp := <-request.response: + return resp, nil - case err := <-request.err: - return nil, err - } + case err := <-request.err: + return nil, err + } + }) } // Process allows backends to pull requests from the frontend. diff --git a/pkg/frontend/v1/frontend_test.go b/pkg/frontend/v1/frontend_test.go index cb0457b0bd..7951dd1d60 100644 --- a/pkg/frontend/v1/frontend_test.go +++ b/pkg/frontend/v1/frontend_test.go @@ -247,7 +247,7 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a require.NoError(t, err) limits := MockLimits{MockLimits: queue.MockLimits{MaxOutstanding: 100}} - v1, err := New(config, limits, logger, reg) + v1, err := New(config, limits, logger, reg, transport.NewRetry(0, nil)) require.NoError(t, err) require.NotNil(t, v1) require.NoError(t, services.StartAndAwaitRunning(context.Background(), v1)) diff --git a/pkg/frontend/v1/queue_test.go b/pkg/frontend/v1/queue_test.go index e58f73176d..3549eaf98b 100644 --- a/pkg/frontend/v1/queue_test.go +++ b/pkg/frontend/v1/queue_test.go @@ -15,6 +15,7 @@ import ( "github.com/weaveworks/common/user" "google.golang.org/grpc/metadata" + "github.com/cortexproject/cortex/pkg/frontend/transport" "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" "github.com/cortexproject/cortex/pkg/scheduler/queue" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -25,7 +26,7 @@ func setupFrontend(t *testing.T, config Config) (*Frontend, error) { logger := log.NewNopLogger() limits := MockLimits{Queriers: 3, MockLimits: queue.MockLimits{MaxOutstanding: 100}} - frontend, err := New(config, limits, logger, nil) + frontend, err := New(config, limits, logger, nil, transport.NewRetry(0, nil)) require.NoError(t, err) t.Cleanup(func() { diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index 3781eb9f33..dea15faeaf 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -18,6 +18,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "go.uber.org/atomic" + "github.com/cortexproject/cortex/pkg/frontend/transport" "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/tenant" @@ -66,6 +67,8 @@ type Frontend struct { cfg Config log log.Logger + retry *transport.Retry + lastQueryID atomic.Uint64 // frontend workers will read from this channel, and send request to scheduler. @@ -109,7 +112,7 @@ type enqueueResult struct { } // NewFrontend creates a new frontend. -func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer) (*Frontend, error) { +func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer, retry *transport.Retry) (*Frontend, error) { requestsCh := make(chan *frontendRequest) schedulerWorkers, err := newFrontendSchedulerWorkers(cfg, fmt.Sprintf("%s:%d", cfg.Addr, cfg.Port), requestsCh, log) @@ -122,6 +125,7 @@ func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer) (*Fronte log: log, requestsCh: requestsCh, schedulerWorkers: schedulerWorkers, + retry: retry, requests: newRequestsInProgress(), } // Randomize to avoid getting responses from queries sent before restart, which could lead to mixing results @@ -184,78 +188,80 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) ctx, cancel := context.WithCancel(ctx) defer cancel() - freq := &frontendRequest{ - queryID: f.lastQueryID.Inc(), - request: req, - userID: userID, - statsEnabled: stats.IsEnabled(ctx), - - cancel: cancel, + return f.retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) { + freq := &frontendRequest{ + queryID: f.lastQueryID.Inc(), + request: req, + userID: userID, + statsEnabled: stats.IsEnabled(ctx), - // Buffer of 1 to ensure response or error can be written to the channel - // even if this goroutine goes away due to client context cancellation. - enqueue: make(chan enqueueResult, 1), - response: make(chan *frontendv2pb.QueryResultRequest, 1), + cancel: cancel, - retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1, - } + // Buffer of 1 to ensure response or error can be written to the channel + // even if this goroutine goes away due to client context cancellation. + enqueue: make(chan enqueueResult, 1), + response: make(chan *frontendv2pb.QueryResultRequest, 1), - f.requests.put(freq) - defer f.requests.delete(freq.queryID) + retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1, + } - retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers. + f.requests.put(freq) + defer f.requests.delete(freq.queryID) -enqueueAgain: - select { - case <-ctx.Done(): - return nil, ctx.Err() + retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers. - case f.requestsCh <- freq: - // Enqueued, let's wait for response. - } + enqueueAgain: + select { + case <-ctx.Done(): + return nil, ctx.Err() - var cancelCh chan<- uint64 + case f.requestsCh <- freq: + // Enqueued, let's wait for response. + } - select { - case <-ctx.Done(): - return nil, ctx.Err() + var cancelCh chan<- uint64 - case enqRes := <-freq.enqueue: - if enqRes.status == waitForResponse { - cancelCh = enqRes.cancelCh - break // go wait for response. - } else if enqRes.status == failed { - retries-- - if retries > 0 { - goto enqueueAgain + select { + case <-ctx.Done(): + return nil, ctx.Err() + + case enqRes := <-freq.enqueue: + if enqRes.status == waitForResponse { + cancelCh = enqRes.cancelCh + break // go wait for response. + } else if enqRes.status == failed { + retries-- + if retries > 0 { + goto enqueueAgain + } } + + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "failed to enqueue request") } - return nil, httpgrpc.Errorf(http.StatusInternalServerError, "failed to enqueue request") - } + select { + case <-ctx.Done(): + if cancelCh != nil { + select { + case cancelCh <- freq.queryID: + // cancellation sent. + default: + // failed to cancel, log it. + level.Warn(util_log.WithContext(ctx, f.log)).Log("msg", "failed to enqueue cancellation signal", "query_id", freq.queryID) + f.cancelFailedQueries.Inc() + } + } + return nil, ctx.Err() - select { - case <-ctx.Done(): - if cancelCh != nil { - select { - case cancelCh <- freq.queryID: - // cancellation sent. - default: - // failed to cancel, log it. - level.Warn(util_log.WithContext(ctx, f.log)).Log("msg", "failed to enqueue cancellation signal", "query_id", freq.queryID) - f.cancelFailedQueries.Inc() + case resp := <-freq.response: + if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) { + stats := stats.FromContext(ctx) + stats.Merge(resp.Stats) // Safe if stats is nil. } - } - return nil, ctx.Err() - case resp := <-freq.response: - if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) { - stats := stats.FromContext(ctx) - stats.Merge(resp.Stats) // Safe if stats is nil. + return resp.HttpResponse, nil } - - return resp.HttpResponse, nil - } + }) } func (f *Frontend) QueryResult(ctx context.Context, qrReq *frontendv2pb.QueryResultRequest) (*frontendv2pb.QueryResultResponse, error) { diff --git a/pkg/frontend/v2/frontend_test.go b/pkg/frontend/v2/frontend_test.go index b6b88d2a37..59729e1757 100644 --- a/pkg/frontend/v2/frontend_test.go +++ b/pkg/frontend/v2/frontend_test.go @@ -16,6 +16,7 @@ import ( "go.uber.org/atomic" "google.golang.org/grpc" + "github.com/cortexproject/cortex/pkg/frontend/transport" "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" @@ -26,7 +27,7 @@ import ( const testFrontendWorkerConcurrency = 5 -func setupFrontend(t *testing.T, schedulerReplyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) (*Frontend, *mockScheduler) { +func setupFrontend(t *testing.T, schedulerReplyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend, maxRetries int) (*Frontend, *mockScheduler) { l, err := net.Listen("tcp", "") require.NoError(t, err) @@ -47,7 +48,7 @@ func setupFrontend(t *testing.T, schedulerReplyFunc func(f *Frontend, msg *sched //logger := log.NewLogfmtLogger(os.Stdout) logger := log.NewNopLogger() - f, err := NewFrontend(cfg, logger, nil) + f, err := NewFrontend(cfg, logger, nil, transport.NewRetry(maxRetries, nil)) require.NoError(t, err) frontendv2pb.RegisterFrontendForQuerierServer(server, f) @@ -107,7 +108,7 @@ func TestFrontendBasicWorkflow(t *testing.T) { }) return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} - }) + }, 0) resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}) require.NoError(t, err) @@ -115,6 +116,35 @@ func TestFrontendBasicWorkflow(t *testing.T) { require.Equal(t, []byte(body), resp.Body) } +func TestFrontendRetryRequest(t *testing.T) { + tries := atomic.NewInt64(3) + const ( + body = "hello world" + userID = "test" + ) + + f, _ := setupFrontend(t, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend { + try := tries.Dec() + if try > 0 { + go sendResponseWithDelay(f, 100*time.Millisecond, userID, msg.QueryID, &httpgrpc.HTTPResponse{ + Code: 500, + Body: []byte(body), + }) + } else { + go sendResponseWithDelay(f, 100*time.Millisecond, userID, msg.QueryID, &httpgrpc.HTTPResponse{ + Code: 200, + Body: []byte(body), + }) + } + + return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} + }, 3) + + res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}) + require.NoError(t, err) + require.Equal(t, int32(200), res.Code) +} + func TestFrontendRetryEnqueue(t *testing.T) { // Frontend uses worker concurrency to compute number of retries. We use one less failure. failures := atomic.NewInt64(testFrontendWorkerConcurrency - 1) @@ -135,7 +165,7 @@ func TestFrontendRetryEnqueue(t *testing.T) { }) return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} - }) + }, 0) _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}) require.NoError(t, err) @@ -144,7 +174,7 @@ func TestFrontendRetryEnqueue(t *testing.T) { func TestFrontendEnqueueFailure(t *testing.T) { f, _ := setupFrontend(t, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend { return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN} - }) + }, 0) _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), &httpgrpc.HTTPRequest{}) require.Error(t, err) @@ -152,7 +182,7 @@ func TestFrontendEnqueueFailure(t *testing.T) { } func TestFrontendCancellation(t *testing.T) { - f, ms := setupFrontend(t, nil) + f, ms := setupFrontend(t, nil, 0) ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() @@ -178,7 +208,7 @@ func TestFrontendCancellation(t *testing.T) { } func TestFrontendFailedCancellation(t *testing.T) { - f, ms := setupFrontend(t, nil) + f, ms := setupFrontend(t, nil, 0) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares.go b/pkg/querier/tripperware/queryrange/query_range_middlewares.go index 3cc14cbb54..09a768028a 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares.go @@ -109,10 +109,6 @@ func Middlewares( queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("results_cache", metrics), queryCacheMiddleware) } - if cfg.MaxRetries > 0 { - queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer))) - } - queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, shardedPrometheusCodec, queryAnalyzer)) return queryRangeMiddleware, c, nil diff --git a/pkg/querier/tripperware/queryrange/retry.go b/pkg/querier/tripperware/queryrange/retry.go deleted file mode 100644 index d6d76f83ea..0000000000 --- a/pkg/querier/tripperware/queryrange/retry.go +++ /dev/null @@ -1,86 +0,0 @@ -package queryrange - -import ( - "context" - "errors" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/weaveworks/common/httpgrpc" - - "github.com/cortexproject/cortex/pkg/querier/tripperware" - util_log "github.com/cortexproject/cortex/pkg/util/log" -) - -type RetryMiddlewareMetrics struct { - retriesCount prometheus.Histogram -} - -func NewRetryMiddlewareMetrics(registerer prometheus.Registerer) *RetryMiddlewareMetrics { - return &RetryMiddlewareMetrics{ - retriesCount: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ - Namespace: "cortex", - Name: "query_frontend_retries", - Help: "Number of times a request is retried.", - Buckets: []float64{0, 1, 2, 3, 4, 5}, - }), - } -} - -type retry struct { - log log.Logger - next tripperware.Handler - maxRetries int - - metrics *RetryMiddlewareMetrics -} - -// NewRetryMiddleware returns a middleware that retries requests if they -// fail with 500 or a non-HTTP error. -func NewRetryMiddleware(log log.Logger, maxRetries int, metrics *RetryMiddlewareMetrics) tripperware.Middleware { - if metrics == nil { - metrics = NewRetryMiddlewareMetrics(nil) - } - - return tripperware.MiddlewareFunc(func(next tripperware.Handler) tripperware.Handler { - return retry{ - log: log, - next: next, - maxRetries: maxRetries, - metrics: metrics, - } - }) -} - -func (r retry) Do(ctx context.Context, req tripperware.Request) (tripperware.Response, error) { - tries := 0 - defer func() { r.metrics.retriesCount.Observe(float64(tries)) }() - - var lastErr error - for ; tries < r.maxRetries; tries++ { - if ctx.Err() != nil { - return nil, ctx.Err() - } - resp, err := r.next.Do(ctx, req) - if err == nil { - return resp, nil - } - - if errors.Is(err, context.Canceled) { - return nil, err - } - - // Retry if we get a HTTP 500 or a non-HTTP error. - httpResp, ok := httpgrpc.HTTPResponseFromError(err) - if !ok || httpResp.Code/100 == 5 { - lastErr = err - level.Error(util_log.WithContext(ctx, r.log)).Log("msg", "error processing request", "try", tries, "err", err) - continue - } - - return nil, err - } - return nil, lastErr -} diff --git a/pkg/querier/tripperware/queryrange/retry_test.go b/pkg/querier/tripperware/queryrange/retry_test.go deleted file mode 100644 index c1cd7e1af0..0000000000 --- a/pkg/querier/tripperware/queryrange/retry_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package queryrange - -import ( - "context" - "errors" - "fmt" - "net/http" - "testing" - - "github.com/go-kit/log" - "github.com/stretchr/testify/require" - "github.com/weaveworks/common/httpgrpc" - "go.uber.org/atomic" - - "github.com/cortexproject/cortex/pkg/querier/tripperware" -) - -func TestRetry(t *testing.T) { - t.Parallel() - var try atomic.Int32 - - for _, tc := range []struct { - name string - handler tripperware.Handler - resp tripperware.Response - err error - }{ - { - name: "retry failures", - handler: tripperware.HandlerFunc(func(_ context.Context, req tripperware.Request) (tripperware.Response, error) { - if try.Inc() == 5 { - return &PrometheusResponse{Status: "Hello World"}, nil - } - return nil, fmt.Errorf("fail") - }), - resp: &PrometheusResponse{Status: "Hello World"}, - }, - { - name: "don't retry 400s", - handler: tripperware.HandlerFunc(func(_ context.Context, req tripperware.Request) (tripperware.Response, error) { - return nil, httpgrpc.Errorf(http.StatusBadRequest, "Bad Request") - }), - err: httpgrpc.Errorf(http.StatusBadRequest, "Bad Request"), - }, - { - name: "retry 500s", - handler: tripperware.HandlerFunc(func(_ context.Context, req tripperware.Request) (tripperware.Response, error) { - return nil, httpgrpc.Errorf(http.StatusInternalServerError, "Internal Server Error") - }), - err: httpgrpc.Errorf(http.StatusInternalServerError, "Internal Server Error"), - }, - { - name: "last error", - handler: tripperware.HandlerFunc(func(_ context.Context, req tripperware.Request) (tripperware.Response, error) { - if try.Inc() == 5 { - return nil, httpgrpc.Errorf(http.StatusBadRequest, "Bad Request") - } - return nil, httpgrpc.Errorf(http.StatusInternalServerError, "Internal Server Error") - }), - err: httpgrpc.Errorf(http.StatusBadRequest, "Bad Request"), - }, - } { - t.Run(tc.name, func(t *testing.T) { - //parallel testing causes data race - try.Store(0) - h := NewRetryMiddleware(log.NewNopLogger(), 5, nil).Wrap(tc.handler) - resp, err := h.Do(context.Background(), nil) - require.Equal(t, tc.err, err) - require.Equal(t, tc.resp, resp) - }) - } -} - -func Test_RetryMiddlewareCancel(t *testing.T) { - t.Parallel() - var try atomic.Int32 - ctx, cancel := context.WithCancel(context.Background()) - cancel() - _, err := NewRetryMiddleware(log.NewNopLogger(), 5, nil).Wrap( - tripperware.HandlerFunc(func(c context.Context, r tripperware.Request) (tripperware.Response, error) { - try.Inc() - return nil, ctx.Err() - }), - ).Do(ctx, nil) - require.Equal(t, int32(0), try.Load()) - require.Equal(t, ctx.Err(), err) - - ctx, cancel = context.WithCancel(context.Background()) - _, err = NewRetryMiddleware(log.NewNopLogger(), 5, nil).Wrap( - tripperware.HandlerFunc(func(c context.Context, r tripperware.Request) (tripperware.Response, error) { - try.Inc() - cancel() - return nil, errors.New("failed") - }), - ).Do(ctx, nil) - require.Equal(t, int32(1), try.Load()) - require.Equal(t, ctx.Err(), err) -}