diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 02817b6468b..12edbd21a04 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -471,7 +471,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro return nil, err } - instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer) + instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, prometheus.DefaultRegisterer, t.Cfg.QueryRange.MaxRetries, queryAnalyzer) if err != nil { return nil, err } diff --git a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go index b88515e6be0..3b3c9014d7b 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go +++ b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go @@ -2,18 +2,23 @@ package instantquery import ( "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/thanos/pkg/querysharding" "github.com/cortexproject/cortex/pkg/querier/tripperware" + "github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange" ) func Middlewares( log log.Logger, limits tripperware.Limits, + registerer prometheus.Registerer, + maxRetries int, queryAnalyzer querysharding.Analyzer, ) ([]tripperware.Middleware, error) { var m []tripperware.Middleware + m = append(m, queryrange.NewRetryMiddleware(log, maxRetries, queryrange.NewRetryMiddlewareMetrics(registerer))) m = append(m, tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer)) return m, nil } diff --git a/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go b/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go new file mode 100644 index 00000000000..54163e8b26a --- /dev/null +++ b/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go @@ -0,0 +1,118 @@ +package instantquery + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/querysharding" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" + "go.uber.org/atomic" + + "github.com/cortexproject/cortex/pkg/querier/tripperware" +) + +var ( + query = "/api/v1/query?time=1536716898&query=sum by (label) (up)&stats=all" + responseBody = `{"status":"success","data":{"resultType":"vector","result":[]}}` +) + +func TestRoundTrip(t *testing.T) { + t.Parallel() + var try atomic.Int32 + s := httptest.NewServer( + middleware.AuthenticateUser.Wrap( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var err error + if try.Inc() > 2 { + _, err = w.Write([]byte(responseBody)) + } else { + http.Error(w, `{"status":"error"}`, http.StatusInternalServerError) + } + if err != nil { + t.Fatal(err) + } + }), + ), + ) + defer s.Close() + + u, err := url.Parse(s.URL) + require.NoError(t, err) + + downstream := singleHostRoundTripper{ + host: u.Host, + next: http.DefaultTransport, + } + limits := tripperware.MockLimits{ + ShardSize: 2, + } + qa := querysharding.NewQueryAnalyzer() + instantQueryMiddlewares, err := Middlewares( + log.NewNopLogger(), + limits, + nil, + 3, + qa) + require.NoError(t, err) + + tw := tripperware.NewQueryTripperware( + log.NewNopLogger(), + nil, + nil, + nil, + instantQueryMiddlewares, + nil, + InstantQueryCodec, + limits, + qa, + time.Minute, + ) + + for i, tc := range []struct { + path, expectedBody string + }{ + {query, responseBody}, + } { + t.Run(strconv.Itoa(i), func(t *testing.T) { + //parallel testing causes data race + req, err := http.NewRequest("GET", tc.path, http.NoBody) + require.NoError(t, err) + + // query-frontend doesn't actually authenticate requests, we rely on + // the queriers to do this. Hence we ensure the request doesn't have a + // org ID in the ctx, but does have the header. + ctx := user.InjectOrgID(context.Background(), "1") + req = req.WithContext(ctx) + err = user.InjectOrgIDIntoHTTPRequest(ctx, req) + require.NoError(t, err) + + resp, err := tw(downstream).RoundTrip(req) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + + bs, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, tc.expectedBody, string(bs)) + }) + } +} + +type singleHostRoundTripper struct { + host string + next http.RoundTripper +} + +func (s singleHostRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + r.URL.Scheme = "http" + r.URL.Host = s.host + return s.next.RoundTrip(r) +} diff --git a/pkg/querier/tripperware/queryrange/limits_test.go b/pkg/querier/tripperware/queryrange/limits_test.go index 1569ea2e3af..1ab044010ea 100644 --- a/pkg/querier/tripperware/queryrange/limits_test.go +++ b/pkg/querier/tripperware/queryrange/limits_test.go @@ -75,7 +75,7 @@ func TestLimitsMiddleware_MaxQueryLookback(t *testing.T) { End: util.TimeToMillis(testData.reqEndTime), } - limits := mockLimits{maxQueryLookback: testData.maxQueryLookback} + limits := tripperware.MockLimits{QueryLookback: testData.maxQueryLookback} middleware := NewLimitsMiddleware(limits) innerRes := NewEmptyPrometheusResponse() @@ -163,7 +163,7 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) { End: util.TimeToMillis(testData.reqEndTime), } - limits := mockLimits{maxQueryLength: testData.maxQueryLength} + limits := tripperware.MockLimits{QueryLength: testData.maxQueryLength} middleware := NewLimitsMiddleware(limits) innerRes := NewEmptyPrometheusResponse() @@ -193,32 +193,6 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) { } } -type mockLimits struct { - maxQueryLookback time.Duration - maxQueryLength time.Duration - maxCacheFreshness time.Duration -} - -func (m mockLimits) MaxQueryLookback(string) time.Duration { - return m.maxQueryLookback -} - -func (m mockLimits) MaxQueryLength(string) time.Duration { - return m.maxQueryLength -} - -func (mockLimits) MaxQueryParallelism(string) int { - return 14 // Flag default. -} - -func (m mockLimits) MaxCacheFreshness(string) time.Duration { - return m.maxCacheFreshness -} - -func (m mockLimits) QueryVerticalShardSize(userID string) int { - return 0 -} - type mockHandler struct { mock.Mock } diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go index 5ade2abd522..286cafdece6 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go @@ -54,7 +54,7 @@ func TestRoundTrip(t *testing.T) { qa := querysharding.NewQueryAnalyzer() queyrangemiddlewares, _, err := Middlewares(Config{}, log.NewNopLogger(), - mockLimits{}, + tripperware.MockLimits{}, nil, nil, nil, diff --git a/pkg/querier/tripperware/queryrange/results_cache_test.go b/pkg/querier/tripperware/queryrange/results_cache_test.go index b7ba0d5f977..d0d5d621c77 100644 --- a/pkg/querier/tripperware/queryrange/results_cache_test.go +++ b/pkg/querier/tripperware/queryrange/results_cache_test.go @@ -209,7 +209,7 @@ func TestStatsCacheQuerySamples(t *testing.T) { log.NewNopLogger(), cfg, constSplitter(day), - mockLimits{}, + tripperware.MockLimits{}, PrometheusCodec, PrometheusResponseExtractor{}, nil, @@ -974,7 +974,7 @@ func TestHandleHit(t *testing.T) { sut := resultsCache{ extractor: PrometheusResponseExtractor{}, minCacheExtent: 10, - limits: mockLimits{}, + limits: tripperware.MockLimits{}, merger: PrometheusCodec, next: tripperware.HandlerFunc(func(_ context.Context, req tripperware.Request) (tripperware.Response, error) { return mkAPIResponse(req.GetStart(), req.GetEnd(), req.GetStep()), nil @@ -1004,7 +1004,7 @@ func TestResultsCache(t *testing.T) { log.NewNopLogger(), cfg, constSplitter(day), - mockLimits{}, + tripperware.MockLimits{}, PrometheusCodec, PrometheusResponseExtractor{}, nil, @@ -1046,7 +1046,7 @@ func TestResultsCacheRecent(t *testing.T) { log.NewNopLogger(), cfg, constSplitter(day), - mockLimits{maxCacheFreshness: 10 * time.Minute}, + tripperware.MockLimits{CacheFreshness: 10 * time.Minute}, PrometheusCodec, PrometheusResponseExtractor{}, nil, @@ -1087,13 +1087,13 @@ func TestResultsCacheMaxFreshness(t *testing.T) { expectedResponse *PrometheusResponse }{ { - fakeLimits: mockLimits{maxCacheFreshness: 5 * time.Second}, + fakeLimits: tripperware.MockLimits{CacheFreshness: 5 * time.Second}, Handler: nil, expectedResponse: mkAPIResponse(int64(modelNow)-(50*1e3), int64(modelNow)-(10*1e3), 10), }, { // should not lookup cache because per-tenant override will be applied - fakeLimits: mockLimits{maxCacheFreshness: 10 * time.Minute}, + fakeLimits: tripperware.MockLimits{CacheFreshness: 10 * time.Minute}, Handler: tripperware.HandlerFunc(func(_ context.Context, _ tripperware.Request) (tripperware.Response, error) { return parsedResponse, nil }), @@ -1150,7 +1150,7 @@ func Test_resultsCache_MissingData(t *testing.T) { log.NewNopLogger(), cfg, constSplitter(day), - mockLimits{}, + tripperware.MockLimits{}, PrometheusCodec, PrometheusResponseExtractor{}, nil, @@ -1263,7 +1263,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) { log.NewNopLogger(), cfg, constSplitter(day), - mockLimits{maxCacheFreshness: 10 * time.Minute}, + tripperware.MockLimits{CacheFreshness: 10 * time.Minute}, PrometheusCodec, PrometheusResponseExtractor{}, nil, diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go index 564263216a4..bcbe0bb91e9 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go @@ -309,7 +309,7 @@ func TestSplitByDay(t *testing.T) { roundtripper := tripperware.NewRoundTripper(singleHostRoundTripper{ host: u.Host, next: http.DefaultTransport, - }, PrometheusCodec, nil, NewLimitsMiddleware(mockLimits{}), SplitByIntervalMiddleware(interval, mockLimits{}, PrometheusCodec, nil)) + }, PrometheusCodec, nil, NewLimitsMiddleware(tripperware.MockLimits{}), SplitByIntervalMiddleware(interval, tripperware.MockLimits{}, PrometheusCodec, nil)) req, err := http.NewRequest("GET", tc.path, http.NoBody) require.NoError(t, err) diff --git a/pkg/querier/tripperware/test_shard_by_query_utils.go b/pkg/querier/tripperware/test_shard_by_query_utils.go index 657d7daa3a1..cac16d6b993 100644 --- a/pkg/querier/tripperware/test_shard_by_query_utils.go +++ b/pkg/querier/tripperware/test_shard_by_query_utils.go @@ -441,7 +441,7 @@ http_requests_total`, } qa := thanosquerysharding.NewQueryAnalyzer() - roundtripper := NewRoundTripper(downstream, tt.codec, nil, ShardByMiddleware(log.NewNopLogger(), mockLimits{shardSize: tt.shardSize}, tt.codec, qa)) + roundtripper := NewRoundTripper(downstream, tt.codec, nil, ShardByMiddleware(log.NewNopLogger(), MockLimits{ShardSize: tt.shardSize}, tt.codec, qa)) ctx := user.InjectOrgID(context.Background(), "1") @@ -461,31 +461,31 @@ http_requests_total`, } } -type mockLimits struct { - maxQueryLookback time.Duration - maxQueryLength time.Duration - maxCacheFreshness time.Duration - shardSize int +type MockLimits struct { + QueryLookback time.Duration + QueryLength time.Duration + CacheFreshness time.Duration + ShardSize int } -func (m mockLimits) MaxQueryLookback(string) time.Duration { - return m.maxQueryLookback +func (m MockLimits) MaxQueryLookback(string) time.Duration { + return m.QueryLookback } -func (m mockLimits) MaxQueryLength(string) time.Duration { - return m.maxQueryLength +func (m MockLimits) MaxQueryLength(string) time.Duration { + return m.QueryLength } -func (mockLimits) MaxQueryParallelism(string) int { +func (MockLimits) MaxQueryParallelism(string) int { return 14 // Flag default. } -func (m mockLimits) MaxCacheFreshness(string) time.Duration { - return m.maxCacheFreshness +func (m MockLimits) MaxCacheFreshness(string) time.Duration { + return m.CacheFreshness } -func (m mockLimits) QueryVerticalShardSize(userID string) int { - return m.shardSize +func (m MockLimits) QueryVerticalShardSize(userID string) int { + return m.ShardSize } type singleHostRoundTripper struct {