diff --git a/pkg/frontend/frontend_test.go b/pkg/frontend/frontend_test.go index 6a983c4c756..e460d7124ac 100644 --- a/pkg/frontend/frontend_test.go +++ b/pkg/frontend/frontend_test.go @@ -218,7 +218,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand if l != nil { logger = l } - codec := querymiddleware.NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, "json") + codec := querymiddleware.NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, "json", nil) var workerConfig querier_worker.Config flagext.DefaultValues(&workerConfig) diff --git a/pkg/frontend/querymiddleware/codec.go b/pkg/frontend/querymiddleware/codec.go index 70224131ecb..e1e5863380b 100644 --- a/pkg/frontend/querymiddleware/codec.go +++ b/pkg/frontend/querymiddleware/codec.go @@ -49,7 +49,8 @@ var ( allFormats = []string{formatJSON, formatProtobuf} // List of HTTP headers to propagate when a Prometheus request is encoded into a HTTP request. - prometheusCodecPropagateHeaders = []string{compat.ForceFallbackHeaderName, chunkinfologger.ChunkInfoLoggingHeader, api.ReadConsistencyOffsetsHeader} + prometheusCodecPropagateHeadersMetrics = []string{compat.ForceFallbackHeaderName, chunkinfologger.ChunkInfoLoggingHeader, api.ReadConsistencyOffsetsHeader} + prometheusCodecPropagateHeadersLabels = []string{} ) const ( @@ -77,16 +78,22 @@ type Codec interface { DecodeMetricsQueryRequest(context.Context, *http.Request) (MetricsQueryRequest, error) // DecodeLabelsQueryRequest decodes a LabelsQueryRequest from an http request. DecodeLabelsQueryRequest(context.Context, *http.Request) (LabelsQueryRequest, error) - // DecodeResponse decodes a Response from an http response. + // DecodeMetricsQueryResponse decodes a Response from an http response. // The original request is also passed as a parameter this is useful for implementation that needs the request // to merge result or build the result correctly. - DecodeResponse(context.Context, *http.Response, MetricsQueryRequest, log.Logger) (Response, error) + DecodeMetricsQueryResponse(context.Context, *http.Response, MetricsQueryRequest, log.Logger) (Response, error) + // DecodeLabelsQueryResponse decodes a Response from an http response. + // The original request is also passed as a parameter this is useful for implementation that needs the request + // to merge result or build the result correctly. + DecodeLabelsQueryResponse(context.Context, *http.Response, LabelsQueryRequest, log.Logger) (Response, error) // EncodeMetricsQueryRequest encodes a MetricsQueryRequest into an http request. EncodeMetricsQueryRequest(context.Context, MetricsQueryRequest) (*http.Request, error) // EncodeLabelsQueryRequest encodes a LabelsQueryRequest into an http request. EncodeLabelsQueryRequest(context.Context, LabelsQueryRequest) (*http.Request, error) - // EncodeResponse encodes a Response into an http response. - EncodeResponse(context.Context, *http.Request, Response) (*http.Response, error) + // EncodeMetricsQueryResponse encodes a Response from a MetricsQueryRequest into an http response. + EncodeMetricsQueryResponse(context.Context, *http.Request, Response) (*http.Response, error) + // EncodeLabelsQueryResponse encodes a Response from a LabelsQueryRequest into an http response. + EncodeLabelsQueryResponse(context.Context, *http.Request, Response, bool) (*http.Response, error) } // Merger is used by middlewares making multiple requests to merge back all responses into a single one. @@ -166,6 +173,14 @@ type LabelsQueryRequest interface { GetLabelMatcherSets() []string // GetLimit returns the limit of the number of items in the response. GetLimit() uint64 + // GetHeaders returns the HTTP headers in the request. + GetHeaders() []*PrometheusHeader + // WithLabelName clones the current request with a different label name param. + WithLabelName(string) (LabelsQueryRequest, error) + // WithLabelMatcherSets clones the current request with different label matchers. + WithLabelMatcherSets([]string) (LabelsQueryRequest, error) + // WithHeaders clones the current request with different headers. + WithHeaders([]*PrometheusHeader) (LabelsQueryRequest, error) // AddSpanTags writes information about this request to an OpenTracing span AddSpanTags(opentracing.Span) } @@ -204,14 +219,19 @@ func newPrometheusCodecMetrics(registerer prometheus.Registerer) *prometheusCode } type prometheusCodec struct { - metrics *prometheusCodecMetrics - lookbackDelta time.Duration - preferredQueryResultResponseFormat string + metrics *prometheusCodecMetrics + lookbackDelta time.Duration + preferredQueryResultResponseFormat string + propagateHeadersMetrics, propagateHeadersLabels []string } type formatter interface { - EncodeResponse(resp *PrometheusResponse) ([]byte, error) - DecodeResponse([]byte) (*PrometheusResponse, error) + EncodeQueryResponse(resp *PrometheusResponse) ([]byte, error) + EncodeLabelsResponse(resp *PrometheusLabelsResponse) ([]byte, error) + EncodeSeriesResponse(resp *PrometheusSeriesResponse) ([]byte, error) + DecodeQueryResponse([]byte) (*PrometheusResponse, error) + DecodeLabelsResponse([]byte) (*PrometheusLabelsResponse, error) + DecodeSeriesResponse([]byte) (*PrometheusSeriesResponse, error) Name() string ContentType() v1.MIMEType } @@ -227,11 +247,14 @@ func NewPrometheusCodec( registerer prometheus.Registerer, lookbackDelta time.Duration, queryResultResponseFormat string, + propagateHeaders []string, ) Codec { return prometheusCodec{ metrics: newPrometheusCodecMetrics(registerer), lookbackDelta: lookbackDelta, preferredQueryResultResponseFormat: queryResultResponseFormat, + propagateHeadersMetrics: append(prometheusCodecPropagateHeadersMetrics, propagateHeaders...), + propagateHeadersLabels: append(prometheusCodecPropagateHeadersLabels, propagateHeaders...), } } @@ -319,7 +342,7 @@ func (c prometheusCodec) decodeRangeQueryRequest(r *http.Request) (MetricsQueryR query := reqValues.Get("query") queryExpr, err := parser.ParseExpr(query) if err != nil { - return nil, decorateWithParamName(err, "query") + return nil, DecorateWithParamName(err, "query") } var options Options @@ -345,13 +368,13 @@ func (c prometheusCodec) decodeInstantQueryRequest(r *http.Request) (MetricsQuer time, err := DecodeInstantQueryTimeParams(&reqValues, time.Now) if err != nil { - return nil, decorateWithParamName(err, "time") + return nil, DecorateWithParamName(err, "time") } query := reqValues.Get("query") queryExpr, err := parser.ParseExpr(query) if err != nil { - return nil, decorateWithParamName(err, "query") + return nil, DecorateWithParamName(err, "query") } var options Options @@ -364,7 +387,7 @@ func (c prometheusCodec) decodeInstantQueryRequest(r *http.Request) (MetricsQuer } func (prometheusCodec) DecodeLabelsQueryRequest(_ context.Context, r *http.Request) (LabelsQueryRequest, error) { - if !IsLabelsQuery(r.URL.Path) { + if !IsLabelsQuery(r.URL.Path) && !IsSeriesQuery(r.URL.Path) { return nil, fmt.Errorf("unknown labels query API endpoint %s", r.URL.Path) } @@ -387,6 +410,15 @@ func (prometheusCodec) DecodeLabelsQueryRequest(_ context.Context, r *http.Reque } } + if IsSeriesQuery(r.URL.Path) { + return &PrometheusSeriesQueryRequest{ + Path: r.URL.Path, + Start: start, + End: end, + LabelMatcherSets: labelMatcherSets, + Limit: limit, + }, nil + } if IsLabelNamesQuery(r.URL.Path) { return &PrometheusLabelNamesQueryRequest{ Path: r.URL.Path, @@ -412,12 +444,12 @@ func (prometheusCodec) DecodeLabelsQueryRequest(_ context.Context, r *http.Reque func DecodeRangeQueryTimeParams(reqValues *url.Values) (start, end, step int64, err error) { start, err = util.ParseTime(reqValues.Get("start")) if err != nil { - return 0, 0, 0, decorateWithParamName(err, "start") + return 0, 0, 0, DecorateWithParamName(err, "start") } end, err = util.ParseTime(reqValues.Get("end")) if err != nil { - return 0, 0, 0, decorateWithParamName(err, "end") + return 0, 0, 0, DecorateWithParamName(err, "end") } if end < start { @@ -426,7 +458,7 @@ func DecodeRangeQueryTimeParams(reqValues *url.Values) (start, end, step int64, step, err = parseDurationMs(reqValues.Get("step")) if err != nil { - return 0, 0, 0, decorateWithParamName(err, "step") + return 0, 0, 0, DecorateWithParamName(err, "step") } if step <= 0 { @@ -451,7 +483,7 @@ func DecodeInstantQueryTimeParams(reqValues *url.Values, defaultNow func() time. } else { time, err = util.ParseTime(timeVal) if err != nil { - return 0, decorateWithParamName(err, "time") + return 0, DecorateWithParamName(err, "time") } } @@ -476,7 +508,7 @@ func DecodeLabelsQueryTimeParams(reqValues *url.Values, usePromDefaults bool) (s } else { start, err = util.ParseTime(startVal) if err != nil { - return 0, 0, decorateWithParamName(err, "start") + return 0, 0, DecorateWithParamName(err, "start") } } @@ -486,7 +518,7 @@ func DecodeLabelsQueryTimeParams(reqValues *url.Values, usePromDefaults bool) (s } else { end, err = util.ParseTime(endVal) if err != nil { - return 0, 0, decorateWithParamName(err, "end") + return 0, 0, DecorateWithParamName(err, "end") } } @@ -598,7 +630,7 @@ func (c prometheusCodec) EncodeMetricsQueryRequest(ctx context.Context, r Metric // Propagate allowed HTTP headers. for _, h := range r.GetHeaders() { - if !slices.Contains(prometheusCodecPropagateHeaders, h.Name) { + if !slices.Contains(c.propagateHeadersMetrics, h.Name) { continue } @@ -652,6 +684,24 @@ func (c prometheusCodec) EncodeLabelsQueryRequest(ctx context.Context, req Label Path: req.Path, // path still contains label name RawQuery: urlValues.Encode(), } + case *PrometheusSeriesQueryRequest: + urlValues := url.Values{} + if req.GetStart() != 0 { + urlValues["start"] = []string{encodeTime(req.Start)} + } + if req.GetEnd() != 0 { + urlValues["end"] = []string{encodeTime(req.End)} + } + if len(req.GetLabelMatcherSets()) > 0 { + urlValues["match[]"] = req.GetLabelMatcherSets() + } + if req.GetLimit() > 0 { + urlValues["limit"] = []string{strconv.FormatUint(req.GetLimit(), 10)} + } + u = &url.URL{ + Path: req.Path, + RawQuery: urlValues.Encode(), + } default: return nil, fmt.Errorf("unsupported request type %T", req) @@ -678,6 +728,18 @@ func (c prometheusCodec) EncodeLabelsQueryRequest(ctx context.Context, req Label r.Header.Add(api.ReadConsistencyHeader, level) } + // Propagate allowed HTTP headers. + for _, h := range req.GetHeaders() { + if !slices.Contains(c.propagateHeadersLabels, h.Name) { + continue + } + + for _, v := range h.Values { + // There should only be one value, but add all of them for completeness. + r.Header.Add(h.Name, v) + } + } + return r.WithContext(ctx), nil } @@ -699,7 +761,7 @@ func encodeOptions(req *http.Request, o Options) { } } -func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ MetricsQueryRequest, logger log.Logger) (Response, error) { +func (c prometheusCodec) DecodeMetricsQueryResponse(ctx context.Context, r *http.Response, _ MetricsQueryRequest, logger log.Logger) (Response, error) { spanlog := spanlogger.FromContext(ctx, logger) buf, err := readResponseBody(r) if err != nil { @@ -737,7 +799,7 @@ func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ } start := time.Now() - resp, err := formatter.DecodeResponse(buf) + resp, err := formatter.DecodeQueryResponse(buf) if err != nil { return nil, apierror.Newf(apierror.TypeInternal, "error decoding response: %v", err) } @@ -755,6 +817,90 @@ func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ return resp, nil } +func (c prometheusCodec) DecodeLabelsQueryResponse(ctx context.Context, r *http.Response, lr LabelsQueryRequest, logger log.Logger) (Response, error) { + spanlog := spanlogger.FromContext(ctx, logger) + buf, err := readResponseBody(r) + if err != nil { + return nil, spanlog.Error(err) + } + + spanlog.LogFields(otlog.String("message", "ParseQueryRangeResponse"), + otlog.Int("status_code", r.StatusCode), + otlog.Int("bytes", len(buf))) + + // Before attempting to decode a response based on the content type, check if the + // Content-Type header was even set. When the scheduler returns gRPC errors, they + // are encoded as httpgrpc.HTTPResponse objects with an HTTP status code and the + // error message as the body of the response with no content type. We need to handle + // that case here before we decode well-formed success or error responses. + contentType := r.Header.Get("Content-Type") + if contentType == "" { + switch r.StatusCode { + case http.StatusServiceUnavailable: + return nil, apierror.New(apierror.TypeUnavailable, string(buf)) + case http.StatusTooManyRequests: + return nil, apierror.New(apierror.TypeTooManyRequests, string(buf)) + case http.StatusRequestEntityTooLarge: + return nil, apierror.New(apierror.TypeTooLargeEntry, string(buf)) + default: + if r.StatusCode/100 == 5 { + return nil, apierror.New(apierror.TypeInternal, string(buf)) + } + } + } + + formatter := findFormatter(contentType) + if formatter == nil { + return nil, apierror.Newf(apierror.TypeInternal, "unknown response content type '%v'", contentType) + } + + start := time.Now() + + var response Response + + switch lr.(type) { + case *PrometheusLabelNamesQueryRequest, *PrometheusLabelValuesQueryRequest: + resp, err := formatter.DecodeLabelsResponse(buf) + if err != nil { + return nil, apierror.Newf(apierror.TypeInternal, "error decoding response: %v", err) + } + + c.metrics.duration.WithLabelValues(operationDecode, formatter.Name()).Observe(time.Since(start).Seconds()) + c.metrics.size.WithLabelValues(operationDecode, formatter.Name()).Observe(float64(len(buf))) + + if resp.Status == statusError { + return nil, apierror.New(apierror.Type(resp.ErrorType), resp.Error) + } + + for h, hv := range r.Header { + resp.Headers = append(resp.Headers, &PrometheusHeader{Name: h, Values: hv}) + } + + response = resp + case *PrometheusSeriesQueryRequest: + resp, err := formatter.DecodeSeriesResponse(buf) + if err != nil { + return nil, apierror.Newf(apierror.TypeInternal, "error decoding response: %v", err) + } + + c.metrics.duration.WithLabelValues(operationDecode, formatter.Name()).Observe(time.Since(start).Seconds()) + c.metrics.size.WithLabelValues(operationDecode, formatter.Name()).Observe(float64(len(buf))) + + if resp.Status == statusError { + return nil, apierror.New(apierror.Type(resp.ErrorType), resp.Error) + } + + for h, hv := range r.Header { + resp.Headers = append(resp.Headers, &PrometheusHeader{Name: h, Values: hv}) + } + + response = resp + default: + return nil, apierror.Newf(apierror.TypeInternal, "unsupported request type %T", lr) + } + return response, nil +} + func findFormatter(contentType string) formatter { for _, f := range knownFormats { if f.ContentType().String() == contentType { @@ -765,7 +911,7 @@ func findFormatter(contentType string) formatter { return nil } -func (c prometheusCodec) EncodeResponse(ctx context.Context, req *http.Request, res Response) (*http.Response, error) { +func (c prometheusCodec) EncodeMetricsQueryResponse(ctx context.Context, req *http.Request, res Response) (*http.Response, error) { sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse") defer sp.Finish() @@ -783,7 +929,7 @@ func (c prometheusCodec) EncodeResponse(ctx context.Context, req *http.Request, } start := time.Now() - b, err := formatter.EncodeResponse(a) + b, err := formatter.EncodeQueryResponse(a) if err != nil { return nil, apierror.Newf(apierror.TypeInternal, "error encoding response: %v", err) } @@ -807,6 +953,66 @@ func (c prometheusCodec) EncodeResponse(ctx context.Context, req *http.Request, return &resp, nil } +func (c prometheusCodec) EncodeLabelsQueryResponse(ctx context.Context, req *http.Request, res Response, isSeriesResponse bool) (*http.Response, error) { + sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse") + defer sp.Finish() + + selectedContentType, formatter := c.negotiateContentType(req.Header.Get("Accept")) + if formatter == nil { + return nil, apierror.New(apierror.TypeNotAcceptable, "none of the content types in the Accept header are supported") + } + + var start time.Time + var b []byte + + switch isSeriesResponse { + case false: + a, ok := res.(*PrometheusLabelsResponse) + if !ok { + return nil, apierror.Newf(apierror.TypeInternal, "invalid response format") + } + if a.Data != nil { + sp.LogFields(otlog.Int("labels", len(a.Data))) + } + + start = time.Now() + var err error + b, err = formatter.EncodeLabelsResponse(a) + if err != nil { + return nil, apierror.Newf(apierror.TypeInternal, "error encoding response: %v", err) + } + case true: + a, ok := res.(*PrometheusSeriesResponse) + if !ok { + return nil, apierror.Newf(apierror.TypeInternal, "invalid response format") + } + if a.Data != nil { + sp.LogFields(otlog.Int("labels", len(a.Data))) + } + + start = time.Now() + var err error + b, err = formatter.EncodeSeriesResponse(a) + if err != nil { + return nil, apierror.Newf(apierror.TypeInternal, "error encoding response: %v", err) + } + } + + c.metrics.duration.WithLabelValues(operationEncode, formatter.Name()).Observe(time.Since(start).Seconds()) + c.metrics.size.WithLabelValues(operationEncode, formatter.Name()).Observe(float64(len(b))) + sp.LogFields(otlog.Int("bytes", len(b))) + + resp := http.Response{ + Header: http.Header{ + "Content-Type": []string{selectedContentType}, + }, + Body: io.NopCloser(bytes.NewBuffer(b)), + StatusCode: http.StatusOK, + ContentLength: int64(len(b)), + } + return &resp, nil +} + func (prometheusCodec) negotiateContentType(acceptHeader string) (string, formatter) { if acceptHeader == "" { return jsonMimeType, jsonFormatterInstance @@ -967,7 +1173,7 @@ func encodeDurationMs(d int64) string { return strconv.FormatFloat(float64(d)/float64(time.Second/time.Millisecond), 'f', -1, 64) } -func decorateWithParamName(err error, field string) error { +func DecorateWithParamName(err error, field string) error { errTmpl := "invalid parameter %q: %v" if status, ok := grpcutil.ErrorToStatus(err); ok { return apierror.Newf(apierror.TypeBadData, errTmpl, field, status.Message()) diff --git a/pkg/frontend/querymiddleware/codec_json.go b/pkg/frontend/querymiddleware/codec_json.go index 5b484474f16..4a2c38ae739 100644 --- a/pkg/frontend/querymiddleware/codec_json.go +++ b/pkg/frontend/querymiddleware/codec_json.go @@ -5,17 +5,19 @@ package querymiddleware -import v1 "github.com/prometheus/prometheus/web/api/v1" +import ( + v1 "github.com/prometheus/prometheus/web/api/v1" +) const jsonMimeType = "application/json" type jsonFormatter struct{} -func (j jsonFormatter) EncodeResponse(resp *PrometheusResponse) ([]byte, error) { +func (j jsonFormatter) EncodeQueryResponse(resp *PrometheusResponse) ([]byte, error) { return json.Marshal(resp) } -func (j jsonFormatter) DecodeResponse(buf []byte) (*PrometheusResponse, error) { +func (j jsonFormatter) DecodeQueryResponse(buf []byte) (*PrometheusResponse, error) { var resp PrometheusResponse if err := json.Unmarshal(buf, &resp); err != nil { @@ -25,6 +27,34 @@ func (j jsonFormatter) DecodeResponse(buf []byte) (*PrometheusResponse, error) { return &resp, nil } +func (j jsonFormatter) EncodeLabelsResponse(resp *PrometheusLabelsResponse) ([]byte, error) { + return json.Marshal(resp) +} + +func (j jsonFormatter) DecodeLabelsResponse(buf []byte) (*PrometheusLabelsResponse, error) { + var resp PrometheusLabelsResponse + + if err := json.Unmarshal(buf, &resp); err != nil { + return nil, err + } + + return &resp, nil +} + +func (j jsonFormatter) EncodeSeriesResponse(resp *PrometheusSeriesResponse) ([]byte, error) { + return json.Marshal(resp) +} + +func (j jsonFormatter) DecodeSeriesResponse(buf []byte) (*PrometheusSeriesResponse, error) { + var resp PrometheusSeriesResponse + + if err := json.Unmarshal(buf, &resp); err != nil { + return nil, err + } + + return &resp, nil +} + func (j jsonFormatter) Name() string { return formatJSON } diff --git a/pkg/frontend/querymiddleware/codec_json_test.go b/pkg/frontend/querymiddleware/codec_json_test.go index 1c681b87c09..a8201d39b78 100644 --- a/pkg/frontend/querymiddleware/codec_json_test.go +++ b/pkg/frontend/querymiddleware/codec_json_test.go @@ -25,7 +25,7 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" ) -func TestPrometheusCodec_JSONResponse(t *testing.T) { +func TestPrometheusCodec_JSONResponse_Metrics(t *testing.T) { headers := http.Header{"Content-Type": []string{"application/json"}} expectedRespHeaders := []*PrometheusHeader{ { @@ -165,7 +165,7 @@ func TestPrometheusCodec_JSONResponse(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { reg := prometheus.NewPedanticRegistry() - codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON) + codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON, nil) body, err := json.Marshal(tc.resp) require.NoError(t, err) @@ -175,7 +175,7 @@ func TestPrometheusCodec_JSONResponse(t *testing.T) { Body: io.NopCloser(bytes.NewBuffer(body)), ContentLength: int64(len(body)), } - decoded, err := codec.DecodeResponse(context.Background(), httpResponse, nil, log.NewNopLogger()) + decoded, err := codec.DecodeMetricsQueryResponse(context.Background(), httpResponse, nil, log.NewNopLogger()) if err != nil || tc.expectedErr != nil { require.Equal(t, tc.expectedErr, err) return @@ -206,7 +206,7 @@ func TestPrometheusCodec_JSONResponse(t *testing.T) { Body: io.NopCloser(bytes.NewBuffer(body)), ContentLength: int64(len(body)), } - encoded, err := codec.EncodeResponse(context.Background(), httpRequest, decoded) + encoded, err := codec.EncodeMetricsQueryResponse(context.Background(), httpRequest, decoded) require.NoError(t, err) expectedJSON, err := readResponseBody(httpResponse) @@ -231,7 +231,118 @@ func TestPrometheusCodec_JSONResponse(t *testing.T) { } } -func TestPrometheusCodec_JSONEncoding(t *testing.T) { +func TestPrometheusCodec_JSONResponse_Labels(t *testing.T) { + headers := http.Header{"Content-Type": []string{"application/json"}} + expectedRespHeaders := []*PrometheusHeader{ + { + Name: "Content-Type", + Values: []string{"application/json"}, + }, + } + + for _, tc := range []struct { + name string + request LabelsQueryRequest + isSeriesResponse bool + responseHeaders http.Header + resp prometheusAPIResponse + expected Response + expectedErr error + }{ + { + name: "successful labels response", + request: &PrometheusLabelNamesQueryRequest{}, + isSeriesResponse: false, + resp: prometheusAPIResponse{ + Status: statusSuccess, + Data: []string{"foo", "bar"}, + }, + expected: &PrometheusLabelsResponse{ + Status: statusSuccess, + Data: []string{"foo", "bar"}, + Headers: expectedRespHeaders, + }, + }, + { + name: "successful series response", + request: &PrometheusSeriesQueryRequest{}, + isSeriesResponse: true, + resp: prometheusAPIResponse{ + Status: statusSuccess, + Data: []SeriesData{ + { + "__name__": "series_1", + "foo": "bar", + }, + { + "__name__": "hist_series_1", + "hoo": "hbar", + }, + }, + }, + expected: &PrometheusSeriesResponse{ + Status: statusSuccess, + Data: []SeriesData{ + { + "__name__": "series_1", + "foo": "bar", + }, + { + "__name__": "hist_series_1", + "hoo": "hbar", + }, + }, + Headers: expectedRespHeaders, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON, nil) + + body, err := json.Marshal(tc.resp) + require.NoError(t, err) + httpResponse := &http.Response{ + StatusCode: 200, + Header: headers, + Body: io.NopCloser(bytes.NewBuffer(body)), + ContentLength: int64(len(body)), + } + decoded, err := codec.DecodeLabelsQueryResponse(context.Background(), httpResponse, tc.request, log.NewNopLogger()) + if err != nil || tc.expectedErr != nil { + require.Equal(t, tc.expectedErr, err) + return + } + + require.NoError(t, err) + require.Equal(t, tc.expected, decoded) + + httpRequest := &http.Request{ + Header: http.Header{"Accept": []string{jsonMimeType}}, + } + + // Reset response, as the above call will have consumed the body reader. + httpResponse = &http.Response{ + StatusCode: 200, + Header: headers, + Body: io.NopCloser(bytes.NewBuffer(body)), + ContentLength: int64(len(body)), + } + encoded, err := codec.EncodeLabelsQueryResponse(context.Background(), httpRequest, decoded, tc.isSeriesResponse) + require.NoError(t, err) + + expectedJSON, err := readResponseBody(httpResponse) + require.NoError(t, err) + encodedJSON, err := readResponseBody(encoded) + require.NoError(t, err) + + require.JSONEq(t, string(expectedJSON), string(encodedJSON)) + require.Equal(t, httpResponse, encoded) + }) + } +} + +func TestPrometheusCodec_JSONEncoding_Metrics(t *testing.T) { responseHistogram := mimirpb.FloatHistogram{ CounterResetHint: histogram.GaugeType, Schema: 3, @@ -353,12 +464,12 @@ func TestPrometheusCodec_JSONEncoding(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { reg := prometheus.NewPedanticRegistry() - codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON) + codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON, nil) httpRequest := &http.Request{ Header: http.Header{"Accept": []string{jsonMimeType}}, } - encoded, err := codec.EncodeResponse(context.Background(), httpRequest, tc.response) + encoded, err := codec.EncodeMetricsQueryResponse(context.Background(), httpRequest, tc.response) require.NoError(t, err) require.Equal(t, http.StatusOK, encoded.StatusCode) require.Equal(t, "application/json", encoded.Header.Get("Content-Type")) @@ -381,3 +492,77 @@ func TestPrometheusCodec_JSONEncoding(t *testing.T) { }) } } + +func TestPrometheusCodec_JSONEncoding_Labels(t *testing.T) { + for _, tc := range []struct { + name string + expectedJSON string + response Response + isSeriesResponse bool + }{ + { + name: "successful labels response", + response: &PrometheusLabelsResponse{ + Status: statusSuccess, + Data: []string{ + "foo", + "bar", + }, + }, + expectedJSON: ` + { + "status": "success", + "data": ["foo", "bar"] + } + `, + isSeriesResponse: false, + }, + { + name: "successful series response", + response: &PrometheusSeriesResponse{ + Status: statusSuccess, + Data: []SeriesData{ + { + "__name__": "series_1", + "foo": "bar", + }, + { + "__name__": "hist_series_1", + "hoo": "hbar", + }, + }, + }, + expectedJSON: ` + { + "status": "success", + "data": [{ + "__name__": "series_1", + "foo": "bar" + }, { + "__name__": "hist_series_1", + "hoo": "hbar" + }] + } + `, + isSeriesResponse: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON, nil) + httpRequest := &http.Request{ + Header: http.Header{"Accept": []string{jsonMimeType}}, + } + + encoded, err := codec.EncodeLabelsQueryResponse(context.Background(), httpRequest, tc.response, tc.isSeriesResponse) + require.NoError(t, err) + require.Equal(t, http.StatusOK, encoded.StatusCode) + require.Equal(t, "application/json", encoded.Header.Get("Content-Type")) + + encodedJSON, err := readResponseBody(encoded) + require.NoError(t, err) + require.JSONEq(t, tc.expectedJSON, string(encodedJSON)) + require.Equal(t, len(encodedJSON), int(encoded.ContentLength)) + }) + } +} diff --git a/pkg/frontend/querymiddleware/codec_protobuf.go b/pkg/frontend/querymiddleware/codec_protobuf.go index ab1fea61ccc..f984f6cef27 100644 --- a/pkg/frontend/querymiddleware/codec_protobuf.go +++ b/pkg/frontend/querymiddleware/codec_protobuf.go @@ -22,7 +22,7 @@ func (f protobufFormatter) ContentType() v1.MIMEType { return v1.MIMEType{Type: mimirpb.QueryResponseMimeTypeType, SubType: mimirpb.QueryResponseMimeTypeSubType} } -func (f protobufFormatter) EncodeResponse(resp *PrometheusResponse) ([]byte, error) { +func (f protobufFormatter) EncodeQueryResponse(resp *PrometheusResponse) ([]byte, error) { status, err := mimirpb.StatusFromPrometheusString(resp.Status) if err != nil { return nil, err @@ -186,7 +186,7 @@ func (protobufFormatter) encodeMatrixData(data []SampleStream) mimirpb.MatrixDat return mimirpb.MatrixData{Series: series} } -func (f protobufFormatter) DecodeResponse(buf []byte) (*PrometheusResponse, error) { +func (f protobufFormatter) DecodeQueryResponse(buf []byte) (*PrometheusResponse, error) { var resp mimirpb.QueryResponse if err := resp.Unmarshal(buf); err != nil { @@ -326,6 +326,22 @@ func (f protobufFormatter) decodeMatrixData(data *mimirpb.MatrixData) (*Promethe }, nil } +func (f protobufFormatter) EncodeLabelsResponse(*PrometheusLabelsResponse) ([]byte, error) { + return nil, errors.New("protobuf labels encoding is not supported") +} + +func (f protobufFormatter) DecodeLabelsResponse([]byte) (*PrometheusLabelsResponse, error) { + return nil, errors.New("protobuf labels decoding is not supported") +} + +func (f protobufFormatter) EncodeSeriesResponse(*PrometheusSeriesResponse) ([]byte, error) { + return nil, errors.New("protobuf series encoding is not supported") +} + +func (f protobufFormatter) DecodeSeriesResponse([]byte) (*PrometheusSeriesResponse, error) { + return nil, errors.New("protobuf series decoding is not supported") +} + func labelsFromStringArray(s []string) ([]mimirpb.LabelAdapter, error) { if len(s)%2 != 0 { return nil, fmt.Errorf("metric is malformed: expected even number of symbols, but got %v", len(s)) diff --git a/pkg/frontend/querymiddleware/codec_protobuf_test.go b/pkg/frontend/querymiddleware/codec_protobuf_test.go index f064a79a1b9..16a6108382d 100644 --- a/pkg/frontend/querymiddleware/codec_protobuf_test.go +++ b/pkg/frontend/querymiddleware/codec_protobuf_test.go @@ -633,7 +633,7 @@ func TestProtobufFormat_DecodeResponse(t *testing.T) { for _, tc := range protobufCodecScenarios { t.Run(tc.name, func(t *testing.T) { reg := prometheus.NewPedanticRegistry() - codec := NewPrometheusCodec(reg, 0*time.Minute, formatProtobuf) + codec := NewPrometheusCodec(reg, 0*time.Minute, formatProtobuf, nil) body, err := tc.payload.Marshal() require.NoError(t, err) @@ -643,7 +643,7 @@ func TestProtobufFormat_DecodeResponse(t *testing.T) { Body: io.NopCloser(bytes.NewBuffer(body)), ContentLength: int64(len(body)), } - decoded, err := codec.DecodeResponse(context.Background(), httpResponse, nil, log.NewNopLogger()) + decoded, err := codec.DecodeMetricsQueryResponse(context.Background(), httpResponse, nil, log.NewNopLogger()) if err != nil || tc.expectedDecodingError != nil { require.Equal(t, tc.expectedDecodingError, err) return @@ -674,7 +674,7 @@ func TestProtobufFormat_EncodeResponse(t *testing.T) { t.Run(tc.name, func(t *testing.T) { reg := prometheus.NewPedanticRegistry() - codec := NewPrometheusCodec(reg, 0*time.Minute, formatProtobuf) + codec := NewPrometheusCodec(reg, 0*time.Minute, formatProtobuf, nil) expectedBodyBytes, err := tc.payload.Marshal() require.NoError(t, err) @@ -683,7 +683,7 @@ func TestProtobufFormat_EncodeResponse(t *testing.T) { Header: http.Header{"Accept": []string{mimirpb.QueryResponseMimeType}}, } - httpResponse, err := codec.EncodeResponse(context.Background(), httpRequest, tc.response) + httpResponse, err := codec.EncodeMetricsQueryResponse(context.Background(), httpRequest, tc.response) require.NoError(t, err) require.Equal(t, http.StatusOK, httpResponse.StatusCode) require.Equal(t, mimirpb.QueryResponseMimeType, httpResponse.Header.Get("Content-Type")) @@ -714,7 +714,7 @@ func TestProtobufFormat_EncodeResponse(t *testing.T) { func BenchmarkProtobufFormat_DecodeResponse(b *testing.B) { headers := http.Header{"Content-Type": []string{mimirpb.QueryResponseMimeType}} reg := prometheus.NewPedanticRegistry() - codec := NewPrometheusCodec(reg, 0*time.Minute, formatProtobuf) + codec := NewPrometheusCodec(reg, 0*time.Minute, formatProtobuf, nil) for _, tc := range protobufCodecScenarios { body, err := tc.payload.Marshal() @@ -728,7 +728,7 @@ func BenchmarkProtobufFormat_DecodeResponse(b *testing.B) { ContentLength: int64(len(body)), } - _, err = codec.DecodeResponse(context.Background(), httpResponse, nil, log.NewNopLogger()) + _, err = codec.DecodeMetricsQueryResponse(context.Background(), httpResponse, nil, log.NewNopLogger()) if err != nil || tc.expectedDecodingError != nil { require.Equal(b, tc.expectedDecodingError, err) } @@ -739,7 +739,7 @@ func BenchmarkProtobufFormat_DecodeResponse(b *testing.B) { func BenchmarkProtobufFormat_EncodeResponse(b *testing.B) { reg := prometheus.NewPedanticRegistry() - codec := NewPrometheusCodec(reg, 0*time.Minute, formatProtobuf) + codec := NewPrometheusCodec(reg, 0*time.Minute, formatProtobuf, nil) req := &http.Request{ Header: http.Header{"Accept": []string{mimirpb.QueryResponseMimeType}}, @@ -752,7 +752,7 @@ func BenchmarkProtobufFormat_EncodeResponse(b *testing.B) { b.Run(tc.name, func(b *testing.B) { for i := 0; i < b.N; i++ { - _, err := codec.EncodeResponse(context.Background(), req, tc.response) + _, err := codec.EncodeMetricsQueryResponse(context.Background(), req, tc.response) if err != nil { require.NoError(b, err) diff --git a/pkg/frontend/querymiddleware/codec_test.go b/pkg/frontend/querymiddleware/codec_test.go index 5fbf680b401..39c9a9b4698 100644 --- a/pkg/frontend/querymiddleware/codec_test.go +++ b/pkg/frontend/querymiddleware/codec_test.go @@ -692,7 +692,7 @@ func TestPrometheusCodec_EncodeLabelsQueryRequest(t *testing.T) { func TestPrometheusCodec_EncodeMetricsQueryRequest_AcceptHeader(t *testing.T) { for _, queryResultPayloadFormat := range allFormats { t.Run(queryResultPayloadFormat, func(t *testing.T) { - codec := NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, queryResultPayloadFormat) + codec := NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, queryResultPayloadFormat, nil) req := PrometheusInstantQueryRequest{} encodedRequest, err := codec.EncodeMetricsQueryRequest(context.Background(), &req) require.NoError(t, err) @@ -712,7 +712,7 @@ func TestPrometheusCodec_EncodeMetricsQueryRequest_AcceptHeader(t *testing.T) { func TestPrometheusCodec_EncodeMetricsQueryRequest_ReadConsistency(t *testing.T) { for _, consistencyLevel := range api.ReadConsistencies { t.Run(consistencyLevel, func(t *testing.T) { - codec := NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, formatProtobuf) + codec := NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, formatProtobuf, nil) ctx := api.ContextWithReadConsistencyLevel(context.Background(), consistencyLevel) encodedRequest, err := codec.EncodeMetricsQueryRequest(ctx, &PrometheusInstantQueryRequest{}) require.NoError(t, err) @@ -724,7 +724,7 @@ func TestPrometheusCodec_EncodeMetricsQueryRequest_ReadConsistency(t *testing.T) func TestPrometheusCodec_EncodeMetricsQueryRequest_ShouldPropagateHeadersInAllowList(t *testing.T) { const notAllowedHeader = "X-Some-Name" - codec := NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, formatProtobuf) + codec := NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, formatProtobuf, nil) expectedOffsets := map[int32]int64{0: 1, 1: 2} req, err := codec.EncodeMetricsQueryRequest(context.Background(), &PrometheusInstantQueryRequest{ @@ -761,10 +761,10 @@ func TestPrometheusCodec_EncodeResponse_ContentNegotiation(t *testing.T) { Error: "something went wrong", } - jsonBody, err := jsonFormatter{}.EncodeResponse(testResponse) + jsonBody, err := jsonFormatter{}.EncodeQueryResponse(testResponse) require.NoError(t, err) - protobufBody, err := protobufFormatter{}.EncodeResponse(testResponse) + protobufBody, err := protobufFormatter{}.EncodeQueryResponse(testResponse) require.NoError(t, err) scenarios := map[string]struct { @@ -816,7 +816,7 @@ func TestPrometheusCodec_EncodeResponse_ContentNegotiation(t *testing.T) { require.NoError(t, err) req.Header.Set("Accept", scenario.acceptHeader) - encodedResponse, err := codec.EncodeResponse(context.Background(), req, testResponse) + encodedResponse, err := codec.EncodeMetricsQueryResponse(context.Background(), req, testResponse) require.Equal(t, scenario.expectedError, err) if scenario.expectedError == nil { @@ -919,7 +919,7 @@ func TestPrometheusCodec_DecodeResponse_Errors(t *testing.T) { t.Run(name, func(t *testing.T) { codec := newTestPrometheusCodec() - _, err := codec.DecodeResponse(context.Background(), testCase.response, nil, testutil.NewTestingLogger(t)) + _, err := codec.DecodeMetricsQueryResponse(context.Background(), testCase.response, nil, testutil.NewTestingLogger(t)) require.Error(t, err) require.True(t, apierror.IsAPIError(err)) resp, ok := apierror.HTTPResponseFromError(err) @@ -948,7 +948,7 @@ func TestPrometheusCodec_DecodeResponse_ContentTypeHandling(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { reg := prometheus.NewPedanticRegistry() - codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON) + codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON, nil) resp := prometheusAPIResponse{} body, err := json.Marshal(resp) @@ -960,7 +960,7 @@ func TestPrometheusCodec_DecodeResponse_ContentTypeHandling(t *testing.T) { ContentLength: int64(len(body)), } - _, err = codec.DecodeResponse(context.Background(), httpResponse, nil, log.NewNopLogger()) + _, err = codec.DecodeMetricsQueryResponse(context.Background(), httpResponse, nil, log.NewNopLogger()) require.Equal(t, tc.expectedErr, err) }) } @@ -1441,7 +1441,7 @@ func BenchmarkPrometheusCodec_DecodeResponse(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - _, err := codec.DecodeResponse(context.Background(), &http.Response{ + _, err := codec.DecodeMetricsQueryResponse(context.Background(), &http.Response{ StatusCode: 200, Body: io.NopCloser(bytes.NewReader(encodedRes)), ContentLength: int64(len(encodedRes)), @@ -1470,7 +1470,7 @@ func BenchmarkPrometheusCodec_EncodeResponse(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - _, err := codec.EncodeResponse(context.Background(), req, res) + _, err := codec.EncodeMetricsQueryResponse(context.Background(), req, res) require.NoError(b, err) } } @@ -1618,9 +1618,9 @@ func Test_DecodeOptions(t *testing.T) { } } -// TestPrometheusCodec_DecodeEncode tests that decoding and re-encoding a -// request does not lose relevant information about the original request. -func TestPrometheusCodec_DecodeEncode(t *testing.T) { +// TestPrometheusCodec_DecodeEncode_Metrics tests that decoding and re-encoding a +// metrics query request does not lose relevant information about the original request. +func TestPrometheusCodec_DecodeEncode_Metrics(t *testing.T) { codec := newTestPrometheusCodec().(prometheusCodec) for _, tt := range []struct { name string @@ -1678,6 +1678,123 @@ func TestPrometheusCodec_DecodeEncode(t *testing.T) { } } +// TestPrometheusCodec_DecodeEncodeMultipleTimes_Labels tests that decoding and re-encoding a +// labels query request multiple times does not lose relevant information about the original request. +func TestPrometheusCodec_DecodeEncodeMultipleTimes_Labels(t *testing.T) { + codec := newTestPrometheusCodec().(prometheusCodec) + for _, tc := range []struct { + name string + queryURL string + request LabelsQueryRequest + }{ + { + name: "label names - minimal", + queryURL: "/api/v1/labels?end=1708588800&start=1708502400", + request: &PrometheusLabelNamesQueryRequest{ + Path: "/api/v1/labels", + Start: 1708502400000, + End: 1708588800000, + }, + }, + { + name: "label names - all", + queryURL: "/api/v1/labels?end=1708588800&limit=10&match%5B%5D=go_goroutines%7Bcontainer%3D~%22quer.%2A%22%7D&match%5B%5D=go_goroutines%7Bcontainer%21%3D%22query-scheduler%22%7D&start=1708502400", + request: &PrometheusLabelNamesQueryRequest{ + Path: "/api/v1/labels", + Start: 1708502400000, + End: 1708588800000, + LabelMatcherSets: []string{ + "go_goroutines{container=~\"quer.*\"}", + "go_goroutines{container!=\"query-scheduler\"}", + }, + Limit: 10, + }, + }, + { + name: "label values - minimal", + queryURL: "/api/v1/label/job/values?end=1708588800&start=1708502400", + request: &PrometheusLabelValuesQueryRequest{ + Path: "/api/v1/label/job/values", + LabelName: "job", + Start: 1708502400000, + End: 1708588800000, + }, + }, + { + name: "label values - all", + queryURL: "/api/v1/label/job/values?end=1708588800&limit=10&match%5B%5D=go_goroutines%7Bcontainer%3D~%22quer.%2A%22%7D&match%5B%5D=go_goroutines%7Bcontainer%21%3D%22query-scheduler%22%7D&start=1708502400", + request: &PrometheusLabelValuesQueryRequest{ + Path: "/api/v1/label/job/values", + LabelName: "job", + Start: 1708502400000, + End: 1708588800000, + LabelMatcherSets: []string{ + "go_goroutines{container=~\"quer.*\"}", + "go_goroutines{container!=\"query-scheduler\"}", + }, + Limit: 10, + }, + }, + { + name: "series - minimal", + queryURL: "/api/v1/series?end=1708588800&match%5B%5D=go_goroutines%7Bcontainer%21%3D%22query-scheduler%22%7D&start=1708502400", + request: &PrometheusSeriesQueryRequest{ + Path: "/api/v1/series", + Start: 1708502400000, + End: 1708588800000, + LabelMatcherSets: []string{ + "go_goroutines{container!=\"query-scheduler\"}", + }, + }, + }, + { + name: "series - all", + queryURL: "/api/v1/series?end=1708588800&limit=10&match%5B%5D=go_goroutines%7Bcontainer%3D~%22quer.%2A%22%7D&match%5B%5D=go_goroutines%7Bcontainer%21%3D%22query-scheduler%22%7D&start=1708502400", + request: &PrometheusSeriesQueryRequest{ + Path: "/api/v1/series", + Start: 1708502400000, + End: 1708588800000, + LabelMatcherSets: []string{ + "go_goroutines{container=~\"quer.*\"}", + "go_goroutines{container!=\"query-scheduler\"}", + }, + Limit: 10, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + expected, err := http.NewRequest("GET", tc.queryURL, nil) + require.NoError(t, err) + expected.Body = http.NoBody + expected.Header = make(http.Header) + // This header is set by EncodeLabelsQueryRequest according to the codec's config, so we + // should always expect it to be present on the re-encoded request. + expected.Header.Set("Accept", "application/json") + ctx := context.Background() + + decoded, err := codec.DecodeLabelsQueryRequest(ctx, expected) + require.NoError(t, err) + assert.Equal(t, tc.request, decoded) + + encoded, err := codec.EncodeLabelsQueryRequest(ctx, decoded) + require.NoError(t, err) + assert.Equal(t, expected.URL, encoded.URL) + assert.Equal(t, expected.Header, encoded.Header) + + decoded, err = codec.DecodeLabelsQueryRequest(ctx, encoded) + require.NoError(t, err) + assert.Equal(t, tc.request, decoded) + + encoded, err = codec.EncodeLabelsQueryRequest(ctx, decoded) + require.NoError(t, err) + assert.Equal(t, expected.URL, encoded.URL) + assert.Equal(t, expected.Header, encoded.Header) + }) + } +} + func TestPrometheusCodec_DecodeMultipleTimes(t *testing.T) { const query = "sum by (namespace) (container_memory_rss)" t.Run("instant query", func(t *testing.T) { @@ -1731,7 +1848,7 @@ func TestPrometheusCodec_DecodeMultipleTimes(t *testing.T) { } func newTestPrometheusCodec() Codec { - return NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, formatJSON) + return NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, formatJSON, nil) } func mustSucceed[T any](value T, err error) T { diff --git a/pkg/frontend/querymiddleware/generic_query_cache_test.go b/pkg/frontend/querymiddleware/generic_query_cache_test.go index 354df9fade5..eae385bad9f 100644 --- a/pkg/frontend/querymiddleware/generic_query_cache_test.go +++ b/pkg/frontend/querymiddleware/generic_query_cache_test.go @@ -226,7 +226,7 @@ func testGenericQueryCacheRoundTrip(t *testing.T, newRoundTripper newGenericQuer initialStoreCallsCount := cacheBackend.CountStoreCalls() reg := prometheus.NewPedanticRegistry() - rt := newRoundTripper(cacheBackend, DefaultCacheKeyGenerator{codec: NewPrometheusCodec(reg, 0*time.Minute, formatJSON)}, limits, downstream, testutil.NewLogger(t), reg) + rt := newRoundTripper(cacheBackend, DefaultCacheKeyGenerator{codec: NewPrometheusCodec(reg, 0*time.Minute, formatJSON, nil)}, limits, downstream, testutil.NewLogger(t), reg) res, err := rt.RoundTrip(req) require.NoError(t, err) diff --git a/pkg/frontend/querymiddleware/labels_query_cache_test.go b/pkg/frontend/querymiddleware/labels_query_cache_test.go index 6b59b24cc8c..2b771649765 100644 --- a/pkg/frontend/querymiddleware/labels_query_cache_test.go +++ b/pkg/frontend/querymiddleware/labels_query_cache_test.go @@ -142,7 +142,7 @@ func TestDefaultCacheKeyGenerator_LabelValuesCacheKey(t *testing.T) { } reg := prometheus.NewPedanticRegistry() - codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON) + codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON, nil) for testName, testData := range tests { t.Run(testName, func(t *testing.T) { diff --git a/pkg/frontend/querymiddleware/limits.go b/pkg/frontend/querymiddleware/limits.go index 74e1f3a7848..f3c95717058 100644 --- a/pkg/frontend/querymiddleware/limits.go +++ b/pkg/frontend/querymiddleware/limits.go @@ -198,8 +198,8 @@ type limitedParallelismRoundTripper struct { middleware MetricsQueryMiddleware } -// newLimitedParallelismRoundTripper creates a new roundtripper that enforces MaxQueryParallelism to the `next` roundtripper across `middlewares`. -func newLimitedParallelismRoundTripper(next http.RoundTripper, codec Codec, limits Limits, middlewares ...MetricsQueryMiddleware) http.RoundTripper { +// NewLimitedParallelismRoundTripper creates a new roundtripper that enforces MaxQueryParallelism to the `next` roundtripper across `middlewares`. +func NewLimitedParallelismRoundTripper(next http.RoundTripper, codec Codec, limits Limits, middlewares ...MetricsQueryMiddleware) http.RoundTripper { return limitedParallelismRoundTripper{ downstream: roundTripperHandler{ next: next, @@ -248,7 +248,7 @@ func (rt limitedParallelismRoundTripper) RoundTrip(r *http.Request) (*http.Respo return nil, err } - return rt.codec.EncodeResponse(ctx, r, response) + return rt.codec.EncodeMetricsQueryResponse(ctx, r, response) } // roundTripperHandler is an adapter that implements the MetricsQueryHandler interface using a http.RoundTripper to perform @@ -276,7 +276,7 @@ func (rth roundTripperHandler) Do(ctx context.Context, r MetricsQueryRequest) (R } defer func() { _ = response.Body.Close() }() - return rth.codec.DecodeResponse(ctx, response, r, rth.logger) + return rth.codec.DecodeMetricsQueryResponse(ctx, response, r, rth.logger) } // smallestPositiveNonZeroDuration returns the smallest positive and non-zero value diff --git a/pkg/frontend/querymiddleware/limits_test.go b/pkg/frontend/querymiddleware/limits_test.go index c12ccf43760..f4bbda7b344 100644 --- a/pkg/frontend/querymiddleware/limits_test.go +++ b/pkg/frontend/querymiddleware/limits_test.go @@ -762,7 +762,7 @@ func TestLimitedRoundTripper_MaxQueryParallelism(t *testing.T) { }) require.Nil(t, err) - _, err = newLimitedParallelismRoundTripper(downstream, codec, mockLimits{maxQueryParallelism: maxQueryParallelism}, + _, err = NewLimitedParallelismRoundTripper(downstream, codec, mockLimits{maxQueryParallelism: maxQueryParallelism}, MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler { return HandlerFunc(func(c context.Context, _ MetricsQueryRequest) (Response, error) { var wg sync.WaitGroup @@ -806,7 +806,7 @@ func TestLimitedRoundTripper_MaxQueryParallelismLateScheduling(t *testing.T) { }) require.Nil(t, err) - _, err = newLimitedParallelismRoundTripper(downstream, codec, mockLimits{maxQueryParallelism: maxQueryParallelism}, + _, err = NewLimitedParallelismRoundTripper(downstream, codec, mockLimits{maxQueryParallelism: maxQueryParallelism}, MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler { return HandlerFunc(func(c context.Context, _ MetricsQueryRequest) (Response, error) { // fire up work and we don't wait. @@ -847,7 +847,7 @@ func TestLimitedRoundTripper_OriginalRequestContextCancellation(t *testing.T) { }) require.Nil(t, err) - _, err = newLimitedParallelismRoundTripper(downstream, codec, mockLimits{maxQueryParallelism: maxQueryParallelism}, + _, err = NewLimitedParallelismRoundTripper(downstream, codec, mockLimits{maxQueryParallelism: maxQueryParallelism}, MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler { return HandlerFunc(func(c context.Context, _ MetricsQueryRequest) (Response, error) { var wg sync.WaitGroup @@ -906,7 +906,7 @@ func BenchmarkLimitedParallelismRoundTripper(b *testing.B) { for _, concurrentRequestCount := range []int{1, 10, 100} { for _, subRequestCount := range []int{1, 2, 5, 10, 20, 50, 100} { - tripper := newLimitedParallelismRoundTripper(downstream, codec, mockLimits{maxQueryParallelism: maxParallelism}, + tripper := NewLimitedParallelismRoundTripper(downstream, codec, mockLimits{maxQueryParallelism: maxParallelism}, MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler { return HandlerFunc(func(c context.Context, _ MetricsQueryRequest) (Response, error) { wg := sync.WaitGroup{} diff --git a/pkg/frontend/querymiddleware/model_extra.go b/pkg/frontend/querymiddleware/model_extra.go index 7f20bce5169..3ca95a6394d 100644 --- a/pkg/frontend/querymiddleware/model_extra.go +++ b/pkg/frontend/querymiddleware/model_extra.go @@ -451,6 +451,10 @@ func (r *PrometheusLabelNamesQueryRequest) GetLabelName() string { return "" } +func (r *PrometheusSeriesQueryRequest) GetLabelName() string { + return "" +} + func (r *PrometheusLabelNamesQueryRequest) GetStartOrDefault() int64 { if r.GetStart() == 0 { return v1.MinTime.UnixMilli() @@ -479,6 +483,95 @@ func (r *PrometheusLabelValuesQueryRequest) GetEndOrDefault() int64 { return r.GetEnd() } +func (r *PrometheusSeriesQueryRequest) GetStartOrDefault() int64 { + if r.GetStart() == 0 { + return v1.MinTime.UnixMilli() + } + return r.GetStart() +} + +func (r *PrometheusSeriesQueryRequest) GetEndOrDefault() int64 { + if r.GetEnd() == 0 { + return v1.MaxTime.UnixMilli() + } + return r.GetEnd() +} + +func (r *PrometheusLabelNamesQueryRequest) GetHeaders() []*PrometheusHeader { + return r.Headers +} + +func (r *PrometheusLabelValuesQueryRequest) GetHeaders() []*PrometheusHeader { + return r.Headers +} + +func (r *PrometheusSeriesQueryRequest) GetHeaders() []*PrometheusHeader { + return r.Headers +} + +// WithLabelName clones the current `PrometheusLabelNamesQueryRequest` with a new label name param. +func (r *PrometheusLabelNamesQueryRequest) WithLabelName(string) (LabelsQueryRequest, error) { + panic("PrometheusLabelNamesQueryRequest.WithLabelName is not implemented") +} + +// WithLabelName clones the current `PrometheusLabelValuesQueryRequest` with a new label name param. +func (r *PrometheusLabelValuesQueryRequest) WithLabelName(name string) (LabelsQueryRequest, error) { + newRequest := *r + newRequest.Path = labelValuesPathSuffix.ReplaceAllString(r.Path, `/api/v1/label/`+name+`/values`) + newRequest.LabelName = name + return &newRequest, nil +} + +// WithLabelName clones the current `PrometheusSeriesQueryRequest` with a new label name param. +func (r *PrometheusSeriesQueryRequest) WithLabelName(string) (LabelsQueryRequest, error) { + panic("PrometheusSeriesQueryRequest.WithLabelName is not implemented") +} + +// WithLabelMatcherSets clones the current `PrometheusLabelNamesQueryRequest` with new label matcher sets. +func (r *PrometheusLabelNamesQueryRequest) WithLabelMatcherSets(labelMatcherSets []string) (LabelsQueryRequest, error) { + newRequest := *r + newRequest.LabelMatcherSets = make([]string, len(labelMatcherSets)) + copy(newRequest.LabelMatcherSets, labelMatcherSets) + return &newRequest, nil +} + +// WithLabelMatcherSets clones the current `PrometheusLabelValuesQueryRequest` with new label matcher sets. +func (r *PrometheusLabelValuesQueryRequest) WithLabelMatcherSets(labelMatcherSets []string) (LabelsQueryRequest, error) { + newRequest := *r + newRequest.LabelMatcherSets = make([]string, len(labelMatcherSets)) + copy(newRequest.LabelMatcherSets, labelMatcherSets) + return &newRequest, nil +} + +// WithLabelMatcherSets clones the current `PrometheusSeriesQueryRequest` with new label matcher sets. +func (r *PrometheusSeriesQueryRequest) WithLabelMatcherSets(labelMatcherSets []string) (LabelsQueryRequest, error) { + newRequest := *r + newRequest.LabelMatcherSets = make([]string, len(labelMatcherSets)) + copy(newRequest.LabelMatcherSets, labelMatcherSets) + return &newRequest, nil +} + +// WithHeaders clones the current `PrometheusLabelNamesQueryRequest` with new headers. +func (r *PrometheusLabelNamesQueryRequest) WithHeaders(headers []*PrometheusHeader) (LabelsQueryRequest, error) { + newRequest := *r + newRequest.Headers = cloneHeaders(headers) + return &newRequest, nil +} + +// WithHeaders clones the current `PrometheusLabelValuesQueryRequest` with new headers. +func (r *PrometheusLabelValuesQueryRequest) WithHeaders(headers []*PrometheusHeader) (LabelsQueryRequest, error) { + newRequest := *r + newRequest.Headers = cloneHeaders(headers) + return &newRequest, nil +} + +// WithHeaders clones the current `PrometheusSeriesQueryRequest` with new headers. +func (r *PrometheusSeriesQueryRequest) WithHeaders(headers []*PrometheusHeader) (LabelsQueryRequest, error) { + newRequest := *r + newRequest.Headers = cloneHeaders(headers) + return &newRequest, nil +} + // AddSpanTags writes query information about the current `PrometheusLabelNamesQueryRequest` // to a span's tag ("attributes" in OpenTelemetry parlance). func (r *PrometheusLabelNamesQueryRequest) AddSpanTags(sp opentracing.Span) { @@ -496,6 +589,14 @@ func (r *PrometheusLabelValuesQueryRequest) AddSpanTags(sp opentracing.Span) { sp.SetTag("end", timestamp.Time(r.GetEnd()).String()) } +// AddSpanTags writes query information about the current `PrometheusSeriesQueryRequest` +// to a span's tag ("attributes" in OpenTelemetry parlance). +func (r *PrometheusSeriesQueryRequest) AddSpanTags(sp opentracing.Span) { + sp.SetTag("matchers", fmt.Sprintf("%v", r.GetLabelMatcherSets())) + sp.SetTag("start", timestamp.Time(r.GetStart()).String()) + sp.SetTag("end", timestamp.Time(r.GetEnd()).String()) +} + type PrometheusLabelNamesQueryRequest struct { Path string Start int64 @@ -508,7 +609,8 @@ type PrometheusLabelNamesQueryRequest struct { // ID of the request used to correlate downstream requests and responses. ID int64 // Limit the number of label names returned. A value of 0 means no limit - Limit uint64 + Limit uint64 + Headers []*PrometheusHeader } func (r *PrometheusLabelNamesQueryRequest) GetPath() string { @@ -548,7 +650,8 @@ type PrometheusLabelValuesQueryRequest struct { // ID of the request used to correlate downstream requests and responses. ID int64 // Limit the number of label values returned. A value of 0 means no limit. - Limit uint64 + Limit uint64 + Headers []*PrometheusHeader } func (r *PrometheusLabelValuesQueryRequest) GetLabelName() string { @@ -576,6 +679,88 @@ func (r *PrometheusLabelValuesQueryRequest) GetLimit() uint64 { return r.Limit } +type PrometheusSeriesQueryRequest struct { + Path string + Start int64 + End int64 + // labelMatcherSets is a repeated field here in order to enable the representation + // of labels queries which have not yet been split; the prometheus querier code + // will eventually split requests like `?match[]=up&match[]=process_start_time_seconds{job="prometheus"}` + // into separate queries, one for each matcher set + LabelMatcherSets []string + // ID of the request used to correlate downstream requests and responses. + ID int64 + // Limit the number of label names returned. A value of 0 means no limit + Limit uint64 + Headers []*PrometheusHeader +} + +func (r *PrometheusSeriesQueryRequest) GetPath() string { + return r.Path +} + +func (r *PrometheusSeriesQueryRequest) GetStart() int64 { + return r.Start +} + +func (r *PrometheusSeriesQueryRequest) GetEnd() int64 { + return r.End +} + +func (r *PrometheusSeriesQueryRequest) GetLabelMatcherSets() []string { + return r.LabelMatcherSets +} + +func (r *PrometheusSeriesQueryRequest) GetID() int64 { + return r.ID +} + +func (r *PrometheusSeriesQueryRequest) GetLimit() uint64 { + return r.Limit +} + +type PrometheusLabelsResponse struct { + Status string `json:"status"` + Data []string `json:"data"` + ErrorType string `json:"errorType,omitempty"` + Error string `json:"error,omitempty"` + Headers []*PrometheusHeader `json:"-"` +} + +func (m *PrometheusLabelsResponse) GetHeaders() []*PrometheusHeader { + if m != nil { + return m.Headers + } + return nil +} + +func (m *PrometheusLabelsResponse) Reset() { *m = PrometheusLabelsResponse{} } +func (*PrometheusLabelsResponse) ProtoMessage() {} +func (m *PrometheusLabelsResponse) String() string { return fmt.Sprintf("%+v", *m) } + +type SeriesData map[string]string + +func (d *SeriesData) String() string { return fmt.Sprintf("%+v", *d) } + +type PrometheusSeriesResponse struct { + Status string `json:"status"` + Data []SeriesData `json:"data"` + ErrorType string `json:"errorType,omitempty"` + Error string `json:"error,omitempty"` + Headers []*PrometheusHeader `json:"-"` +} + +func (m *PrometheusSeriesResponse) GetHeaders() []*PrometheusHeader { + if m != nil { + return m.Headers + } + return nil +} + +func (m *PrometheusSeriesResponse) Reset() { *m = PrometheusSeriesResponse{} } +func (*PrometheusSeriesResponse) ProtoMessage() {} +func (m *PrometheusSeriesResponse) String() string { return fmt.Sprintf("%+v", *m) } + func (d *PrometheusData) UnmarshalJSON(b []byte) error { v := struct { Type model.ValueType `json:"resultType"` diff --git a/pkg/frontend/querymiddleware/model_extra_test.go b/pkg/frontend/querymiddleware/model_extra_test.go index bee9ccde9b6..e45bb7df664 100644 --- a/pkg/frontend/querymiddleware/model_extra_test.go +++ b/pkg/frontend/querymiddleware/model_extra_test.go @@ -118,7 +118,7 @@ func TestMetricQueryRequestCloneHeaders(t *testing.T) { httpReq.Header.Set("Content-Type", "application/x-www-form-urlencoded") httpReq.Header.Set("X-Test-Header", "test-value") - c := NewPrometheusCodec(prometheus.NewPedanticRegistry(), time.Minute*5, "json") + c := NewPrometheusCodec(prometheus.NewPedanticRegistry(), time.Minute*5, "json", nil) originalReq, err := c.DecodeMetricsQueryRequest(context.Background(), httpReq) require.NoError(t, err) diff --git a/pkg/frontend/querymiddleware/prune.go b/pkg/frontend/querymiddleware/prune.go index daf4180bf66..b079aa6edf1 100644 --- a/pkg/frontend/querymiddleware/prune.go +++ b/pkg/frontend/querymiddleware/prune.go @@ -59,7 +59,7 @@ func (p *pruneMiddleware) pruneQuery(ctx context.Context, query string) (string, // Parse the query. expr, err := parser.ParseExpr(query) if err != nil { - return "", false, apierror.New(apierror.TypeBadData, decorateWithParamName(err, "query").Error()) + return "", false, apierror.New(apierror.TypeBadData, DecorateWithParamName(err, "query").Error()) } origQueryString := expr.String() diff --git a/pkg/frontend/querymiddleware/querysharding.go b/pkg/frontend/querymiddleware/querysharding.go index 10f838cf8db..dd7d7da55e6 100644 --- a/pkg/frontend/querymiddleware/querysharding.go +++ b/pkg/frontend/querymiddleware/querysharding.go @@ -109,7 +109,7 @@ func (s *querySharding) Do(ctx context.Context, r MetricsQueryRequest) (Response // Parse the query. queryExpr, err := parser.ParseExpr(r.GetQuery()) if err != nil { - return nil, apierror.New(apierror.TypeBadData, decorateWithParamName(err, "query").Error()) + return nil, apierror.New(apierror.TypeBadData, DecorateWithParamName(err, "query").Error()) } totalShards := s.getShardsForQuery(ctx, tenantIDs, r, queryExpr, log) @@ -151,10 +151,14 @@ func (s *querySharding) Do(ctx context.Context, r MetricsQueryRequest) (Response return nil, apierror.New(apierror.TypeBadData, err.Error()) } - annotationAccumulator := newAnnotationAccumulator() - shardedQueryable := newShardedQueryable(r, annotationAccumulator, s.next) + annotationAccumulator := NewAnnotationAccumulator() + shardedQueryable := NewShardedQueryable(r, annotationAccumulator, s.next, nil) - qry, err := newQuery(ctx, r, s.engine, lazyquery.NewLazyQueryable(shardedQueryable)) + return ExecuteQueryOnQueryable(ctx, r, s.engine, shardedQueryable, annotationAccumulator) +} + +func ExecuteQueryOnQueryable(ctx context.Context, r MetricsQueryRequest, engine *promql.Engine, queryable storage.Queryable, annotationAccumulator *AnnotationAccumulator) (Response, error) { + qry, err := newQuery(ctx, r, engine, lazyquery.NewLazyQueryable(queryable)) if err != nil { return nil, apierror.New(apierror.TypeBadData, err.Error()) } @@ -169,12 +173,20 @@ func (s *querySharding) Do(ctx context.Context, r MetricsQueryRequest) (Response // query, so we pass in an empty string as the query so the positions will be hidden. warn, info := res.Warnings.AsStrings("", 0, 0) - // Add any annotations returned by the sharded queries, and remove any duplicates. - accumulatedWarnings, accumulatedInfos := annotationAccumulator.getAll() - warn = append(warn, accumulatedWarnings...) - info = append(info, accumulatedInfos...) - warn = removeDuplicates(warn) - info = removeDuplicates(info) + if annotationAccumulator != nil { + // Add any annotations returned by the sharded queries, and remove any duplicates. + accumulatedWarnings, accumulatedInfos := annotationAccumulator.getAll() + warn = append(warn, accumulatedWarnings...) + info = append(info, accumulatedInfos...) + warn = removeDuplicates(warn) + info = removeDuplicates(info) + } + + var headers []*PrometheusHeader + shardedQueryable, ok := queryable.(*shardedQueryable) + if ok { + headers = shardedQueryable.getResponseHeaders() + } return &PrometheusResponse{ Status: statusSuccess, @@ -182,7 +194,7 @@ func (s *querySharding) Do(ctx context.Context, r MetricsQueryRequest) (Response ResultType: string(res.Value.Type()), Result: extracted, }, - Headers: shardedQueryable.getResponseHeaders(), + Headers: headers, Warnings: warn, Infos: info, }, nil @@ -275,7 +287,7 @@ func (s *querySharding) shardQuery(ctx context.Context, query string, totalShard // each time before passing it to the mapper. expr, err := parser.ParseExpr(query) if err != nil { - return "", nil, apierror.New(apierror.TypeBadData, decorateWithParamName(err, "query").Error()) + return "", nil, apierror.New(apierror.TypeBadData, DecorateWithParamName(err, "query").Error()) } shardedQuery, err := mapper.Map(expr) @@ -490,14 +502,14 @@ func longestRegexpMatcherBytes(expr parser.Expr) int { return longest } -// annotationAccumulator collects annotations returned by sharded queries. -type annotationAccumulator struct { +// AnnotationAccumulator collects annotations returned by sharded queries. +type AnnotationAccumulator struct { warnings *sync.Map infos *sync.Map } -func newAnnotationAccumulator() *annotationAccumulator { - return &annotationAccumulator{ +func NewAnnotationAccumulator() *AnnotationAccumulator { + return &AnnotationAccumulator{ warnings: &sync.Map{}, infos: &sync.Map{}, } @@ -506,7 +518,7 @@ func newAnnotationAccumulator() *annotationAccumulator { // addWarning collects the warning annotation w. // // addWarning is safe to call from multiple goroutines. -func (a *annotationAccumulator) addWarning(w string) { +func (a *AnnotationAccumulator) addWarning(w string) { // We use LoadOrStore here to add the annotation if it doesn't already exist or otherwise do nothing. a.warnings.LoadOrStore(w, struct{}{}) } @@ -514,7 +526,7 @@ func (a *annotationAccumulator) addWarning(w string) { // addWarnings collects all of the warning annotations in warnings. // // addWarnings is safe to call from multiple goroutines. -func (a *annotationAccumulator) addWarnings(warnings []string) { +func (a *AnnotationAccumulator) addWarnings(warnings []string) { for _, w := range warnings { a.addWarning(w) } @@ -523,7 +535,7 @@ func (a *annotationAccumulator) addWarnings(warnings []string) { // addInfo collects the info annotation i. // // addInfo is safe to call from multiple goroutines. -func (a *annotationAccumulator) addInfo(i string) { +func (a *AnnotationAccumulator) addInfo(i string) { // We use LoadOrStore here to add the annotation if it doesn't already exist or otherwise do nothing. a.infos.LoadOrStore(i, struct{}{}) } @@ -531,7 +543,7 @@ func (a *annotationAccumulator) addInfo(i string) { // addInfos collects all of the info annotations in infos. // // addInfo is safe to call from multiple goroutines. -func (a *annotationAccumulator) addInfos(infos []string) { +func (a *AnnotationAccumulator) addInfos(infos []string) { for _, i := range infos { a.addInfo(i) } @@ -540,7 +552,7 @@ func (a *annotationAccumulator) addInfos(infos []string) { // getAll returns all annotations collected by this accumulator. // // getAll may return inconsistent or unexpected results if it is called concurrently with addInfo or addWarning. -func (a *annotationAccumulator) getAll() (warnings, infos []string) { +func (a *AnnotationAccumulator) getAll() (warnings, infos []string) { return getAllKeys(a.warnings), getAllKeys(a.infos) } diff --git a/pkg/frontend/querymiddleware/querysharding_test.go b/pkg/frontend/querymiddleware/querysharding_test.go index e97ff7b0575..ef6b7cda60c 100644 --- a/pkg/frontend/querymiddleware/querysharding_test.go +++ b/pkg/frontend/querymiddleware/querysharding_test.go @@ -75,9 +75,9 @@ func approximatelyEqualsSamples(t *testing.T, a, b *PrometheusResponse) { require.Equal(t, statusSuccess, a.Status) require.Equal(t, statusSuccess, b.Status) - as, err := responseToSamples(a) + as, err := ResponseToSamples(a) require.Nil(t, err) - bs, err := responseToSamples(b) + bs, err := ResponseToSamples(b) require.Nil(t, err) require.Equalf(t, len(as), len(bs), "expected same number of series: one contains %v, other %v", sampleStreamsStrings(as), sampleStreamsStrings(bs)) @@ -923,7 +923,7 @@ func TestQueryshardingDeterminism(t *testing.T) { shardedPrometheusRes := shardedRes.(*PrometheusResponse) - sampleStreams, err := responseToSamples(shardedPrometheusRes) + sampleStreams, err := ResponseToSamples(shardedPrometheusRes) require.NoError(t, err) require.Lenf(t, sampleStreams, 1, "There should be 1 samples stream (query %d)", i) diff --git a/pkg/frontend/querymiddleware/remote_read.go b/pkg/frontend/querymiddleware/remote_read.go index 463b20c052d..11070ce18d5 100644 --- a/pkg/frontend/querymiddleware/remote_read.go +++ b/pkg/frontend/querymiddleware/remote_read.go @@ -37,7 +37,7 @@ type remoteReadRoundTripper struct { middleware MetricsQueryMiddleware } -func newRemoteReadRoundTripper(next http.RoundTripper, middlewares ...MetricsQueryMiddleware) http.RoundTripper { +func NewRemoteReadRoundTripper(next http.RoundTripper, middlewares ...MetricsQueryMiddleware) http.RoundTripper { return &remoteReadRoundTripper{ next: next, middleware: MergeMetricsQueryMiddlewares(middlewares...), diff --git a/pkg/frontend/querymiddleware/remote_read_test.go b/pkg/frontend/querymiddleware/remote_read_test.go index ec87b3b020b..658f38cd0c5 100644 --- a/pkg/frontend/querymiddleware/remote_read_test.go +++ b/pkg/frontend/querymiddleware/remote_read_test.go @@ -135,7 +135,7 @@ func TestRemoteReadRoundTripperCallsDownstreamOnAll(t *testing.T) { actualMiddleWareCalls++ return tc.handler }) - rr := newRemoteReadRoundTripper(roundTripper, middleware) + rr := NewRemoteReadRoundTripper(roundTripper, middleware) _, err := rr.RoundTrip(makeTestHTTPRequestFromRemoteRead(makeTestRemoteReadRequest())) if tc.expectError != "" { require.Error(t, err) @@ -195,7 +195,7 @@ func TestRemoteReadRoundTripper_ShouldAllowMiddlewaresToManipulateRequest(t *tes }, } - rr := newRemoteReadRoundTripper(downstream, middleware) + rr := NewRemoteReadRoundTripper(downstream, middleware) _, err := rr.RoundTrip(makeTestHTTPRequestFromRemoteRead(origRemoteReadReq)) require.NoError(t, err) require.NotNil(t, downstreamReq) @@ -255,7 +255,7 @@ func TestRemoteReadRoundTripper_ShouldAllowMiddlewaresToReturnEmptyResponse(t *t }, } - rr := newRemoteReadRoundTripper(downstream, middleware) + rr := NewRemoteReadRoundTripper(downstream, middleware) origRemoteReadReq := makeTestRemoteReadRequest() _, err := rr.RoundTrip(makeTestHTTPRequestFromRemoteRead(origRemoteReadReq)) diff --git a/pkg/frontend/querymiddleware/results_cache_test.go b/pkg/frontend/querymiddleware/results_cache_test.go index cccd7233888..0258062766f 100644 --- a/pkg/frontend/querymiddleware/results_cache_test.go +++ b/pkg/frontend/querymiddleware/results_cache_test.go @@ -566,7 +566,7 @@ func TestPartitionCacheExtents(t *testing.T) { func TestDefaultSplitter_QueryRequest(t *testing.T) { t.Parallel() reg := prometheus.NewPedanticRegistry() - codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON) + codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON, nil) ctx := context.Background() diff --git a/pkg/frontend/querymiddleware/roundtrip.go b/pkg/frontend/querymiddleware/roundtrip.go index ee465164244..2efc03a9a09 100644 --- a/pkg/frontend/querymiddleware/roundtrip.go +++ b/pkg/frontend/querymiddleware/roundtrip.go @@ -38,6 +38,7 @@ const ( cardinalityActiveNativeHistogramMetricsPathSuffix = "/api/v1/cardinality/active_native_histogram_metrics" labelNamesPathSuffix = "/api/v1/labels" remoteReadPathSuffix = "/api/v1/read" + seriesPathSuffix = "/api/v1/series" queryTypeInstant = "query" queryTypeRange = "query_range" @@ -131,11 +132,24 @@ func (q HandlerFunc) Do(ctx context.Context, req MetricsQueryRequest) (Response, return q(ctx, req) } -// MetricsQueryHandler is like http.Handle, but specifically for Prometheus query and query_range calls. +// MetricsQueryHandler is like http.Handler, but specifically for Prometheus query and query_range calls. type MetricsQueryHandler interface { Do(context.Context, MetricsQueryRequest) (Response, error) } +// LabelsHandlerFunc is like http.HandlerFunc, but for LabelsQueryHandler. +type LabelsHandlerFunc func(context.Context, LabelsQueryRequest) (Response, error) + +// Do implements LabelsQueryHandler. +func (q LabelsHandlerFunc) Do(ctx context.Context, req LabelsQueryRequest) (Response, error) { + return q(ctx, req) +} + +// LabelsQueryHandler is like http.Handler, but specifically for Prometheus label names and values calls. +type LabelsQueryHandler interface { + Do(context.Context, LabelsQueryRequest) (Response, error) +} + // MetricsQueryMiddlewareFunc is like http.HandlerFunc, but for MetricsQueryMiddleware. type MetricsQueryMiddlewareFunc func(MetricsQueryHandler) MetricsQueryHandler @@ -245,14 +259,14 @@ func newQueryTripperware( // It means that the first roundtrippers defined in this function will be the last to be // executed. - queryrange := newLimitedParallelismRoundTripper(next, codec, limits, queryRangeMiddleware...) - instant := newLimitedParallelismRoundTripper(next, codec, limits, queryInstantMiddleware...) - remoteRead := newRemoteReadRoundTripper(next, remoteReadMiddleware...) + queryrange := NewLimitedParallelismRoundTripper(next, codec, limits, queryRangeMiddleware...) + instant := NewLimitedParallelismRoundTripper(next, codec, limits, queryInstantMiddleware...) + remoteRead := NewRemoteReadRoundTripper(next, remoteReadMiddleware...) // Wrap next for cardinality, labels queries and all other queries. // That attempts to parse "start" and "end" from the HTTP request and set them in the request's QueryDetails. // range and instant queries have more accurate logic for query details. - next = newQueryDetailsStartEndRoundTripper(next) + next = NewQueryDetailsStartEndRoundTripper(next) cardinality := next activeSeries := next activeNativeHistogramMetrics := next @@ -442,8 +456,8 @@ func newQueryMiddlewares( return } -// newQueryDetailsStartEndRoundTripper parses "start" and "end" parameters from the query and sets same fields in the QueryDetails in the context. -func newQueryDetailsStartEndRoundTripper(next http.RoundTripper) http.RoundTripper { +// NewQueryDetailsStartEndRoundTripper parses "start" and "end" parameters from the query and sets same fields in the QueryDetails in the context. +func NewQueryDetailsStartEndRoundTripper(next http.RoundTripper) http.RoundTripper { return RoundTripFunc(func(req *http.Request) (*http.Response, error) { params, _ := util.ParseRequestFormWithoutConsumingBody(req) if details := QueryDetailsFromContext(req.Context()); details != nil { @@ -533,6 +547,10 @@ func IsLabelsQuery(path string) bool { return IsLabelNamesQuery(path) || IsLabelValuesQuery(path) } +func IsSeriesQuery(path string) bool { + return strings.HasSuffix(path, seriesPathSuffix) +} + func IsActiveSeriesQuery(path string) bool { return strings.HasSuffix(path, cardinalityActiveSeriesPathSuffix) } diff --git a/pkg/frontend/querymiddleware/roundtrip_test.go b/pkg/frontend/querymiddleware/roundtrip_test.go index 679c67b01fd..715661ba174 100644 --- a/pkg/frontend/querymiddleware/roundtrip_test.go +++ b/pkg/frontend/querymiddleware/roundtrip_test.go @@ -154,7 +154,7 @@ func TestTripperware_InstantQuery(t *testing.T) { return nil, err } - return codec.EncodeResponse(r.Context(), r, &PrometheusResponse{ + return codec.EncodeMetricsQueryResponse(r.Context(), r, &PrometheusResponse{ Status: "success", Data: &PrometheusData{ ResultType: "vector", @@ -899,7 +899,7 @@ func TestTripperware_ShouldSupportReadConsistencyOffsetsInjection(t *testing.T) }), log.NewNopLogger(), mockLimits{}, - NewPrometheusCodec(nil, 0, formatJSON), + NewPrometheusCodec(nil, 0, formatJSON, nil), nil, promql.EngineOpts{ Logger: log.NewNopLogger(), diff --git a/pkg/frontend/querymiddleware/sharded_queryable.go b/pkg/frontend/querymiddleware/sharded_queryable.go index 7f65a2e3e91..b164c73bd0d 100644 --- a/pkg/frontend/querymiddleware/sharded_queryable.go +++ b/pkg/frontend/querymiddleware/sharded_queryable.go @@ -32,29 +32,36 @@ var ( errNotImplemented = errors.New("not implemented") ) +type HandleEmbeddedQueryFunc func(ctx context.Context, queryString string, query MetricsQueryRequest, handler MetricsQueryHandler) ([]SampleStream, *PrometheusResponse, error) + // shardedQueryable is an implementor of the Queryable interface. type shardedQueryable struct { req MetricsQueryRequest - annotationAccumulator *annotationAccumulator + annotationAccumulator *AnnotationAccumulator handler MetricsQueryHandler responseHeaders *responseHeadersTracker + handleEmbeddedQuery HandleEmbeddedQueryFunc } -// newShardedQueryable makes a new shardedQueryable. We expect a new queryable is created for each +// NewShardedQueryable makes a new shardedQueryable. We expect a new queryable is created for each // query, otherwise the response headers tracker doesn't work as expected, because it merges the // headers for all queries run through the queryable and never reset them. -func newShardedQueryable(req MetricsQueryRequest, annotationAccumulator *annotationAccumulator, next MetricsQueryHandler) *shardedQueryable { +func NewShardedQueryable(req MetricsQueryRequest, annotationAccumulator *AnnotationAccumulator, next MetricsQueryHandler, handleEmbeddedQuery HandleEmbeddedQueryFunc) *shardedQueryable { //nolint:revive + if handleEmbeddedQuery == nil { + handleEmbeddedQuery = defaultHandleEmbeddedQueryFunc + } return &shardedQueryable{ req: req, annotationAccumulator: annotationAccumulator, handler: next, responseHeaders: newResponseHeadersTracker(), + handleEmbeddedQuery: handleEmbeddedQuery, } } // Querier implements storage.Queryable. func (q *shardedQueryable) Querier(_, _ int64) (storage.Querier, error) { - return &shardedQuerier{req: q.req, annotationAccumulator: q.annotationAccumulator, handler: q.handler, responseHeaders: q.responseHeaders}, nil + return &shardedQuerier{req: q.req, annotationAccumulator: q.annotationAccumulator, handler: q.handler, responseHeaders: q.responseHeaders, handleEmbeddedQuery: q.handleEmbeddedQuery}, nil } // getResponseHeaders returns the merged response headers received by the downstream @@ -68,11 +75,13 @@ func (q *shardedQueryable) getResponseHeaders() []*PrometheusHeader { // through the downstream handler. type shardedQuerier struct { req MetricsQueryRequest - annotationAccumulator *annotationAccumulator + annotationAccumulator *AnnotationAccumulator handler MetricsQueryHandler // Keep track of response headers received when running embedded queries. responseHeaders *responseHeadersTracker + + handleEmbeddedQuery HandleEmbeddedQueryFunc } // Select implements storage.Querier. @@ -106,6 +115,29 @@ func (q *shardedQuerier) Select(ctx context.Context, _ bool, hints *storage.Sele return q.handleEmbeddedQueries(ctx, queries, hints) } +func defaultHandleEmbeddedQueryFunc(ctx context.Context, queryString string, query MetricsQueryRequest, handler MetricsQueryHandler) ([]SampleStream, *PrometheusResponse, error) { + query, err := query.WithQuery(queryString) + if err != nil { + return nil, nil, err + } + + resp, err := handler.Do(ctx, query) + if err != nil { + return nil, nil, err + } + + promRes, ok := resp.(*PrometheusResponse) + if !ok { + return nil, nil, errors.Errorf("error invalid response type: %T, expected: %T", resp, &PrometheusResponse{}) + } + resStreams, err := ResponseToSamples(promRes) + if err != nil { + return nil, nil, err + } + + return resStreams, promRes, nil +} + // handleEmbeddedQueries concurrently executes the provided queries through the downstream handler. // The returned storage.SeriesSet contains sorted series. func (q *shardedQuerier) handleEmbeddedQueries(ctx context.Context, queries []string, hints *storage.SelectHints) storage.SeriesSet { @@ -113,26 +145,14 @@ func (q *shardedQuerier) handleEmbeddedQueries(ctx context.Context, queries []st // Concurrently run each query. It breaks and cancels each worker context on first error. err := concurrency.ForEachJob(ctx, len(queries), len(queries), func(ctx context.Context, idx int) error { - query, err := q.req.WithQuery(queries[idx]) - if err != nil { - return err - } - resp, err := q.handler.Do(ctx, query) + resStreams, promRes, err := q.handleEmbeddedQuery(ctx, queries[idx], q.req, q.handler) if err != nil { return err } - promRes, ok := resp.(*PrometheusResponse) - if !ok { - return errors.Errorf("error invalid response type: %T, expected: %T", resp, &PrometheusResponse{}) - } - resStreams, err := responseToSamples(promRes) - if err != nil { - return err - } streams[idx] = resStreams // No mutex is needed since each job writes its own index. This is like writing separate variables. - q.responseHeaders.mergeHeaders(resp.(*PrometheusResponse).Headers) + q.responseHeaders.mergeHeaders(promRes.Headers) q.annotationAccumulator.addInfos(promRes.Infos) q.annotationAccumulator.addWarnings(promRes.Warnings) @@ -298,8 +318,8 @@ func newSeriesSetFromEmbeddedQueriesResults(results [][]SampleStream, hints *sto return series.NewConcreteSeriesSetFromUnsortedSeries(set) } -// responseToSamples is needed to map back from api response to the underlying series data -func responseToSamples(resp *PrometheusResponse) ([]SampleStream, error) { +// ResponseToSamples is needed to map back from api response to the underlying series data +func ResponseToSamples(resp *PrometheusResponse) ([]SampleStream, error) { if resp.Error != "" { return nil, errors.New(resp.Error) } diff --git a/pkg/frontend/querymiddleware/sharded_queryable_test.go b/pkg/frontend/querymiddleware/sharded_queryable_test.go index 273b3db6cc0..9e94f44f98f 100644 --- a/pkg/frontend/querymiddleware/sharded_queryable_test.go +++ b/pkg/frontend/querymiddleware/sharded_queryable_test.go @@ -257,7 +257,7 @@ func TestShardedQuerier_Select_ShouldConcurrentlyRunEmbeddedQueries(t *testing.T } func TestShardedQueryable_GetResponseHeaders(t *testing.T) { - queryable := newShardedQueryable(&PrometheusRangeQueryRequest{}, nil, nil) + queryable := NewShardedQueryable(&PrometheusRangeQueryRequest{}, nil, nil, nil) assert.Empty(t, queryable.getResponseHeaders()) // Merge some response headers from the 1st querier. @@ -288,7 +288,7 @@ func TestShardedQueryable_GetResponseHeaders(t *testing.T) { } func mkShardedQuerier(handler MetricsQueryHandler) *shardedQuerier { - return &shardedQuerier{req: &PrometheusRangeQueryRequest{}, handler: handler, responseHeaders: newResponseHeadersTracker()} + return &shardedQuerier{req: &PrometheusRangeQueryRequest{}, handler: handler, responseHeaders: newResponseHeadersTracker(), handleEmbeddedQuery: defaultHandleEmbeddedQueryFunc} } func TestNewSeriesSetFromEmbeddedQueriesResults(t *testing.T) { @@ -418,7 +418,7 @@ func TestResponseToSamples(t *testing.T) { }, } - streams, err := responseToSamples(input) + streams, err := ResponseToSamples(input) require.NoError(t, err) assertEqualSampleStream(t, input.Data.Result, streams) } diff --git a/pkg/frontend/querymiddleware/split_and_cache.go b/pkg/frontend/querymiddleware/split_and_cache.go index 83393f7113f..5a02d55a8ec 100644 --- a/pkg/frontend/querymiddleware/split_and_cache.go +++ b/pkg/frontend/querymiddleware/split_and_cache.go @@ -678,7 +678,7 @@ func splitQueryByInterval(req MetricsQueryRequest, interval time.Duration) ([]Me func evaluateAtModifierFunction(query string, start, end int64) (string, error) { expr, err := parser.ParseExpr(query) if err != nil { - return "", apierror.New(apierror.TypeBadData, decorateWithParamName(err, "query").Error()) + return "", apierror.New(apierror.TypeBadData, DecorateWithParamName(err, "query").Error()) } parser.Inspect(expr, func(n parser.Node, _ []parser.Node) error { switch exprAt := n.(type) { diff --git a/pkg/frontend/querymiddleware/split_and_cache_test.go b/pkg/frontend/querymiddleware/split_and_cache_test.go index d861df69e97..f1676c4da1c 100644 --- a/pkg/frontend/querymiddleware/split_and_cache_test.go +++ b/pkg/frontend/querymiddleware/split_and_cache_test.go @@ -1699,7 +1699,7 @@ func (q roundTripper) RoundTrip(r *http.Request) (*http.Response, error) { return nil, err } - return q.codec.EncodeResponse(r.Context(), r, response) + return q.codec.EncodeMetricsQueryResponse(r.Context(), r, response) } const seconds = 1e3 // 1e3 milliseconds per second. diff --git a/pkg/frontend/querymiddleware/split_by_instant_interval.go b/pkg/frontend/querymiddleware/split_by_instant_interval.go index 0d3687362d3..59f930f20bc 100644 --- a/pkg/frontend/querymiddleware/split_by_instant_interval.go +++ b/pkg/frontend/querymiddleware/split_by_instant_interval.go @@ -130,7 +130,7 @@ func (s *splitInstantQueryByIntervalMiddleware) Do(ctx context.Context, req Metr if err != nil { level.Warn(spanLog).Log("msg", "failed to parse query", "err", err) s.metrics.splittingSkipped.WithLabelValues(skippedReasonParsingFailed).Inc() - return nil, apierror.New(apierror.TypeBadData, decorateWithParamName(err, "query").Error()) + return nil, apierror.New(apierror.TypeBadData, DecorateWithParamName(err, "query").Error()) } instantSplitQuery, err := mapper.Map(expr) @@ -180,8 +180,8 @@ func (s *splitInstantQueryByIntervalMiddleware) Do(ctx context.Context, req Metr return nil, err } - annotationAccumulator := newAnnotationAccumulator() - shardedQueryable := newShardedQueryable(req, annotationAccumulator, s.next) + annotationAccumulator := NewAnnotationAccumulator() + shardedQueryable := NewShardedQueryable(req, annotationAccumulator, s.next, nil) qry, err := newQuery(ctx, req, s.engine, lazyquery.NewLazyQueryable(shardedQueryable)) if err != nil { diff --git a/pkg/frontend/v2/frontend_scheduler_adapter_test.go b/pkg/frontend/v2/frontend_scheduler_adapter_test.go index 4b545265863..7504019debb 100644 --- a/pkg/frontend/v2/frontend_scheduler_adapter_test.go +++ b/pkg/frontend/v2/frontend_scheduler_adapter_test.go @@ -63,7 +63,7 @@ func TestExtractAdditionalQueueDimensions(t *testing.T) { adapter := &frontendToSchedulerAdapter{ cfg: Config{QueryStoreAfter: 12 * time.Hour}, limits: limits{queryIngestersWithin: 13 * time.Hour}, - codec: querymiddleware.NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, "json"), + codec: querymiddleware.NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, "json", nil), } now := time.Now() @@ -196,7 +196,7 @@ func TestQueryDecoding(t *testing.T) { adapter := &frontendToSchedulerAdapter{ cfg: Config{QueryStoreAfter: 12 * time.Hour}, limits: limits{queryIngestersWithin: 13 * time.Hour}, - codec: querymiddleware.NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, "json"), + codec: querymiddleware.NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, "json", nil), } now := time.Now() diff --git a/pkg/frontend/v2/frontend_test.go b/pkg/frontend/v2/frontend_test.go index 83fe9f39c49..90af11b65b3 100644 --- a/pkg/frontend/v2/frontend_test.go +++ b/pkg/frontend/v2/frontend_test.go @@ -76,7 +76,7 @@ func setupFrontendWithConcurrencyAndServerOptions(t *testing.T, reg prometheus.R cfg.Port = grpcPort logger := log.NewLogfmtLogger(os.Stdout) - codec := querymiddleware.NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, "json") + codec := querymiddleware.NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, "json", nil) f, err := NewFrontend(cfg, limits{}, logger, reg, codec) require.NoError(t, err) diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index c372bea1c25..7d9050259d0 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -696,7 +696,7 @@ func (t *Mimir) initFlusher() (serv services.Service, err error) { // initQueryFrontendCodec initializes query frontend codec. // NOTE: Grafana Enterprise Metrics depends on this. func (t *Mimir) initQueryFrontendCodec() (services.Service, error) { - t.QueryFrontendCodec = querymiddleware.NewPrometheusCodec(t.Registerer, t.Cfg.Frontend.FrontendV2.LookBackDelta, t.Cfg.Frontend.QueryMiddleware.QueryResultResponseFormat) + t.QueryFrontendCodec = querymiddleware.NewPrometheusCodec(t.Registerer, t.Cfg.Frontend.FrontendV2.LookBackDelta, t.Cfg.Frontend.QueryMiddleware.QueryResultResponseFormat, nil) return nil, nil } diff --git a/pkg/querier/tenantfederation/merge_exemplar_queryable.go b/pkg/querier/tenantfederation/merge_exemplar_queryable.go index 51c441c0d35..4288752f05f 100644 --- a/pkg/querier/tenantfederation/merge_exemplar_queryable.go +++ b/pkg/querier/tenantfederation/merge_exemplar_queryable.go @@ -225,7 +225,7 @@ func filterTenantsAndRewriteMatchers(idLabelName string, ids []string, allMatche // In order to support that, we start with a set of 0 tenant IDs and add any tenant IDs that remain // after filtering (based on the inner slice of matchers), for each outer slice. for i, matchers := range allMatchers { - filteredIDs, unrelatedMatchers := filterValuesByMatchers(idLabelName, ids, matchers...) + filteredIDs, unrelatedMatchers := FilterValuesByMatchers(idLabelName, ids, matchers...) for k := range filteredIDs { outIDs[k] = struct{}{} } diff --git a/pkg/querier/tenantfederation/merge_queryable.go b/pkg/querier/tenantfederation/merge_queryable.go index e3b350280a6..c01da5e2871 100644 --- a/pkg/querier/tenantfederation/merge_queryable.go +++ b/pkg/querier/tenantfederation/merge_queryable.go @@ -198,7 +198,7 @@ func (m *mergeQuerier) LabelValues(ctx context.Context, name string, hints *stor spanlog, ctx := spanlogger.NewWithLogger(ctx, m.logger, "mergeQuerier.LabelValues") defer spanlog.Finish() - matchedIDs, filteredMatchers := filterValuesByMatchers(m.idLabelName, ids, matchers...) + matchedIDs, filteredMatchers := FilterValuesByMatchers(m.idLabelName, ids, matchers...) if name == m.idLabelName { labelValues := make([]string, 0, len(matchedIDs)) @@ -237,7 +237,7 @@ func (m *mergeQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints spanlog, ctx := spanlogger.NewWithLogger(ctx, m.logger, "mergeQuerier.LabelNames") defer spanlog.Finish() - matchedIDs, filteredMatchers := filterValuesByMatchers(m.idLabelName, ids, matchers...) + matchedIDs, filteredMatchers := FilterValuesByMatchers(m.idLabelName, ids, matchers...) labelNames, warnings, err := m.mergeDistinctStringSliceWithTenants(ctx, matchedIDs, func(ctx context.Context, id string) ([]string, annotations.Annotations, error) { return m.upstream.LabelNames(ctx, id, hints, filteredMatchers...) @@ -349,7 +349,7 @@ func (m *mergeQuerier) Select(ctx context.Context, sortSeries bool, hints *stora spanlog, ctx := spanlogger.NewWithLogger(ctx, m.logger, "mergeQuerier.Select") defer spanlog.Finish() - matchedIDs, filteredMatchers := filterValuesByMatchers(m.idLabelName, ids, matchers...) + matchedIDs, filteredMatchers := FilterValuesByMatchers(m.idLabelName, ids, matchers...) jobs := make([]string, 0, len(matchedIDs)) seriesSets := make([]storage.SeriesSet, len(matchedIDs)) diff --git a/pkg/querier/tenantfederation/tenant_federation.go b/pkg/querier/tenantfederation/tenant_federation.go index 85abdc7b85b..50826eef68d 100644 --- a/pkg/querier/tenantfederation/tenant_federation.go +++ b/pkg/querier/tenantfederation/tenant_federation.go @@ -31,7 +31,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxTenants, "tenant-federation.max-tenants", defaultMaxTenants, "The max number of tenant IDs that may be supplied for a federated query if enabled. 0 to disable the limit.") } -// filterValuesByMatchers applies matchers to inputed `idLabelName` and +// FilterValuesByMatchers applies matchers to inputed `idLabelName` and // `ids`. A set of matched IDs is returned and also all label matchers not // targeting the `idLabelName` label. // @@ -40,7 +40,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // to as part of Select in the mergeQueryable, to ensure only relevant queries // are considered and the forwarded matchers do not contain matchers on the // `idLabelName`. -func filterValuesByMatchers(idLabelName string, ids []string, matchers ...*labels.Matcher) (matchedIDs map[string]struct{}, unrelatedMatchers []*labels.Matcher) { +func FilterValuesByMatchers(idLabelName string, ids []string, matchers ...*labels.Matcher) (matchedIDs map[string]struct{}, unrelatedMatchers []*labels.Matcher) { // this contains the matchers which are not related to idLabelName unrelatedMatchers = make([]*labels.Matcher, 0, len(matchers))