Skip to content

Commit

Permalink
Add retries for instant query
Browse files Browse the repository at this point in the history
Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
  • Loading branch information
harry671003 committed Sep 13, 2023
1 parent b49cf5a commit 84e2b57
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 54 deletions.
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
return nil, err
}

instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer)
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, prometheus.DefaultRegisterer, t.Cfg.QueryRange.MaxRetries, queryAnalyzer)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@ package instantquery

import (
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/querysharding"

"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
)

func Middlewares(
log log.Logger,
limits tripperware.Limits,
registerer prometheus.Registerer,
maxRetries int,
queryAnalyzer querysharding.Analyzer,
) ([]tripperware.Middleware, error) {
var m []tripperware.Middleware

m = append(m, queryrange.NewRetryMiddleware(log, maxRetries, queryrange.NewRetryMiddlewareMetrics(registerer)))
m = append(m, tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer))
return m, nil
}
118 changes: 118 additions & 0 deletions pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package instantquery

import (
"context"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/querysharding"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/querier/tripperware"
)

var (
query = "/api/v1/query?time=1536716898&query=sum by (label) (up)&stats=all"
responseBody = `{"status":"success","data":{"resultType":"vector","result":[]}}`
)

func TestRoundTrip(t *testing.T) {
t.Parallel()
var try atomic.Int32
s := httptest.NewServer(
middleware.AuthenticateUser.Wrap(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var err error
if try.Inc() > 2 {
_, err = w.Write([]byte(responseBody))
} else {
http.Error(w, `{"status":"error"}`, http.StatusInternalServerError)
}
if err != nil {
t.Fatal(err)
}
}),
),
)
defer s.Close()

u, err := url.Parse(s.URL)
require.NoError(t, err)

downstream := singleHostRoundTripper{
host: u.Host,
next: http.DefaultTransport,
}
limits := tripperware.MockLimits{
ShardSize: 2,
}
qa := querysharding.NewQueryAnalyzer()
instantQueryMiddlewares, err := Middlewares(
log.NewNopLogger(),
limits,
nil,
3,
qa)
require.NoError(t, err)

tw := tripperware.NewQueryTripperware(
log.NewNopLogger(),
nil,
nil,
nil,
instantQueryMiddlewares,
nil,
InstantQueryCodec,
limits,
qa,
time.Minute,
)

for i, tc := range []struct {
path, expectedBody string
}{
{query, responseBody},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
//parallel testing causes data race
req, err := http.NewRequest("GET", tc.path, http.NoBody)
require.NoError(t, err)

// query-frontend doesn't actually authenticate requests, we rely on
// the queriers to do this. Hence we ensure the request doesn't have a
// org ID in the ctx, but does have the header.
ctx := user.InjectOrgID(context.Background(), "1")
req = req.WithContext(ctx)
err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
require.NoError(t, err)

resp, err := tw(downstream).RoundTrip(req)
require.NoError(t, err)
require.Equal(t, 200, resp.StatusCode)

bs, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, tc.expectedBody, string(bs))
})
}
}

type singleHostRoundTripper struct {
host string
next http.RoundTripper
}

func (s singleHostRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
r.URL.Scheme = "http"
r.URL.Host = s.host
return s.next.RoundTrip(r)
}
30 changes: 2 additions & 28 deletions pkg/querier/tripperware/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestLimitsMiddleware_MaxQueryLookback(t *testing.T) {
End: util.TimeToMillis(testData.reqEndTime),
}

limits := mockLimits{maxQueryLookback: testData.maxQueryLookback}
limits := tripperware.MockLimits{QueryLookback: testData.maxQueryLookback}
middleware := NewLimitsMiddleware(limits)

innerRes := NewEmptyPrometheusResponse()
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
End: util.TimeToMillis(testData.reqEndTime),
}

limits := mockLimits{maxQueryLength: testData.maxQueryLength}
limits := tripperware.MockLimits{QueryLength: testData.maxQueryLength}
middleware := NewLimitsMiddleware(limits)

innerRes := NewEmptyPrometheusResponse()
Expand Down Expand Up @@ -193,32 +193,6 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
}
}

type mockLimits struct {
maxQueryLookback time.Duration
maxQueryLength time.Duration
maxCacheFreshness time.Duration
}

func (m mockLimits) MaxQueryLookback(string) time.Duration {
return m.maxQueryLookback
}

func (m mockLimits) MaxQueryLength(string) time.Duration {
return m.maxQueryLength
}

func (mockLimits) MaxQueryParallelism(string) int {
return 14 // Flag default.
}

func (m mockLimits) MaxCacheFreshness(string) time.Duration {
return m.maxCacheFreshness
}

func (m mockLimits) QueryVerticalShardSize(userID string) int {
return 0
}

type mockHandler struct {
mock.Mock
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestRoundTrip(t *testing.T) {
qa := querysharding.NewQueryAnalyzer()
queyrangemiddlewares, _, err := Middlewares(Config{},
log.NewNopLogger(),
mockLimits{},
tripperware.MockLimits{},
nil,
nil,
nil,
Expand Down
16 changes: 8 additions & 8 deletions pkg/querier/tripperware/queryrange/results_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func TestStatsCacheQuerySamples(t *testing.T) {
log.NewNopLogger(),
cfg,
constSplitter(day),
mockLimits{},
tripperware.MockLimits{},
PrometheusCodec,
PrometheusResponseExtractor{},
nil,
Expand Down Expand Up @@ -974,7 +974,7 @@ func TestHandleHit(t *testing.T) {
sut := resultsCache{
extractor: PrometheusResponseExtractor{},
minCacheExtent: 10,
limits: mockLimits{},
limits: tripperware.MockLimits{},
merger: PrometheusCodec,
next: tripperware.HandlerFunc(func(_ context.Context, req tripperware.Request) (tripperware.Response, error) {
return mkAPIResponse(req.GetStart(), req.GetEnd(), req.GetStep()), nil
Expand Down Expand Up @@ -1004,7 +1004,7 @@ func TestResultsCache(t *testing.T) {
log.NewNopLogger(),
cfg,
constSplitter(day),
mockLimits{},
tripperware.MockLimits{},
PrometheusCodec,
PrometheusResponseExtractor{},
nil,
Expand Down Expand Up @@ -1046,7 +1046,7 @@ func TestResultsCacheRecent(t *testing.T) {
log.NewNopLogger(),
cfg,
constSplitter(day),
mockLimits{maxCacheFreshness: 10 * time.Minute},
tripperware.MockLimits{CacheFreshness: 10 * time.Minute},
PrometheusCodec,
PrometheusResponseExtractor{},
nil,
Expand Down Expand Up @@ -1087,13 +1087,13 @@ func TestResultsCacheMaxFreshness(t *testing.T) {
expectedResponse *PrometheusResponse
}{
{
fakeLimits: mockLimits{maxCacheFreshness: 5 * time.Second},
fakeLimits: tripperware.MockLimits{CacheFreshness: 5 * time.Second},
Handler: nil,
expectedResponse: mkAPIResponse(int64(modelNow)-(50*1e3), int64(modelNow)-(10*1e3), 10),
},
{
// should not lookup cache because per-tenant override will be applied
fakeLimits: mockLimits{maxCacheFreshness: 10 * time.Minute},
fakeLimits: tripperware.MockLimits{CacheFreshness: 10 * time.Minute},
Handler: tripperware.HandlerFunc(func(_ context.Context, _ tripperware.Request) (tripperware.Response, error) {
return parsedResponse, nil
}),
Expand Down Expand Up @@ -1150,7 +1150,7 @@ func Test_resultsCache_MissingData(t *testing.T) {
log.NewNopLogger(),
cfg,
constSplitter(day),
mockLimits{},
tripperware.MockLimits{},
PrometheusCodec,
PrometheusResponseExtractor{},
nil,
Expand Down Expand Up @@ -1263,7 +1263,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) {
log.NewNopLogger(),
cfg,
constSplitter(day),
mockLimits{maxCacheFreshness: 10 * time.Minute},
tripperware.MockLimits{CacheFreshness: 10 * time.Minute},
PrometheusCodec,
PrometheusResponseExtractor{},
nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func TestSplitByDay(t *testing.T) {
roundtripper := tripperware.NewRoundTripper(singleHostRoundTripper{
host: u.Host,
next: http.DefaultTransport,
}, PrometheusCodec, nil, NewLimitsMiddleware(mockLimits{}), SplitByIntervalMiddleware(interval, mockLimits{}, PrometheusCodec, nil))
}, PrometheusCodec, nil, NewLimitsMiddleware(tripperware.MockLimits{}), SplitByIntervalMiddleware(interval, tripperware.MockLimits{}, PrometheusCodec, nil))

req, err := http.NewRequest("GET", tc.path, http.NoBody)
require.NoError(t, err)
Expand Down
30 changes: 15 additions & 15 deletions pkg/querier/tripperware/test_shard_by_query_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ http_requests_total`,
}

qa := thanosquerysharding.NewQueryAnalyzer()
roundtripper := NewRoundTripper(downstream, tt.codec, nil, ShardByMiddleware(log.NewNopLogger(), mockLimits{shardSize: tt.shardSize}, tt.codec, qa))
roundtripper := NewRoundTripper(downstream, tt.codec, nil, ShardByMiddleware(log.NewNopLogger(), MockLimits{ShardSize: tt.shardSize}, tt.codec, qa))

ctx := user.InjectOrgID(context.Background(), "1")

Expand All @@ -461,31 +461,31 @@ http_requests_total`,
}
}

type mockLimits struct {
maxQueryLookback time.Duration
maxQueryLength time.Duration
maxCacheFreshness time.Duration
shardSize int
type MockLimits struct {
QueryLookback time.Duration
QueryLength time.Duration
CacheFreshness time.Duration
ShardSize int
}

func (m mockLimits) MaxQueryLookback(string) time.Duration {
return m.maxQueryLookback
func (m MockLimits) MaxQueryLookback(string) time.Duration {
return m.QueryLookback
}

func (m mockLimits) MaxQueryLength(string) time.Duration {
return m.maxQueryLength
func (m MockLimits) MaxQueryLength(string) time.Duration {
return m.QueryLength
}

func (mockLimits) MaxQueryParallelism(string) int {
func (MockLimits) MaxQueryParallelism(string) int {
return 14 // Flag default.
}

func (m mockLimits) MaxCacheFreshness(string) time.Duration {
return m.maxCacheFreshness
func (m MockLimits) MaxCacheFreshness(string) time.Duration {
return m.CacheFreshness
}

func (m mockLimits) QueryVerticalShardSize(userID string) int {
return m.shardSize
func (m MockLimits) QueryVerticalShardSize(userID string) int {
return m.ShardSize
}

type singleHostRoundTripper struct {
Expand Down

0 comments on commit 84e2b57

Please sign in to comment.