From 31f2cce4c7e20a623d6a79fa5d583f7fc77a6534 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 16 Dec 2024 10:41:11 +0900 Subject: [PATCH] Allow ruler to retrieve proto format query response Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + docs/configuration/config-file-reference.md | 6 + docs/configuration/v1-guarantees.md | 4 +- go.mod | 2 +- integration/ruler_test.go | 88 +++++--- pkg/querier/codec/protobuf_codec.go | 4 +- pkg/querier/codec/protobuf_codec_test.go | 8 +- .../tripperware/instantquery/instant_query.go | 50 ++++- .../instantquery/instant_query_test.go | 63 +++++- pkg/querier/tripperware/query.go | 15 +- .../tripperware/queryrange/marshaling_test.go | 2 +- .../tripperware/queryrange/query_range.go | 2 +- .../queryrange/query_range_test.go | 4 +- .../queryrange/split_by_interval_test.go | 2 +- pkg/querier/tripperware/roundtrip.go | 16 +- pkg/querier/tripperware/roundtrip_test.go | 2 +- pkg/ruler/frontend_client.go | 48 ++++- pkg/ruler/frontend_client_pool.go | 4 +- pkg/ruler/frontend_client_test.go | 196 +++++++++++++++++- pkg/ruler/frontend_decoder.go | 86 +++++++- pkg/ruler/frontend_decoder_test.go | 173 +++++++++++++++- pkg/ruler/ruler.go | 21 +- 22 files changed, 708 insertions(+), 89 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8559e53924..97d3fea1ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * [CHANGE] Change all max async concurrency default values `50` to `3` #6268 * [CHANGE] Change default value of `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` from `50` to `3` #6265 * [CHANGE] Enable Compactor and Alertmanager in target all. #6204 +* [FEATURE] Ruler: Add an experimental flag `-ruler.query-response-format` to retrieve query response as a proto format. #6345 * [FEATURE] Ruler: Pagination support for List Rules API. #6299 * [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527 * [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index d5c06fc205..b0e40ad130 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4313,6 +4313,12 @@ The `ruler_config` configures the Cortex ruler. # CLI flag: -ruler.frontend-address [frontend_address: | default = ""] +# [Experimental] Query response format to get query results from Query Frontend +# when the rule evaluation. It will only take effect when +# `-ruler.frontend-address` is configured. Supported values: json,protobuf +# CLI flag: -ruler.query-response-format +[query_response_format: | default = "protobuf"] + frontend_client: # gRPC client max receive message size (bytes). # CLI flag: -ruler.frontendClient.grpc-max-recv-msg-size diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 94da14fb10..2b46407986 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -35,7 +35,9 @@ Cortex is an actively developed project and we want to encourage the introductio Currently experimental features are: -- Ruler: Evaluate rules to query frontend instead of ingesters (enabled via `-ruler.frontend-address` ) +- Ruler + - Evaluate rules to query frontend instead of ingesters (enabled via `-ruler.frontend-address`). + - When `-ruler.frontend-address` is specified, the response format can be specified (via `-ruler.query-response-format`). - S3 Server Side Encryption (SSE) using KMS (including per-tenant KMS config overrides). - Azure blob storage. - Zone awareness based replication. diff --git a/go.mod b/go.mod index dcab040906..7bd8012269 100644 --- a/go.mod +++ b/go.mod @@ -81,6 +81,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 github.com/google/go-cmp v0.6.0 github.com/hashicorp/golang-lru/v2 v2.0.7 + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/sercand/kuberesolver/v5 v5.1.1 github.com/tjhop/slog-gokit v0.1.2 go.opentelemetry.io/collector/pdata v1.21.0 @@ -189,7 +190,6 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/ncw/swift v1.0.53 // indirect github.com/oklog/run v1.1.0 // indirect diff --git a/integration/ruler_test.go b/integration/ruler_test.go index f7d16507d1..d9e7362b5f 100644 --- a/integration/ruler_test.go +++ b/integration/ruler_test.go @@ -1670,42 +1670,64 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) { distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") require.NoError(t, s.StartAndWaitReady(distributor, ingester)) - queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "") - require.NoError(t, s.Start(queryFrontend)) - - ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ - "-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(), - }), "") - querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ - "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), - }), "") - require.NoError(t, s.StartAndWaitReady(ruler, querier)) - - c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user) - require.NoError(t, err) + for _, format := range []string{"protobuf", "json"} { + t.Run(fmt.Sprintf("format:%s", format), func(t *testing.T) { + queryFrontendFlag := mergeFlags(flags, map[string]string{ + "-ruler.query-response-format": format, + }) + queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", queryFrontendFlag, "") + require.NoError(t, s.Start(queryFrontend)) - expression := "metric" - groupName := "rule_group" - ruleName := "rule_name" - require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace)) + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(queryFrontendFlag, map[string]string{ + "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), + }), "") + require.NoError(t, s.StartAndWaitReady(querier)) - rgMatcher := ruleGroupMatcher(user, namespace, groupName) - // Wait until ruler has loaded the group. - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) - // Wait until rule group has tried to evaluate the rule. - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) + rulerFlag := mergeFlags(queryFrontendFlag, map[string]string{ + "-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(), + }) + ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), rulerFlag, "") + require.NoError(t, s.StartAndWaitReady(ruler)) - matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user) - // Check that cortex_ruler_query_frontend_clients went up - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics)) - // Check that cortex_ruler_queries_total went up - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) - // Check that cortex_ruler_queries_failed_total is zero - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) - // Check that cortex_ruler_write_requests_total went up - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) - // Check that cortex_ruler_write_requests_failed_total is zero - require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + t.Cleanup(func() { + _ = s.Stop(ruler) + _ = s.Stop(queryFrontend) + _ = s.Stop(querier) + }) + + c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user) + require.NoError(t, err) + + expression := "metric" // vector + //expression := "scalar(count(up == 1)) > bool 1" // scalar + groupName := "rule_group" + ruleName := "rule_name" + require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace)) + + rgMatcher := ruleGroupMatcher(user, namespace, groupName) + // Wait until ruler has loaded the group. + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) + // Wait until rule group has tried to evaluate the rule. + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) + // Make sure not to fail + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) + + matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user) + sourceMatcher := labels.MustNewMatcher(labels.MatchEqual, "source", "ruler") + // Check that cortex_ruler_query_frontend_clients went up + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics)) + // Check that cortex_ruler_queries_total went up + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + // Check that cortex_ruler_queries_failed_total is zero + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + // Check that cortex_ruler_write_requests_total went up + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + // Check that cortex_ruler_write_requests_failed_total is zero + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + // Check that cortex_query_frontend_queries_total went up + require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics)) + }) + } } func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule { diff --git a/pkg/querier/codec/protobuf_codec.go b/pkg/querier/codec/protobuf_codec.go index 2544499b9c..783adcd183 100644 --- a/pkg/querier/codec/protobuf_codec.go +++ b/pkg/querier/codec/protobuf_codec.go @@ -25,8 +25,8 @@ func (p ProtobufCodec) ContentType() v1.MIMEType { if !p.CortexInternal { return v1.MIMEType{Type: "application", SubType: "x-protobuf"} } - // TODO: switch to use constants. - return v1.MIMEType{Type: "application", SubType: "x-cortex-query+proto"} + + return v1.MIMEType{Type: "application", SubType: tripperware.QueryResponseCortexMIMESubType} } func (p ProtobufCodec) CanEncode(resp *v1.Response) bool { diff --git a/pkg/querier/codec/protobuf_codec_test.go b/pkg/querier/codec/protobuf_codec_test.go index 310e49b392..00b2b4bf60 100644 --- a/pkg/querier/codec/protobuf_codec_test.go +++ b/pkg/querier/codec/protobuf_codec_test.go @@ -42,6 +42,7 @@ func TestProtobufCodec_Encode(t *testing.T) { expected *tripperware.PrometheusResponse }{ { + name: "vector", data: &v1.QueryData{ ResultType: parser.ValueTypeVector, Result: promql.Vector{ @@ -85,6 +86,7 @@ func TestProtobufCodec_Encode(t *testing.T) { }, }, { + name: "scalar", data: &v1.QueryData{ ResultType: parser.ValueTypeScalar, Result: promql.Scalar{T: 1000, V: 1}, @@ -147,6 +149,7 @@ func TestProtobufCodec_Encode(t *testing.T) { }, }, { + name: "matrix", data: &v1.QueryData{ ResultType: parser.ValueTypeMatrix, Result: promql.Matrix{ @@ -180,6 +183,7 @@ func TestProtobufCodec_Encode(t *testing.T) { }, }, { + name: "matrix with multiple series", data: &v1.QueryData{ ResultType: parser.ValueTypeMatrix, Result: promql.Matrix{ @@ -223,6 +227,7 @@ func TestProtobufCodec_Encode(t *testing.T) { }, }, { + name: "matrix: not cortex internal with histogram", data: &v1.QueryData{ ResultType: parser.ValueTypeMatrix, Result: promql.Matrix{ @@ -312,6 +317,7 @@ func TestProtobufCodec_Encode(t *testing.T) { }, }, { + name: "vector: not cortex internal with histogram", data: &v1.QueryData{ ResultType: parser.ValueTypeVector, Result: promql.Vector{ @@ -400,7 +406,7 @@ func TestProtobufCodec_Encode(t *testing.T) { }, }, { - name: "cortex internal with native histogram", + name: "vector: cortex internal with native histogram", cortexInternal: true, data: &v1.QueryData{ ResultType: parser.ValueTypeVector, diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index 75c9715d39..54016c7746 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -11,9 +11,11 @@ import ( "time" jsoniter "github.com/json-iterator/go" + "github.com/munnerz/goautoneg" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/common/model" + v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/weaveworks/common/httpgrpc" "google.golang.org/grpc/status" @@ -29,6 +31,9 @@ var ( SortMapKeys: true, ValidateJsonRawMessage: false, }.Froze() + + rulerMIMEType = v1.MIMEType{Type: "application", SubType: tripperware.QueryResponseCortexMIMESubType} + jsonMIMEType = v1.MIMEType{Type: "application", SubType: "json"} ) type instantQueryCodec struct { @@ -68,12 +73,18 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for result.Stats = r.FormValue("stats") result.Path = r.URL.Path - // Include the specified headers from http request in prometheusRequest. - for _, header := range forwardHeaders { - for h, hv := range r.Header { - if strings.EqualFold(h, header) { - result.Headers[h] = hv - break + isSourceRuler := strings.Contains(r.Header.Get("User-Agent"), tripperware.RulerUserAgent) + if isSourceRuler { + // When the source is the Ruler, then forward whole headers + result.Headers = r.Header + } else { + // Include the specified headers from http request in prometheusRequest. + for _, header := range forwardHeaders { + for h, hv := range r.Header { + if strings.EqualFold(h, header) { + result.Headers[h] = hv + break + } } } } @@ -155,7 +166,11 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ } } - tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression) + isSourceRuler := strings.Contains(h.Get("User-Agent"), tripperware.RulerUserAgent) + if !isSourceRuler { + // When the source is the Ruler, skip set header + tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression) + } req := &http.Request{ Method: "GET", @@ -168,7 +183,7 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ return req.WithContext(ctx), nil } -func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Response) (*http.Response, error) { +func (c instantQueryCodec) EncodeResponse(ctx context.Context, req *http.Request, res tripperware.Response) (*http.Response, error) { sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse") defer sp.Finish() @@ -180,7 +195,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res queryStats := stats.FromContext(ctx) tripperware.SetQueryResponseStats(a, queryStats) - b, err := json.Marshal(a) + contentType, b, err := marshalResponse(a, req.Header.Get("Accept")) if err != nil { return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err) } @@ -189,7 +204,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res resp := http.Response{ Header: http.Header{ - "Content-Type": []string{tripperware.ApplicationJson}, + "Content-Type": []string{contentType}, }, Body: io.NopCloser(bytes.NewBuffer(b)), StatusCode: http.StatusOK, @@ -217,3 +232,18 @@ func decorateWithParamName(err error, field string) error { } return fmt.Errorf(errTmpl, field, err) } + +func marshalResponse(resp *tripperware.PrometheusResponse, acceptHeader string) (string, []byte, error) { + for _, clause := range goautoneg.ParseAccept(acceptHeader) { + if jsonMIMEType.Satisfies(clause) { + b, err := json.Marshal(resp) + return tripperware.ApplicationJson, b, err + } else if rulerMIMEType.Satisfies(clause) { + b, err := resp.Marshal() + return tripperware.QueryResponseCortexMIMEType, b, err + } + } + + b, err := json.Marshal(resp) + return tripperware.ApplicationJson, b, err +} diff --git a/pkg/querier/tripperware/instantquery/instant_query_test.go b/pkg/querier/tripperware/instantquery/instant_query_test.go index 8d8ed8fee7..1ce16c3956 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_test.go @@ -27,6 +27,12 @@ import ( var testInstantQueryCodec = NewInstantQueryCodec(string(tripperware.NonCompression), string(tripperware.ProtobufCodecType)) +var jsonHttpReq = &http.Request{ + Header: map[string][]string{ + "Accept": {"application/json"}, + }, +} + const testHistogramResponse = `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"prometheus_http_request_duration_seconds","handler":"/metrics","instance":"localhost:9090","job":"prometheus"},"histogram":[1719528871.898,{"count":"6342","sum":"43.31319875499995","buckets":[[0,"0.0013810679320049755","0.0015060652591874421","1"],[0,"0.0015060652591874421","0.001642375811042411","7"],[0,"0.001642375811042411","0.0017910235218841233","5"],[0,"0.0017910235218841233","0.001953125","13"],[0,"0.001953125","0.0021298979153618314","19"],[0,"0.0021298979153618314","0.0023226701464896895","13"],[0,"0.0023226701464896895","0.002532889755177753","13"],[0,"0.002532889755177753","0.002762135864009951","15"],[0,"0.002762135864009951","0.0030121305183748843","12"],[0,"0.0030121305183748843","0.003284751622084822","34"],[0,"0.003284751622084822","0.0035820470437682465","188"],[0,"0.0035820470437682465","0.00390625","372"],[0,"0.00390625","0.004259795830723663","400"],[0,"0.004259795830723663","0.004645340292979379","411"],[0,"0.004645340292979379","0.005065779510355506","425"],[0,"0.005065779510355506","0.005524271728019902","425"],[0,"0.005524271728019902","0.0060242610367497685","521"],[0,"0.0060242610367497685","0.006569503244169644","621"],[0,"0.006569503244169644","0.007164094087536493","593"],[0,"0.007164094087536493","0.0078125","506"],[0,"0.0078125","0.008519591661447326","458"],[0,"0.008519591661447326","0.009290680585958758","346"],[0,"0.009290680585958758","0.010131559020711013","285"],[0,"0.010131559020711013","0.011048543456039804","196"],[0,"0.011048543456039804","0.012048522073499537","129"],[0,"0.012048522073499537","0.013139006488339287","85"],[0,"0.013139006488339287","0.014328188175072986","65"],[0,"0.014328188175072986","0.015625","54"],[0,"0.015625","0.01703918332289465","53"],[0,"0.01703918332289465","0.018581361171917516","20"],[0,"0.018581361171917516","0.020263118041422026","21"],[0,"0.020263118041422026","0.022097086912079608","15"],[0,"0.022097086912079608","0.024097044146999074","11"],[0,"0.024097044146999074","0.026278012976678575","2"],[0,"0.026278012976678575","0.028656376350145972","3"],[0,"0.028656376350145972","0.03125","3"],[0,"0.04052623608284405","0.044194173824159216","2"]]}]}]}}` func sortPrometheusResponseHeader(headers []*tripperware.PrometheusResponseHeader) { @@ -394,7 +400,7 @@ func TestResponse(t *testing.T) { promBody: &tripperware.PrometheusResponse{ Status: "success", Data: tripperware.PrometheusData{ - ResultType: model.ValString.String(), + ResultType: model.ValScalar.String(), Result: tripperware.PrometheusQueryResult{ Result: &tripperware.PrometheusQueryResult_RawBytes{ RawBytes: []byte(`{"resultType":"scalar","result":[1,"13"]}`), @@ -458,7 +464,7 @@ func TestResponse(t *testing.T) { Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))), ContentLength: int64(len(tc.jsonBody)), } - resp2, err := testInstantQueryCodec.EncodeResponse(context.Background(), resp) + resp2, err := testInstantQueryCodec.EncodeResponse(context.Background(), jsonHttpReq, resp) require.NoError(t, err) assert.Equal(t, response, resp2) }) @@ -735,7 +741,7 @@ func TestMergeResponse(t *testing.T) { cancelCtx() return } - dr, err := testInstantQueryCodec.EncodeResponse(ctx, resp) + dr, err := testInstantQueryCodec.EncodeResponse(ctx, jsonHttpReq, resp) assert.Equal(t, tc.expectedErr, err) contents, err := io.ReadAll(dr.Body) assert.Equal(t, tc.expectedErr, err) @@ -1750,7 +1756,7 @@ func TestMergeResponseProtobuf(t *testing.T) { cancelCtx() return } - dr, err := testInstantQueryCodec.EncodeResponse(ctx, resp) + dr, err := testInstantQueryCodec.EncodeResponse(ctx, jsonHttpReq, resp) assert.Equal(t, tc.expectedErr, err) contents, err := io.ReadAll(dr.Body) assert.Equal(t, tc.expectedErr, err) @@ -1760,6 +1766,55 @@ func TestMergeResponseProtobuf(t *testing.T) { } } +func Test_marshalResponseContentType(t *testing.T) { + mockResp := &tripperware.PrometheusResponse{} + + tests := []struct { + name string + acceptHeader string + expectedContentType string + }{ + { + name: "empty accept header", + acceptHeader: "", + expectedContentType: "application/json", + }, + { + name: "type and subtype are *", + acceptHeader: "*/*", + expectedContentType: "application/json", + }, + { + name: "sub type is *", + acceptHeader: "application/*", + expectedContentType: "application/json", + }, + { + name: "json type", + acceptHeader: "application/json", + expectedContentType: "application/json", + }, + { + name: "proto type", + acceptHeader: "application/x-protobuf", + expectedContentType: "application/json", + }, + { + name: "cortex type, json type", + acceptHeader: "application/x-cortex-query+proto, application/json", + expectedContentType: "application/x-cortex-query+proto", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + contentType, _, err := marshalResponse(mockResp, test.acceptHeader) + require.NoError(t, err) + require.Equal(t, test.expectedContentType, contentType) + }) + } +} + func Benchmark_Decode(b *testing.B) { maxSamplesCount := 1000000 samples := make([]tripperware.SampleStream, maxSamplesCount) diff --git a/pkg/querier/tripperware/query.go b/pkg/querier/tripperware/query.go index 28e7c2430b..97d7ef602e 100644 --- a/pkg/querier/tripperware/query.go +++ b/pkg/querier/tripperware/query.go @@ -12,10 +12,9 @@ import ( "time" "unsafe" - "github.com/golang/snappy" - "github.com/go-kit/log" "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" jsoniter "github.com/json-iterator/go" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" @@ -48,6 +47,13 @@ const ( ProtobufCodecType CodecType = "protobuf" ApplicationProtobuf string = "application/x-protobuf" ApplicationJson string = "application/json" + + QueryResponseCortexMIMEType = "application/" + QueryResponseCortexMIMESubType + QueryResponseCortexMIMESubType = "x-cortex-query+proto" + RulerUserAgent = "CortexRuler" + + SourceRuler = "ruler" + SourceAPI = "api" ) // Codec is used to encode/decode query range requests and responses so they can be passed down to middlewares. @@ -62,7 +68,7 @@ type Codec interface { // EncodeRequest encodes a Request into an http request. EncodeRequest(context.Context, Request) (*http.Request, error) // EncodeResponse encodes a Response into an http response. - EncodeResponse(context.Context, Response) (*http.Response, error) + EncodeResponse(context.Context, *http.Request, Response) (*http.Response, error) } // Merger is used by middlewares making multiple requests to merge back all responses into a single one. @@ -763,7 +769,8 @@ func SetRequestHeaders(h http.Header, defaultCodecType CodecType, compression Co } func UnmarshalResponse(r *http.Response, buf []byte, resp *PrometheusResponse) error { - if r.Header != nil && r.Header.Get("Content-Type") == ApplicationProtobuf { + contentType := r.Header.Get("Content-Type") + if r.Header != nil && (contentType == ApplicationProtobuf || contentType == QueryResponseCortexMIMEType) { return proto.Unmarshal(buf, resp) } else { return json.Unmarshal(buf, resp) diff --git a/pkg/querier/tripperware/queryrange/marshaling_test.go b/pkg/querier/tripperware/queryrange/marshaling_test.go index 4652a0e10b..d0efd0e8d4 100644 --- a/pkg/querier/tripperware/queryrange/marshaling_test.go +++ b/pkg/querier/tripperware/queryrange/marshaling_test.go @@ -80,7 +80,7 @@ func BenchmarkPrometheusCodec_EncodeResponse(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - _, err := PrometheusCodec.EncodeResponse(context.Background(), res) + _, err := PrometheusCodec.EncodeResponse(context.Background(), nil, res) require.NoError(b, err) } } diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 875519cd97..1693408ea9 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -230,7 +230,7 @@ func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ return &resp, nil } -func (prometheusCodec) EncodeResponse(ctx context.Context, res tripperware.Response) (*http.Response, error) { +func (prometheusCodec) EncodeResponse(ctx context.Context, _ *http.Request, res tripperware.Response) (*http.Response, error) { sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse") defer sp.Finish() diff --git a/pkg/querier/tripperware/queryrange/query_range_test.go b/pkg/querier/tripperware/queryrange/query_range_test.go index 256d2800ff..780077274d 100644 --- a/pkg/querier/tripperware/queryrange/query_range_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_test.go @@ -307,7 +307,7 @@ func TestResponse(t *testing.T) { Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))), ContentLength: int64(len(tc.jsonBody)), } - resp2, err := PrometheusCodec.EncodeResponse(context.Background(), resp) + resp2, err := PrometheusCodec.EncodeResponse(context.Background(), nil, resp) require.NoError(t, err) assert.Equal(t, response, resp2) cancelCtx() @@ -431,7 +431,7 @@ func TestResponseWithStats(t *testing.T) { Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))), ContentLength: int64(len(tc.jsonBody)), } - resp2, err := PrometheusCodec.EncodeResponse(context.Background(), resp) + resp2, err := PrometheusCodec.EncodeResponse(context.Background(), nil, resp) require.NoError(t, err) assert.Equal(t, response, resp2) }) diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go index e9cfd8526f..d96070a569 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go @@ -276,7 +276,7 @@ func TestSplitByDay(t *testing.T) { mergedResponse, err := PrometheusCodec.MergeResponse(context.Background(), nil, parsedResponse, parsedResponse) require.NoError(t, err) - mergedHTTPResponse, err := PrometheusCodec.EncodeResponse(context.Background(), mergedResponse) + mergedHTTPResponse, err := PrometheusCodec.EncodeResponse(context.Background(), nil, mergedResponse) require.NoError(t, err) mergedHTTPResponseBody, err := io.ReadAll(mergedHTTPResponse.Body) diff --git a/pkg/querier/tripperware/roundtrip.go b/pkg/querier/tripperware/roundtrip.go index 69b46dd66b..1237758698 100644 --- a/pkg/querier/tripperware/roundtrip.go +++ b/pkg/querier/tripperware/roundtrip.go @@ -115,7 +115,7 @@ func NewQueryTripperware( queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_query_frontend_queries_total", Help: "Total queries sent per tenant.", - }, []string{"op", "user"}) + }, []string{"op", "user", "source"}) rejectedQueriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_query_frontend_rejected_queries_total", @@ -156,7 +156,8 @@ func NewQueryTripperware( now := time.Now() userStr := tenant.JoinTenantIDs(tenantIDs) activeUsers.UpdateUserTimestamp(userStr, now) - queriesPerTenant.WithLabelValues(op, userStr).Inc() + source := getSource(r.Header.Get("User-Agent")) + queriesPerTenant.WithLabelValues(op, userStr, source).Inc() if maxSubQuerySteps > 0 && (isQuery || isQueryRange) { query := r.FormValue("query") @@ -211,7 +212,7 @@ func (q roundTripper) RoundTrip(r *http.Request) (*http.Response, error) { return nil, err } - return q.codec.EncodeResponse(r.Context(), response) + return q.codec.EncodeResponse(r.Context(), r, response) } // Do implements Handler. @@ -240,3 +241,12 @@ func (q roundTripper) Do(ctx context.Context, r Request) (Response, error) { return q.codec.DecodeResponse(ctx, response, r) } + +func getSource(userAgent string) string { + if strings.Contains(userAgent, RulerUserAgent) { + // caller is ruler + return SourceRuler + } + + return SourceAPI +} diff --git a/pkg/querier/tripperware/roundtrip_test.go b/pkg/querier/tripperware/roundtrip_test.go index 263c9a9b0e..40ac06598c 100644 --- a/pkg/querier/tripperware/roundtrip_test.go +++ b/pkg/querier/tripperware/roundtrip_test.go @@ -59,7 +59,7 @@ func (c mockCodec) DecodeRequest(_ context.Context, r *http.Request, _ []string) return mockRequest{}, nil } -func (c mockCodec) EncodeResponse(_ context.Context, resp Response) (*http.Response, error) { +func (c mockCodec) EncodeResponse(_ context.Context, _ *http.Request, resp Response) (*http.Response, error) { r := resp.(*mockResponse) return &http.Response{ Header: http.Header{ diff --git a/pkg/ruler/frontend_client.go b/pkg/ruler/frontend_client.go index e75997ce1a..dff0fca11c 100644 --- a/pkg/ruler/frontend_client.go +++ b/pkg/ruler/frontend_client.go @@ -15,6 +15,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" + "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/util/spanlogger" ) @@ -22,22 +23,29 @@ const ( orgIDHeader = "X-Scope-OrgID" instantQueryPath = "/api/v1/query" mimeTypeForm = "application/x-www-form-urlencoded" - contentTypeJSON = "application/json" ) +var jsonDecoder JsonDecoder +var protobufDecoder ProtobufDecoder + type FrontendClient struct { client httpgrpc.HTTPClient timeout time.Duration prometheusHTTPPrefix string - jsonDecoder JsonDecoder + queryResponseFormat string + decoders map[string]Decoder } -func NewFrontendClient(client httpgrpc.HTTPClient, timeout time.Duration, prometheusHTTPPrefix string) *FrontendClient { +func NewFrontendClient(client httpgrpc.HTTPClient, timeout time.Duration, prometheusHTTPPrefix, queryResponseFormat string) *FrontendClient { return &FrontendClient{ client: client, timeout: timeout, prometheusHTTPPrefix: prometheusHTTPPrefix, - jsonDecoder: JsonDecoder{}, + queryResponseFormat: queryResponseFormat, + decoders: map[string]Decoder{ + jsonDecoder.ContentType(): jsonDecoder, + protobufDecoder.ContentType(): protobufDecoder, + }, } } @@ -55,15 +63,23 @@ func (p *FrontendClient) makeRequest(ctx context.Context, qs string, ts time.Tim return nil, err } + acceptHeader := "" + switch p.queryResponseFormat { + case queryResponseFormatJson: + acceptHeader = jsonDecoder.ContentType() + case queryResponseFormatProtobuf: + acceptHeader = fmt.Sprintf("%s,%s", protobufDecoder.ContentType(), jsonDecoder.ContentType()) + } + req := &httpgrpc.HTTPRequest{ Method: http.MethodPost, Url: p.prometheusHTTPPrefix + instantQueryPath, Body: body, Headers: []*httpgrpc.Header{ - {Key: textproto.CanonicalMIMEHeaderKey("User-Agent"), Values: []string{fmt.Sprintf("Cortex/%s", version.Version)}}, + {Key: textproto.CanonicalMIMEHeaderKey("User-Agent"), Values: []string{fmt.Sprintf("%s/%s", tripperware.RulerUserAgent, version.Version)}}, {Key: textproto.CanonicalMIMEHeaderKey("Content-Type"), Values: []string{mimeTypeForm}}, {Key: textproto.CanonicalMIMEHeaderKey("Content-Length"), Values: []string{strconv.Itoa(len(body))}}, - {Key: textproto.CanonicalMIMEHeaderKey("Accept"), Values: []string{contentTypeJSON}}, + {Key: textproto.CanonicalMIMEHeaderKey("Accept"), Values: []string{acceptHeader}}, {Key: textproto.CanonicalMIMEHeaderKey(orgIDHeader), Values: []string{orgID}}, }, } @@ -91,7 +107,15 @@ func (p *FrontendClient) InstantQuery(ctx context.Context, qs string, t time.Tim return nil, err } - vector, warning, err := p.jsonDecoder.Decode(resp.Body) + contentType := extractHeader(resp.Headers, "Content-Type") + decoder, ok := p.decoders[contentType] + if !ok { + err = fmt.Errorf("unknown content type: %s", contentType) + level.Error(log).Log("err", err, "query", qs) + return nil, err + } + + vector, warning, err := decoder.Decode(resp.Body) if err != nil { level.Error(log).Log("err", err, "query", qs) return nil, err @@ -103,3 +127,13 @@ func (p *FrontendClient) InstantQuery(ctx context.Context, qs string, t time.Tim return vector, nil } + +func extractHeader(headers []*httpgrpc.Header, target string) string { + for _, h := range headers { + if h.Key == target && len(h.Values) > 0 { + return h.Values[0] + } + } + + return "" +} diff --git a/pkg/ruler/frontend_client_pool.go b/pkg/ruler/frontend_client_pool.go index b9b512cfab..7b131621aa 100644 --- a/pkg/ruler/frontend_client_pool.go +++ b/pkg/ruler/frontend_client_pool.go @@ -20,6 +20,7 @@ import ( type frontendPool struct { timeout time.Duration + queryResponseFormat string prometheusHTTPPrefix string grpcConfig grpcclient.Config @@ -29,6 +30,7 @@ type frontendPool struct { func newFrontendPool(cfg Config, log log.Logger, reg prometheus.Registerer) *client.Pool { p := &frontendPool{ timeout: cfg.FrontendTimeout, + queryResponseFormat: cfg.QueryResponseFormat, prometheusHTTPPrefix: cfg.PrometheusHTTPPrefix, grpcConfig: cfg.GRPCClientConfig, frontendClientRequestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ @@ -68,7 +70,7 @@ func (f *frontendPool) createFrontendClient(addr string) (client.PoolClient, err } return &frontendClient{ - FrontendClient: NewFrontendClient(httpgrpc.NewHTTPClient(conn), f.timeout, f.prometheusHTTPPrefix), + FrontendClient: NewFrontendClient(httpgrpc.NewHTTPClient(conn), f.timeout, f.prometheusHTTPPrefix, f.queryResponseFormat), HealthClient: grpc_health_v1.NewHealthClient(conn), }, nil } diff --git a/pkg/ruler/frontend_client_test.go b/pkg/ruler/frontend_client_test.go index 104780f58c..d46df39738 100644 --- a/pkg/ruler/frontend_client_test.go +++ b/pkg/ruler/frontend_client_test.go @@ -13,6 +13,9 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" "google.golang.org/grpc" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/querier/tripperware" ) type mockHTTPGRPCClient func(ctx context.Context, req *httpgrpc.HTTPRequest, _ ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) @@ -28,7 +31,7 @@ func TestTimeout(t *testing.T) { } ctx := context.Background() ctx = user.InjectOrgID(ctx, "userID") - frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus") + frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus", "json") _, err := frontendClient.InstantQuery(ctx, "query", time.Now()) require.Equal(t, context.DeadlineExceeded, err) } @@ -37,12 +40,12 @@ func TestNoOrgId(t *testing.T) { mockClientFn := func(ctx context.Context, _ *httpgrpc.HTTPRequest, _ ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { return nil, nil } - frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus") + frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus", "json") _, err := frontendClient.InstantQuery(context.Background(), "query", time.Now()) require.Equal(t, user.ErrNoOrgID, err) } -func TestInstantQuery(t *testing.T) { +func TestInstantQueryJsonCodec(t *testing.T) { tests := []struct { description string responseBody string @@ -148,10 +151,195 @@ func TestInstantQuery(t *testing.T) { } ctx := context.Background() ctx = user.InjectOrgID(ctx, "userID") - frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus") + frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus", "json") + vector, err := frontendClient.InstantQuery(ctx, "query", time.Now()) + require.Equal(t, test.expected, vector) + require.Equal(t, test.expectedErr, err) + }) + } +} + +func TestInstantQueryProtoCodec(t *testing.T) { + var tests = []struct { + description string + responseBody *tripperware.PrometheusResponse + expected promql.Vector + expectedErr error + }{ + { + description: "empty vector", + responseBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "vector", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{}, + }, + }, + }, + }, + }, + expected: promql.Vector{}, + expectedErr: nil, + }, + { + description: "vector with series", + responseBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "vector", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("foo", "bar")), + Sample: &cortexpb.Sample{ + Value: 1.234, + TimestampMs: 1724146338123, + }, + }, + { + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("bar", "baz")), + Sample: &cortexpb.Sample{ + Value: 5.678, + TimestampMs: 1724146338456, + }, + }, + }, + }, + }, + }, + }, + }, + expected: promql.Vector{ + { + Metric: labels.FromStrings("foo", "bar"), + T: 1724146338123, + F: 1.234, + }, + { + Metric: labels.FromStrings("bar", "baz"), + T: 1724146338456, + F: 5.678, + }, + }, + expectedErr: nil, + }, + { + description: "get scalar", + responseBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "scalar", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: []byte(`{"resultType":"scalar","result":[1724146338.123,"1.234"]}`), + }, + }, + }, + }, + expected: promql.Vector{ + { + Metric: labels.EmptyLabels(), + T: 1724146338123, + F: 1.234, + }, + }, + expectedErr: nil, + }, + { + description: "get matrix", + responseBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "matrix", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{ + Matrix: &tripperware.Matrix{ + SampleStreams: []tripperware.SampleStream{}, + }, + }, + }, + }, + }, + expectedErr: errors.New("rule result is not a vector or scalar"), + }, + { + description: "get string", + responseBody: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "string", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: []byte(`{"resultType":"string","result":[1724146338.123,"string"]}`), + }, + }, + }, + }, + expectedErr: errors.New("rule result is not a vector or scalar"), + }, + } + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + mockClientFn := func(ctx context.Context, _ *httpgrpc.HTTPRequest, _ ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { + d, err := test.responseBody.Marshal() + if err != nil { + return nil, err + } + return &httpgrpc.HTTPResponse{ + Code: http.StatusOK, + Headers: []*httpgrpc.Header{ + {Key: "Content-Type", Values: []string{"application/x-cortex-query+proto"}}, + }, + Body: d, + }, nil + } + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "userID") + frontendClient := NewFrontendClient(mockHTTPGRPCClient(mockClientFn), time.Second*5, "/prometheus", "protobuf") vector, err := frontendClient.InstantQuery(ctx, "query", time.Now()) require.Equal(t, test.expected, vector) require.Equal(t, test.expectedErr, err) }) } } + +func Test_extractHeader(t *testing.T) { + tests := []struct { + description string + headers []*httpgrpc.Header + expectedOutput string + }{ + { + description: "cortex query proto", + headers: []*httpgrpc.Header{ + { + Key: "Content-Type", + Values: []string{"application/x-cortex-query+proto"}, + }, + }, + expectedOutput: "application/x-cortex-query+proto", + }, + { + description: "json", + headers: []*httpgrpc.Header{ + { + Key: "Content-Type", + Values: []string{"application/json"}, + }, + }, + expectedOutput: "application/json", + }, + } + + target := "Content-Type" + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + require.Equal(t, test.expectedOutput, extractHeader(test.headers, target)) + }) + } +} diff --git a/pkg/ruler/frontend_decoder.go b/pkg/ruler/frontend_decoder.go index c8e653a05f..3ea23875b2 100644 --- a/pkg/ruler/frontend_decoder.go +++ b/pkg/ruler/frontend_decoder.go @@ -10,6 +10,8 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/util/api" ) @@ -18,9 +20,19 @@ const ( ) type JsonDecoder struct{} -type Warning []string +type ProtobufDecoder struct{} +type Warnings []string -func (j JsonDecoder) Decode(body []byte) (promql.Vector, Warning, error) { +type Decoder interface { + Decode(body []byte) (promql.Vector, Warnings, error) + ContentType() string +} + +func (j JsonDecoder) ContentType() string { + return "application/json" +} + +func (j JsonDecoder) Decode(body []byte) (promql.Vector, Warnings, error) { var response api.Response if err := json.NewDecoder(bytes.NewReader(body)).Decode(&response); err != nil { @@ -48,7 +60,7 @@ func (j JsonDecoder) Decode(body []byte) (promql.Vector, Warning, error) { if err := json.Unmarshal(data.Result, &scalar); err != nil { return nil, nil, err } - return j.scalarToPromQLVector(scalar), response.Warnings, nil + return scalarToPromQLVector(scalar), response.Warnings, nil case model.ValVector: var vector model.Vector if err := json.Unmarshal(data.Result, &vector); err != nil { @@ -79,10 +91,72 @@ func (j JsonDecoder) vectorToPromQLVector(vector model.Vector) promql.Vector { return v } -func (j JsonDecoder) scalarToPromQLVector(scalar model.Scalar) promql.Vector { +func (p ProtobufDecoder) ContentType() string { + return tripperware.QueryResponseCortexMIMEType +} + +func (p ProtobufDecoder) Decode(body []byte) (promql.Vector, Warnings, error) { + resp := tripperware.PrometheusResponse{} + if err := resp.Unmarshal(body); err != nil { + return nil, nil, err + } + + if resp.Status == statusError { + return nil, resp.Warnings, fmt.Errorf("failed to execute query with error: %s", resp.Error) + } + + switch resp.Data.ResultType { + case "scalar": + data := struct { + Type model.ValueType `json:"resultType"` + Result json.RawMessage `json:"result"` + }{} + + if err := json.Unmarshal(resp.Data.Result.GetRawBytes(), &data); err != nil { + return nil, nil, err + } + + var s model.Scalar + if err := json.Unmarshal(data.Result, &s); err != nil { + return nil, nil, err + } + return scalarToPromQLVector(s), resp.Warnings, nil + case "vector": + return p.vectorToPromQLVector(resp.Data.Result.GetVector()), resp.Warnings, nil + default: + return nil, resp.Warnings, errors.New("rule result is not a vector or scalar") + } +} + +func (p ProtobufDecoder) vectorToPromQLVector(vector *tripperware.Vector) promql.Vector { + v := make([]promql.Sample, 0, len(vector.Samples)) + for _, sample := range vector.Samples { + metric := cortexpb.FromLabelAdaptersToLabels(sample.Labels) + + if sample.Sample != nil { + v = append(v, promql.Sample{ + T: sample.Sample.TimestampMs, + F: sample.Sample.Value, + Metric: metric, + }) + } + + if sample.RawHistogram != nil { + v = append(v, promql.Sample{ + T: sample.RawHistogram.TimestampMs, + H: cortexpb.FloatHistogramProtoToFloatHistogram(*sample.RawHistogram), + Metric: metric, + }) + } + } + + return v +} + +func scalarToPromQLVector(s model.Scalar) promql.Vector { return promql.Vector{promql.Sample{ - T: int64(scalar.Timestamp), - F: float64(scalar.Value), + T: int64(s.Timestamp), + F: float64(s.Value), Metric: labels.Labels{}, }} } diff --git a/pkg/ruler/frontend_decoder_test.go b/pkg/ruler/frontend_decoder_test.go index cc24fe732b..4cabe47859 100644 --- a/pkg/ruler/frontend_decoder_test.go +++ b/pkg/ruler/frontend_decoder_test.go @@ -4,19 +4,186 @@ import ( "errors" "testing" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/querier/tripperware" ) -func TestDecode(t *testing.T) { - jsonDecoder := JsonDecoder{} +func TestProtoDecode(t *testing.T) { + tests := []struct { + description string + resp *tripperware.PrometheusResponse + expectedVector promql.Vector + expectedWarning []string + expectedErr error + }{ + { + description: "vector", + resp: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "vector", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("foo", "bar")), + Sample: &cortexpb.Sample{ + Value: 1.234, + TimestampMs: 1724146338123, + }, + }, + }, + }, + }, + }, + }, + Warnings: []string{"a", "b", "c"}, + }, + expectedVector: promql.Vector{ + { + Metric: labels.FromStrings("foo", "bar"), + T: 1724146338123, + F: 1.234, + }, + }, + expectedWarning: []string{"a", "b", "c"}, + }, + { + description: "vector with raw histogram", + resp: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "vector", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Vector{ + Vector: &tripperware.Vector{ + Samples: []tripperware.Sample{ + { + Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("foo", "bar")), + RawHistogram: &cortexpb.Histogram{ + Count: &cortexpb.Histogram_CountFloat{CountFloat: 10}, + Sum: 20, + Schema: 2, + ZeroThreshold: 0.001, + ZeroCount: &cortexpb.Histogram_ZeroCountFloat{ZeroCountFloat: 12}, + NegativeSpans: []cortexpb.BucketSpan{{Offset: 2, Length: 2}}, + NegativeCounts: []float64{2, 1}, + PositiveSpans: []cortexpb.BucketSpan{{Offset: 3, Length: 2}, {Offset: 1, Length: 3}}, + PositiveCounts: []float64{1, 2, 2, 1, 1}, + TimestampMs: 1724146338123, + }, + }, + }, + }, + }, + }, + }, + Warnings: []string{"a", "b", "c"}, + }, + expectedVector: promql.Vector{ + { + Metric: labels.FromStrings("foo", "bar"), + T: 1724146338123, + H: &histogram.FloatHistogram{ + Schema: 2, + ZeroThreshold: 0.001, + ZeroCount: 12, + Count: 10, + Sum: 20, + PositiveSpans: []histogram.Span{{Offset: 3, Length: 2}, {Offset: 1, Length: 3}}, + NegativeSpans: []histogram.Span{{Offset: 2, Length: 2}}, + PositiveBuckets: []float64{1, 2, 2, 1, 1}, + NegativeBuckets: []float64{2, 1}, + CustomValues: nil, + }, + }, + }, + expectedWarning: []string{"a", "b", "c"}, + }, + { + description: "matrix", + resp: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "matrix", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_Matrix{}, + }, + }, + Warnings: []string{"a", "b", "c"}, + }, + expectedVector: nil, + expectedWarning: []string{"a", "b", "c"}, + expectedErr: errors.New("rule result is not a vector or scalar"), + }, + { + description: "scalar", + resp: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "scalar", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: []byte(`{"resultType":"scalar","result":[1724146338.123,"1.234"]}`), + }, + }, + }, + Warnings: []string{"a", "b", "c"}, + }, + expectedVector: promql.Vector{ + { + Metric: labels.EmptyLabels(), + T: 1724146338123, + F: 1.234, + }, + }, + expectedWarning: []string{"a", "b", "c"}, + }, + { + description: "string", + resp: &tripperware.PrometheusResponse{ + Status: "success", + Data: tripperware.PrometheusData{ + ResultType: "string", + Result: tripperware.PrometheusQueryResult{ + Result: &tripperware.PrometheusQueryResult_RawBytes{ + RawBytes: []byte(`{"resultType":"string","result":[1724146338.123,"1.234"]}`), + }, + }, + }, + Warnings: []string{"a", "b", "c"}, + }, + expectedVector: nil, + expectedWarning: []string{"a", "b", "c"}, + expectedErr: errors.New("rule result is not a vector or scalar"), + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + b, err := test.resp.Marshal() + require.NoError(t, err) + + vector, _, err := protobufDecoder.Decode(b) + require.Equal(t, test.expectedErr, err) + require.Equal(t, test.expectedVector, vector) + require.Equal(t, test.expectedWarning, test.resp.Warnings) + }) + } +} +func TestJsonDecode(t *testing.T) { tests := []struct { description string body string expectedVector promql.Vector - expectedWarning Warning + expectedWarning Warnings expectedErr error }{ { diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index d77b4d0a41..812fd4f15c 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -47,10 +47,13 @@ import ( var ( supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} + supportedQueryResponseFormats = []string{queryResponseFormatJson, queryResponseFormatProtobuf} + // Validation errors. - errInvalidShardingStrategy = errors.New("invalid sharding strategy") - errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") - errInvalidMaxConcurrentEvals = errors.New("invalid max concurrent evals, the value must be greater than 0") + errInvalidShardingStrategy = errors.New("invalid sharding strategy") + errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") + errInvalidMaxConcurrentEvals = errors.New("invalid max concurrent evals, the value must be greater than 0") + errInvalidQueryResponseFormat = errors.New("invalid query response format") ) const ( @@ -82,6 +85,10 @@ const ( unknownHealthFilter string = "unknown" okHealthFilter string = "ok" errHealthFilter string = "err" + + // query response formats + queryResponseFormatJson = "json" + queryResponseFormatProtobuf = "protobuf" ) type DisabledRuleGroupErr struct { @@ -96,6 +103,9 @@ func (e *DisabledRuleGroupErr) Error() string { type Config struct { // This is used for query to query frontend to evaluate rules FrontendAddress string `yaml:"frontend_address"` + // Query response format of query frontend for evaluating rules + // It will only take effect FrontendAddress is configured. + QueryResponseFormat string `yaml:"query_response_format"` // HTTP timeout duration when querying to query frontend to evaluate rules FrontendTimeout time.Duration `yaml:"-"` // Query frontend GRPC Client configuration. @@ -185,6 +195,10 @@ func (cfg *Config) Validate(limits validation.Limits, log log.Logger) error { if cfg.ConcurrentEvalsEnabled && cfg.MaxConcurrentEvals <= 0 { return errInvalidMaxConcurrentEvals } + + if !util.StringsContain(supportedQueryResponseFormats, cfg.QueryResponseFormat) { + return errInvalidQueryResponseFormat + } return nil } @@ -207,6 +221,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { flagext.DeprecatedFlag(f, "ruler.alertmanager-use-v2", "This flag is no longer functional. V1 API is deprecated and removed", util_log.Logger) f.StringVar(&cfg.FrontendAddress, "ruler.frontend-address", "", "[Experimental] GRPC listen address of the Query Frontend, in host:port format. If set, Ruler queries to Query Frontends via gRPC. If not set, ruler queries to Ingesters directly.") + f.StringVar(&cfg.QueryResponseFormat, "ruler.query-response-format", queryResponseFormatProtobuf, fmt.Sprintf("[Experimental] Query response format to get query results from Query Frontend when the rule evaluation. It will only take effect when `-ruler.frontend-address` is configured. Supported values: %s", strings.Join(supportedQueryResponseFormats, ","))) cfg.ExternalURL.URL, _ = url.Parse("") // Must be non-nil f.Var(&cfg.ExternalURL, "ruler.external.url", "URL of alerts return path.") f.DurationVar(&cfg.EvaluationInterval, "ruler.evaluation-interval", 1*time.Minute, "How frequently to evaluate rules")