Skip to content

Commit

Permalink
Stop using global instant query codec (#6328)
Browse files Browse the repository at this point in the history
* stop using global instant query codec

Signed-off-by: Ben Ye <[email protected]>

* update

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Nov 10, 2024
1 parent 6761646 commit 1d09628
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 18 deletions.
5 changes: 3 additions & 2 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
// ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats)
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)

queryRangeMiddlewares, cache, err := queryrange.Middlewares(
t.Cfg.QueryRange,
Expand All @@ -490,7 +491,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
return nil, err
}

instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer, t.Cfg.Querier.LookbackDelta)
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, instantQueryCodec, queryAnalyzer, t.Cfg.Querier.LookbackDelta)
if err != nil {
return nil, err
}
Expand All @@ -501,7 +502,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
queryRangeMiddlewares,
instantQueryMiddlewares,
prometheusCodec,
instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec),
instantQueryCodec,
t.Overrides,
queryAnalyzer,
t.Cfg.Querier.DefaultEvaluationInterval,
Expand Down
2 changes: 0 additions & 2 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
)

var (
InstantQueryCodec tripperware.Codec = NewInstantQueryCodec("", "protobuf")

json = jsoniter.Config{
EscapeHTML: false, // No HTML in our responses.
SortMapKeys: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ import (
func Middlewares(
log log.Logger,
limits tripperware.Limits,
merger tripperware.Merger,
queryAnalyzer querysharding.Analyzer,
lookbackDelta time.Duration,
) ([]tripperware.Middleware, error) {
m := []tripperware.Middleware{
NewLimitsMiddleware(limits, lookbackDelta),
tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer),
tripperware.ShardByMiddleware(log, limits, merger, queryAnalyzer),
}
return m, nil
}
26 changes: 14 additions & 12 deletions pkg/querier/tripperware/instantquery/instant_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/cortexproject/cortex/pkg/querier/tripperware"
)

var testInstantQueryCodec = NewInstantQueryCodec(string(tripperware.NonCompression), string(tripperware.ProtobufCodecType))

const testHistogramResponse = `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"prometheus_http_request_duration_seconds","handler":"/metrics","instance":"localhost:9090","job":"prometheus"},"histogram":[1719528871.898,{"count":"6342","sum":"43.31319875499995","buckets":[[0,"0.0013810679320049755","0.0015060652591874421","1"],[0,"0.0015060652591874421","0.001642375811042411","7"],[0,"0.001642375811042411","0.0017910235218841233","5"],[0,"0.0017910235218841233","0.001953125","13"],[0,"0.001953125","0.0021298979153618314","19"],[0,"0.0021298979153618314","0.0023226701464896895","13"],[0,"0.0023226701464896895","0.002532889755177753","13"],[0,"0.002532889755177753","0.002762135864009951","15"],[0,"0.002762135864009951","0.0030121305183748843","12"],[0,"0.0030121305183748843","0.003284751622084822","34"],[0,"0.003284751622084822","0.0035820470437682465","188"],[0,"0.0035820470437682465","0.00390625","372"],[0,"0.00390625","0.004259795830723663","400"],[0,"0.004259795830723663","0.004645340292979379","411"],[0,"0.004645340292979379","0.005065779510355506","425"],[0,"0.005065779510355506","0.005524271728019902","425"],[0,"0.005524271728019902","0.0060242610367497685","521"],[0,"0.0060242610367497685","0.006569503244169644","621"],[0,"0.006569503244169644","0.007164094087536493","593"],[0,"0.007164094087536493","0.0078125","506"],[0,"0.0078125","0.008519591661447326","458"],[0,"0.008519591661447326","0.009290680585958758","346"],[0,"0.009290680585958758","0.010131559020711013","285"],[0,"0.010131559020711013","0.011048543456039804","196"],[0,"0.011048543456039804","0.012048522073499537","129"],[0,"0.012048522073499537","0.013139006488339287","85"],[0,"0.013139006488339287","0.014328188175072986","65"],[0,"0.014328188175072986","0.015625","54"],[0,"0.015625","0.01703918332289465","53"],[0,"0.01703918332289465","0.018581361171917516","20"],[0,"0.018581361171917516","0.020263118041422026","21"],[0,"0.020263118041422026","0.022097086912079608","15"],[0,"0.022097086912079608","0.024097044146999074","11"],[0,"0.024097044146999074","0.026278012976678575","2"],[0,"0.026278012976678575","0.028656376350145972","3"],[0,"0.028656376350145972","0.03125","3"],[0,"0.04052623608284405","0.044194173824159216","2"]]}]}]}}`

func sortPrometheusResponseHeader(headers []*tripperware.PrometheusResponseHeader) {
Expand All @@ -35,7 +37,7 @@ func sortPrometheusResponseHeader(headers []*tripperware.PrometheusResponseHeade

func TestRequest(t *testing.T) {
t.Parallel()
codec := InstantQueryCodec
codec := testInstantQueryCodec

for _, tc := range []struct {
url string
Expand Down Expand Up @@ -182,7 +184,7 @@ func TestCompressedResponse(t *testing.T) {
Header: h,
Body: io.NopCloser(responseBody),
}
resp, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil)
resp, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)
require.Equal(t, tc.err, err)

if err == nil {
Expand Down Expand Up @@ -376,7 +378,7 @@ func TestResponse(t *testing.T) {
}
}

resp, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil)
resp, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)
require.NoError(t, err)

// Reset response, as the above call will have consumed the body reader.
Expand All @@ -386,7 +388,7 @@ func TestResponse(t *testing.T) {
Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))),
ContentLength: int64(len(tc.jsonBody)),
}
resp2, err := InstantQueryCodec.EncodeResponse(context.Background(), resp)
resp2, err := testInstantQueryCodec.EncodeResponse(context.Background(), resp)
require.NoError(t, err)
assert.Equal(t, response, resp2)
})
Expand Down Expand Up @@ -645,7 +647,7 @@ func TestMergeResponse(t *testing.T) {
if tc.cancelBeforeDecode {
cancelCtx()
}
dr, err := InstantQueryCodec.DecodeResponse(ctx, hr, nil)
dr, err := testInstantQueryCodec.DecodeResponse(ctx, hr, nil)
assert.Equal(t, tc.expectedDecodeErr, err)
if err != nil {
cancelCtx()
Expand All @@ -657,13 +659,13 @@ func TestMergeResponse(t *testing.T) {
if tc.cancelBeforeMerge {
cancelCtx()
}
resp, err := InstantQueryCodec.MergeResponse(ctx, tc.req, resps...)
resp, err := testInstantQueryCodec.MergeResponse(ctx, tc.req, resps...)
assert.Equal(t, tc.expectedErr, err)
if err != nil {
cancelCtx()
return
}
dr, err := InstantQueryCodec.EncodeResponse(ctx, resp)
dr, err := testInstantQueryCodec.EncodeResponse(ctx, resp)
assert.Equal(t, tc.expectedErr, err)
contents, err := io.ReadAll(dr.Body)
assert.Equal(t, tc.expectedErr, err)
Expand Down Expand Up @@ -1660,7 +1662,7 @@ func TestMergeResponseProtobuf(t *testing.T) {
if tc.cancelBeforeDecode {
cancelCtx()
}
dr, err := InstantQueryCodec.DecodeResponse(ctx, hr, nil)
dr, err := testInstantQueryCodec.DecodeResponse(ctx, hr, nil)
assert.Equal(t, tc.expectedDecodeErr, err)
if err != nil {
cancelCtx()
Expand All @@ -1672,13 +1674,13 @@ func TestMergeResponseProtobuf(t *testing.T) {
if tc.cancelBeforeMerge {
cancelCtx()
}
resp, err := InstantQueryCodec.MergeResponse(ctx, tc.req, resps...)
resp, err := testInstantQueryCodec.MergeResponse(ctx, tc.req, resps...)
assert.Equal(t, tc.expectedErr, err)
if err != nil {
cancelCtx()
return
}
dr, err := InstantQueryCodec.EncodeResponse(ctx, resp)
dr, err := testInstantQueryCodec.EncodeResponse(ctx, resp)
assert.Equal(t, tc.expectedErr, err)
contents, err := io.ReadAll(dr.Body)
assert.Equal(t, tc.expectedErr, err)
Expand Down Expand Up @@ -1743,7 +1745,7 @@ func Benchmark_Decode(b *testing.B) {
StatusCode: 200,
Body: io.NopCloser(bytes.NewBuffer(body)),
}
_, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil)
_, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)
require.NoError(b, err)
}
})
Expand Down Expand Up @@ -1806,7 +1808,7 @@ func Benchmark_Decode_Protobuf(b *testing.B) {
Header: http.Header{"Content-Type": []string{"application/x-protobuf"}},
Body: io.NopCloser(bytes.NewBuffer(body)),
}
_, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil)
_, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)
require.NoError(b, err)
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ import (

func Test_shardQuery(t *testing.T) {
t.Parallel()
tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.NewPrometheusCodec(true, "", "protobuf"))
tripperware.TestQueryShardQuery(t, testInstantQueryCodec, queryrange.NewPrometheusCodec(true, "", "protobuf"))
}

0 comments on commit 1d09628

Please sign in to comment.