diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 9088d2059a..00f0b10a20 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -474,6 +474,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec) // ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats) shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec) + instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec) queryRangeMiddlewares, cache, err := queryrange.Middlewares( t.Cfg.QueryRange, @@ -490,7 +491,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro return nil, err } - instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer, t.Cfg.Querier.LookbackDelta) + instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, instantQueryCodec, queryAnalyzer, t.Cfg.Querier.LookbackDelta) if err != nil { return nil, err } @@ -501,7 +502,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro queryRangeMiddlewares, instantQueryMiddlewares, prometheusCodec, - instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec), + instantQueryCodec, t.Overrides, queryAnalyzer, t.Cfg.Querier.DefaultEvaluationInterval, diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index d83ae20ae9..034090a8b9 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -23,8 +23,6 @@ import ( ) var ( - InstantQueryCodec tripperware.Codec = NewInstantQueryCodec("", "protobuf") - json = jsoniter.Config{ EscapeHTML: false, // No HTML in our responses. SortMapKeys: true, diff --git a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go index 5e4698c81a..0fd4876048 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go +++ b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go @@ -12,12 +12,13 @@ import ( func Middlewares( log log.Logger, limits tripperware.Limits, + merger tripperware.Merger, queryAnalyzer querysharding.Analyzer, lookbackDelta time.Duration, ) ([]tripperware.Middleware, error) { m := []tripperware.Middleware{ NewLimitsMiddleware(limits, lookbackDelta), - tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer), + tripperware.ShardByMiddleware(log, limits, merger, queryAnalyzer), } return m, nil } diff --git a/pkg/querier/tripperware/instantquery/instant_query_test.go b/pkg/querier/tripperware/instantquery/instant_query_test.go index c9cb6d44af..718bc460ab 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_test.go @@ -25,6 +25,8 @@ import ( "github.com/cortexproject/cortex/pkg/querier/tripperware" ) +var testInstantQueryCodec = NewInstantQueryCodec(string(tripperware.NonCompression), string(tripperware.ProtobufCodecType)) + const testHistogramResponse = `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"prometheus_http_request_duration_seconds","handler":"/metrics","instance":"localhost:9090","job":"prometheus"},"histogram":[1719528871.898,{"count":"6342","sum":"43.31319875499995","buckets":[[0,"0.0013810679320049755","0.0015060652591874421","1"],[0,"0.0015060652591874421","0.001642375811042411","7"],[0,"0.001642375811042411","0.0017910235218841233","5"],[0,"0.0017910235218841233","0.001953125","13"],[0,"0.001953125","0.0021298979153618314","19"],[0,"0.0021298979153618314","0.0023226701464896895","13"],[0,"0.0023226701464896895","0.002532889755177753","13"],[0,"0.002532889755177753","0.002762135864009951","15"],[0,"0.002762135864009951","0.0030121305183748843","12"],[0,"0.0030121305183748843","0.003284751622084822","34"],[0,"0.003284751622084822","0.0035820470437682465","188"],[0,"0.0035820470437682465","0.00390625","372"],[0,"0.00390625","0.004259795830723663","400"],[0,"0.004259795830723663","0.004645340292979379","411"],[0,"0.004645340292979379","0.005065779510355506","425"],[0,"0.005065779510355506","0.005524271728019902","425"],[0,"0.005524271728019902","0.0060242610367497685","521"],[0,"0.0060242610367497685","0.006569503244169644","621"],[0,"0.006569503244169644","0.007164094087536493","593"],[0,"0.007164094087536493","0.0078125","506"],[0,"0.0078125","0.008519591661447326","458"],[0,"0.008519591661447326","0.009290680585958758","346"],[0,"0.009290680585958758","0.010131559020711013","285"],[0,"0.010131559020711013","0.011048543456039804","196"],[0,"0.011048543456039804","0.012048522073499537","129"],[0,"0.012048522073499537","0.013139006488339287","85"],[0,"0.013139006488339287","0.014328188175072986","65"],[0,"0.014328188175072986","0.015625","54"],[0,"0.015625","0.01703918332289465","53"],[0,"0.01703918332289465","0.018581361171917516","20"],[0,"0.018581361171917516","0.020263118041422026","21"],[0,"0.020263118041422026","0.022097086912079608","15"],[0,"0.022097086912079608","0.024097044146999074","11"],[0,"0.024097044146999074","0.026278012976678575","2"],[0,"0.026278012976678575","0.028656376350145972","3"],[0,"0.028656376350145972","0.03125","3"],[0,"0.04052623608284405","0.044194173824159216","2"]]}]}]}}` func sortPrometheusResponseHeader(headers []*tripperware.PrometheusResponseHeader) { @@ -35,7 +37,7 @@ func sortPrometheusResponseHeader(headers []*tripperware.PrometheusResponseHeade func TestRequest(t *testing.T) { t.Parallel() - codec := InstantQueryCodec + codec := testInstantQueryCodec for _, tc := range []struct { url string @@ -182,7 +184,7 @@ func TestCompressedResponse(t *testing.T) { Header: h, Body: io.NopCloser(responseBody), } - resp, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil) + resp, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil) require.Equal(t, tc.err, err) if err == nil { @@ -376,7 +378,7 @@ func TestResponse(t *testing.T) { } } - resp, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil) + resp, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil) require.NoError(t, err) // Reset response, as the above call will have consumed the body reader. @@ -386,7 +388,7 @@ func TestResponse(t *testing.T) { Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))), ContentLength: int64(len(tc.jsonBody)), } - resp2, err := InstantQueryCodec.EncodeResponse(context.Background(), resp) + resp2, err := testInstantQueryCodec.EncodeResponse(context.Background(), resp) require.NoError(t, err) assert.Equal(t, response, resp2) }) @@ -645,7 +647,7 @@ func TestMergeResponse(t *testing.T) { if tc.cancelBeforeDecode { cancelCtx() } - dr, err := InstantQueryCodec.DecodeResponse(ctx, hr, nil) + dr, err := testInstantQueryCodec.DecodeResponse(ctx, hr, nil) assert.Equal(t, tc.expectedDecodeErr, err) if err != nil { cancelCtx() @@ -657,13 +659,13 @@ func TestMergeResponse(t *testing.T) { if tc.cancelBeforeMerge { cancelCtx() } - resp, err := InstantQueryCodec.MergeResponse(ctx, tc.req, resps...) + resp, err := testInstantQueryCodec.MergeResponse(ctx, tc.req, resps...) assert.Equal(t, tc.expectedErr, err) if err != nil { cancelCtx() return } - dr, err := InstantQueryCodec.EncodeResponse(ctx, resp) + dr, err := testInstantQueryCodec.EncodeResponse(ctx, resp) assert.Equal(t, tc.expectedErr, err) contents, err := io.ReadAll(dr.Body) assert.Equal(t, tc.expectedErr, err) @@ -1660,7 +1662,7 @@ func TestMergeResponseProtobuf(t *testing.T) { if tc.cancelBeforeDecode { cancelCtx() } - dr, err := InstantQueryCodec.DecodeResponse(ctx, hr, nil) + dr, err := testInstantQueryCodec.DecodeResponse(ctx, hr, nil) assert.Equal(t, tc.expectedDecodeErr, err) if err != nil { cancelCtx() @@ -1672,13 +1674,13 @@ func TestMergeResponseProtobuf(t *testing.T) { if tc.cancelBeforeMerge { cancelCtx() } - resp, err := InstantQueryCodec.MergeResponse(ctx, tc.req, resps...) + resp, err := testInstantQueryCodec.MergeResponse(ctx, tc.req, resps...) assert.Equal(t, tc.expectedErr, err) if err != nil { cancelCtx() return } - dr, err := InstantQueryCodec.EncodeResponse(ctx, resp) + dr, err := testInstantQueryCodec.EncodeResponse(ctx, resp) assert.Equal(t, tc.expectedErr, err) contents, err := io.ReadAll(dr.Body) assert.Equal(t, tc.expectedErr, err) @@ -1743,7 +1745,7 @@ func Benchmark_Decode(b *testing.B) { StatusCode: 200, Body: io.NopCloser(bytes.NewBuffer(body)), } - _, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil) + _, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil) require.NoError(b, err) } }) @@ -1806,7 +1808,7 @@ func Benchmark_Decode_Protobuf(b *testing.B) { Header: http.Header{"Content-Type": []string{"application/x-protobuf"}}, Body: io.NopCloser(bytes.NewBuffer(body)), } - _, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil) + _, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil) require.NoError(b, err) } }) diff --git a/pkg/querier/tripperware/instantquery/shard_by_query_test.go b/pkg/querier/tripperware/instantquery/shard_by_query_test.go index aac85b2a9a..bbfa5fc4ae 100644 --- a/pkg/querier/tripperware/instantquery/shard_by_query_test.go +++ b/pkg/querier/tripperware/instantquery/shard_by_query_test.go @@ -9,5 +9,5 @@ import ( func Test_shardQuery(t *testing.T) { t.Parallel() - tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.NewPrometheusCodec(true, "", "protobuf")) + tripperware.TestQueryShardQuery(t, testInstantQueryCodec, queryrange.NewPrometheusCodec(true, "", "protobuf")) }