Skip to content

Commit

Permalink
Generalise some of Mimir's query sharding code to be more reusable (#…
Browse files Browse the repository at this point in the history
…9363)

* Generalise some of Mimir's query sharding code to be more reusable

* Update according to code review and add unit tests

* Revise according to code review
  • Loading branch information
zenador authored Sep 30, 2024
1 parent a9f9dc5 commit 97f0319
Show file tree
Hide file tree
Showing 32 changed files with 942 additions and 153 deletions.
2 changes: 1 addition & 1 deletion pkg/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand
if l != nil {
logger = l
}
codec := querymiddleware.NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, "json")
codec := querymiddleware.NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, "json", nil)

var workerConfig querier_worker.Config
flagext.DefaultValues(&workerConfig)
Expand Down
258 changes: 232 additions & 26 deletions pkg/frontend/querymiddleware/codec.go

Large diffs are not rendered by default.

36 changes: 33 additions & 3 deletions pkg/frontend/querymiddleware/codec_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@

package querymiddleware

import v1 "github.com/prometheus/prometheus/web/api/v1"
import (
v1 "github.com/prometheus/prometheus/web/api/v1"
)

const jsonMimeType = "application/json"

type jsonFormatter struct{}

func (j jsonFormatter) EncodeResponse(resp *PrometheusResponse) ([]byte, error) {
func (j jsonFormatter) EncodeQueryResponse(resp *PrometheusResponse) ([]byte, error) {
return json.Marshal(resp)
}

func (j jsonFormatter) DecodeResponse(buf []byte) (*PrometheusResponse, error) {
func (j jsonFormatter) DecodeQueryResponse(buf []byte) (*PrometheusResponse, error) {
var resp PrometheusResponse

if err := json.Unmarshal(buf, &resp); err != nil {
Expand All @@ -25,6 +27,34 @@ func (j jsonFormatter) DecodeResponse(buf []byte) (*PrometheusResponse, error) {
return &resp, nil
}

func (j jsonFormatter) EncodeLabelsResponse(resp *PrometheusLabelsResponse) ([]byte, error) {
return json.Marshal(resp)
}

func (j jsonFormatter) DecodeLabelsResponse(buf []byte) (*PrometheusLabelsResponse, error) {
var resp PrometheusLabelsResponse

if err := json.Unmarshal(buf, &resp); err != nil {
return nil, err
}

return &resp, nil
}

func (j jsonFormatter) EncodeSeriesResponse(resp *PrometheusSeriesResponse) ([]byte, error) {
return json.Marshal(resp)
}

func (j jsonFormatter) DecodeSeriesResponse(buf []byte) (*PrometheusSeriesResponse, error) {
var resp PrometheusSeriesResponse

if err := json.Unmarshal(buf, &resp); err != nil {
return nil, err
}

return &resp, nil
}

func (j jsonFormatter) Name() string {
return formatJSON
}
Expand Down
199 changes: 192 additions & 7 deletions pkg/frontend/querymiddleware/codec_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/grafana/mimir/pkg/mimirpb"
)

func TestPrometheusCodec_JSONResponse(t *testing.T) {
func TestPrometheusCodec_JSONResponse_Metrics(t *testing.T) {
headers := http.Header{"Content-Type": []string{"application/json"}}
expectedRespHeaders := []*PrometheusHeader{
{
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestPrometheusCodec_JSONResponse(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON)
codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON, nil)

body, err := json.Marshal(tc.resp)
require.NoError(t, err)
Expand All @@ -175,7 +175,7 @@ func TestPrometheusCodec_JSONResponse(t *testing.T) {
Body: io.NopCloser(bytes.NewBuffer(body)),
ContentLength: int64(len(body)),
}
decoded, err := codec.DecodeResponse(context.Background(), httpResponse, nil, log.NewNopLogger())
decoded, err := codec.DecodeMetricsQueryResponse(context.Background(), httpResponse, nil, log.NewNopLogger())
if err != nil || tc.expectedErr != nil {
require.Equal(t, tc.expectedErr, err)
return
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestPrometheusCodec_JSONResponse(t *testing.T) {
Body: io.NopCloser(bytes.NewBuffer(body)),
ContentLength: int64(len(body)),
}
encoded, err := codec.EncodeResponse(context.Background(), httpRequest, decoded)
encoded, err := codec.EncodeMetricsQueryResponse(context.Background(), httpRequest, decoded)
require.NoError(t, err)

expectedJSON, err := readResponseBody(httpResponse)
Expand All @@ -231,7 +231,118 @@ func TestPrometheusCodec_JSONResponse(t *testing.T) {
}
}

func TestPrometheusCodec_JSONEncoding(t *testing.T) {
func TestPrometheusCodec_JSONResponse_Labels(t *testing.T) {
headers := http.Header{"Content-Type": []string{"application/json"}}
expectedRespHeaders := []*PrometheusHeader{
{
Name: "Content-Type",
Values: []string{"application/json"},
},
}

for _, tc := range []struct {
name string
request LabelsQueryRequest
isSeriesResponse bool
responseHeaders http.Header
resp prometheusAPIResponse
expected Response
expectedErr error
}{
{
name: "successful labels response",
request: &PrometheusLabelNamesQueryRequest{},
isSeriesResponse: false,
resp: prometheusAPIResponse{
Status: statusSuccess,
Data: []string{"foo", "bar"},
},
expected: &PrometheusLabelsResponse{
Status: statusSuccess,
Data: []string{"foo", "bar"},
Headers: expectedRespHeaders,
},
},
{
name: "successful series response",
request: &PrometheusSeriesQueryRequest{},
isSeriesResponse: true,
resp: prometheusAPIResponse{
Status: statusSuccess,
Data: []SeriesData{
{
"__name__": "series_1",
"foo": "bar",
},
{
"__name__": "hist_series_1",
"hoo": "hbar",
},
},
},
expected: &PrometheusSeriesResponse{
Status: statusSuccess,
Data: []SeriesData{
{
"__name__": "series_1",
"foo": "bar",
},
{
"__name__": "hist_series_1",
"hoo": "hbar",
},
},
Headers: expectedRespHeaders,
},
},
} {
t.Run(tc.name, func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON, nil)

body, err := json.Marshal(tc.resp)
require.NoError(t, err)
httpResponse := &http.Response{
StatusCode: 200,
Header: headers,
Body: io.NopCloser(bytes.NewBuffer(body)),
ContentLength: int64(len(body)),
}
decoded, err := codec.DecodeLabelsQueryResponse(context.Background(), httpResponse, tc.request, log.NewNopLogger())
if err != nil || tc.expectedErr != nil {
require.Equal(t, tc.expectedErr, err)
return
}

require.NoError(t, err)
require.Equal(t, tc.expected, decoded)

httpRequest := &http.Request{
Header: http.Header{"Accept": []string{jsonMimeType}},
}

// Reset response, as the above call will have consumed the body reader.
httpResponse = &http.Response{
StatusCode: 200,
Header: headers,
Body: io.NopCloser(bytes.NewBuffer(body)),
ContentLength: int64(len(body)),
}
encoded, err := codec.EncodeLabelsQueryResponse(context.Background(), httpRequest, decoded, tc.isSeriesResponse)
require.NoError(t, err)

expectedJSON, err := readResponseBody(httpResponse)
require.NoError(t, err)
encodedJSON, err := readResponseBody(encoded)
require.NoError(t, err)

require.JSONEq(t, string(expectedJSON), string(encodedJSON))
require.Equal(t, httpResponse, encoded)
})
}
}

func TestPrometheusCodec_JSONEncoding_Metrics(t *testing.T) {
responseHistogram := mimirpb.FloatHistogram{
CounterResetHint: histogram.GaugeType,
Schema: 3,
Expand Down Expand Up @@ -353,12 +464,12 @@ func TestPrometheusCodec_JSONEncoding(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON)
codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON, nil)
httpRequest := &http.Request{
Header: http.Header{"Accept": []string{jsonMimeType}},
}

encoded, err := codec.EncodeResponse(context.Background(), httpRequest, tc.response)
encoded, err := codec.EncodeMetricsQueryResponse(context.Background(), httpRequest, tc.response)
require.NoError(t, err)
require.Equal(t, http.StatusOK, encoded.StatusCode)
require.Equal(t, "application/json", encoded.Header.Get("Content-Type"))
Expand All @@ -381,3 +492,77 @@ func TestPrometheusCodec_JSONEncoding(t *testing.T) {
})
}
}

func TestPrometheusCodec_JSONEncoding_Labels(t *testing.T) {
for _, tc := range []struct {
name string
expectedJSON string
response Response
isSeriesResponse bool
}{
{
name: "successful labels response",
response: &PrometheusLabelsResponse{
Status: statusSuccess,
Data: []string{
"foo",
"bar",
},
},
expectedJSON: `
{
"status": "success",
"data": ["foo", "bar"]
}
`,
isSeriesResponse: false,
},
{
name: "successful series response",
response: &PrometheusSeriesResponse{
Status: statusSuccess,
Data: []SeriesData{
{
"__name__": "series_1",
"foo": "bar",
},
{
"__name__": "hist_series_1",
"hoo": "hbar",
},
},
},
expectedJSON: `
{
"status": "success",
"data": [{
"__name__": "series_1",
"foo": "bar"
}, {
"__name__": "hist_series_1",
"hoo": "hbar"
}]
}
`,
isSeriesResponse: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
codec := NewPrometheusCodec(reg, 0*time.Minute, formatJSON, nil)
httpRequest := &http.Request{
Header: http.Header{"Accept": []string{jsonMimeType}},
}

encoded, err := codec.EncodeLabelsQueryResponse(context.Background(), httpRequest, tc.response, tc.isSeriesResponse)
require.NoError(t, err)
require.Equal(t, http.StatusOK, encoded.StatusCode)
require.Equal(t, "application/json", encoded.Header.Get("Content-Type"))

encodedJSON, err := readResponseBody(encoded)
require.NoError(t, err)
require.JSONEq(t, tc.expectedJSON, string(encodedJSON))
require.Equal(t, len(encodedJSON), int(encoded.ContentLength))
})
}
}
20 changes: 18 additions & 2 deletions pkg/frontend/querymiddleware/codec_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (f protobufFormatter) ContentType() v1.MIMEType {
return v1.MIMEType{Type: mimirpb.QueryResponseMimeTypeType, SubType: mimirpb.QueryResponseMimeTypeSubType}
}

func (f protobufFormatter) EncodeResponse(resp *PrometheusResponse) ([]byte, error) {
func (f protobufFormatter) EncodeQueryResponse(resp *PrometheusResponse) ([]byte, error) {
status, err := mimirpb.StatusFromPrometheusString(resp.Status)
if err != nil {
return nil, err
Expand Down Expand Up @@ -186,7 +186,7 @@ func (protobufFormatter) encodeMatrixData(data []SampleStream) mimirpb.MatrixDat
return mimirpb.MatrixData{Series: series}
}

func (f protobufFormatter) DecodeResponse(buf []byte) (*PrometheusResponse, error) {
func (f protobufFormatter) DecodeQueryResponse(buf []byte) (*PrometheusResponse, error) {
var resp mimirpb.QueryResponse

if err := resp.Unmarshal(buf); err != nil {
Expand Down Expand Up @@ -326,6 +326,22 @@ func (f protobufFormatter) decodeMatrixData(data *mimirpb.MatrixData) (*Promethe
}, nil
}

func (f protobufFormatter) EncodeLabelsResponse(*PrometheusLabelsResponse) ([]byte, error) {
return nil, errors.New("protobuf labels encoding is not supported")
}

func (f protobufFormatter) DecodeLabelsResponse([]byte) (*PrometheusLabelsResponse, error) {
return nil, errors.New("protobuf labels decoding is not supported")
}

func (f protobufFormatter) EncodeSeriesResponse(*PrometheusSeriesResponse) ([]byte, error) {
return nil, errors.New("protobuf series encoding is not supported")
}

func (f protobufFormatter) DecodeSeriesResponse([]byte) (*PrometheusSeriesResponse, error) {
return nil, errors.New("protobuf series decoding is not supported")
}

func labelsFromStringArray(s []string) ([]mimirpb.LabelAdapter, error) {
if len(s)%2 != 0 {
return nil, fmt.Errorf("metric is malformed: expected even number of symbols, but got %v", len(s))
Expand Down
Loading

0 comments on commit 97f0319

Please sign in to comment.