diff --git a/cmd/thanos/query_frontend.go b/cmd/thanos/query_frontend.go index afd0a9d294..fd97747331 100644 --- a/cmd/thanos/query_frontend.go +++ b/cmd/thanos/query_frontend.go @@ -51,12 +51,10 @@ func registerQueryFrontend(app *extkingpin.App) { MaxBodySize: 10 * 1024 * 1024, }, QueryRangeConfig: queryfrontend.QueryRangeConfig{ - Limits: &cortexvalidation.Limits{}, - ResultsCacheConfig: &queryrange.ResultsCacheConfig{}, + Limits: &cortexvalidation.Limits{}, }, LabelsConfig: queryfrontend.LabelsConfig{ - Limits: &cortexvalidation.Limits{}, - ResultsCacheConfig: &queryrange.ResultsCacheConfig{}, + Limits: &cortexvalidation.Limits{}, }, }, } diff --git a/pkg/queryfrontend/labels_codec.go b/pkg/queryfrontend/labels_codec.go index 6b2f5c4481..b4caed1cd5 100644 --- a/pkg/queryfrontend/labels_codec.go +++ b/pkg/queryfrontend/labels_codec.go @@ -49,20 +49,21 @@ func NewThanosLabelsCodec(partialResponse bool, defaultMetadataTimeRange time.Du } } +// MergeResponse merges multiple responses into a single Response. It needs to dedup the responses and ensure the order. func (c labelsCodec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) { if len(responses) == 0 { + // Empty response for label_names, label_values and series API. return &ThanosLabelsResponse{ Status: queryrange.StatusSuccess, Data: []string{}, }, nil } - if len(responses) == 1 { - return responses[0], nil - } - switch responses[0].(type) { case *ThanosLabelsResponse: + if len(responses) == 1 { + return responses[0], nil + } set := make(map[string]struct{}) for _, res := range responses { @@ -83,25 +84,23 @@ func (c labelsCodec) MergeResponse(responses ...queryrange.Response) (queryrange Data: lbls, }, nil case *ThanosSeriesResponse: - seriesData := make([]labelpb.LabelSet, 0) + if len(responses) == 1 { + return responses[0], nil + } + seriesData := make(labelpb.ZLabelSets, 0) - // seriesString is used in soring so we don't have to calculate the string of label sets again. - seriesString := make([]string, 0) uniqueSeries := make(map[string]struct{}) for _, res := range responses { for _, series := range res.(*ThanosSeriesResponse).Data { - s := labelpb.LabelsToPromLabels(series.Labels).String() + s := series.PromLabels().String() if _, ok := uniqueSeries[s]; !ok { seriesData = append(seriesData, series) - seriesString = append(seriesString, s) uniqueSeries[s] = struct{}{} } } } - sort.Slice(seriesData, func(i, j int) bool { - return seriesString[i] < seriesString[j] - }) + sort.Sort(seriesData) return &ThanosSeriesResponse{ Status: queryrange.StatusSuccess, Data: seriesData, @@ -134,7 +133,8 @@ func (c labelsCodec) DecodeRequest(_ context.Context, r *http.Request) (queryran } func (c labelsCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) { - var u *url.URL + var req *http.Request + var err error switch thanosReq := r.(type) { case *ThanosLabelsRequest: var params = url.Values{ @@ -145,10 +145,29 @@ func (c labelsCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (* if len(thanosReq.StoreMatchers) > 0 { params[queryv1.StoreMatcherParam] = matchersToStringSlice(thanosReq.StoreMatchers) } - u = &url.URL{ - Path: thanosReq.Path, - RawQuery: params.Encode(), + + // If label is not empty, then it is a label values query. + if thanosReq.Label != "" { + u := &url.URL{ + Path: thanosReq.Path, + RawQuery: params.Encode(), + } + + req = &http.Request{ + Method: http.MethodGet, + RequestURI: u.String(), // This is what the httpgrpc code looks at. + URL: u, + Body: http.NoBody, + Header: http.Header{}, + } + } else { + req, err = http.NewRequest(http.MethodPost, thanosReq.Path, bytes.NewBufferString(params.Encode())) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, "error creating request: %s", err.Error()) + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") } + case *ThanosSeriesRequest: var params = url.Values{ "start": []string{encodeTime(thanosReq.Start)}, @@ -163,22 +182,17 @@ func (c labelsCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (* if len(thanosReq.StoreMatchers) > 0 { params[queryv1.StoreMatcherParam] = matchersToStringSlice(thanosReq.StoreMatchers) } - u = &url.URL{ - Path: thanosReq.Path, - RawQuery: params.Encode(), + + req, err = http.NewRequest(http.MethodPost, thanosReq.Path, bytes.NewBufferString(params.Encode())) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, "error creating request: %s", err.Error()) } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + default: return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid request format") } - req := &http.Request{ - Method: "GET", - RequestURI: u.String(), // This is what the httpgrpc code looks at. - URL: u, - Body: http.NoBody, - Header: http.Header{}, - } - return req.WithContext(ctx), nil } diff --git a/pkg/queryfrontend/labels_codec_test.go b/pkg/queryfrontend/labels_codec_test.go index 9bc00e53e0..a0e6980716 100644 --- a/pkg/queryfrontend/labels_codec_test.go +++ b/pkg/queryfrontend/labels_codec_test.go @@ -227,14 +227,14 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) { name: "thanos labels names request", req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/labels"}, checkFunc: func(r *http.Request) bool { - return r.URL.Query().Get(start) == startTime && - r.URL.Query().Get(end) == endTime && + return r.FormValue(start) == startTime && + r.FormValue(end) == endTime && r.URL.Path == "/api/v1/labels" }, }, { name: "thanos labels values request", - req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values"}, + req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values", Label: "__name__"}, checkFunc: func(r *http.Request) bool { return r.URL.Query().Get(start) == startTime && r.URL.Query().Get(end) == endTime && @@ -243,7 +243,7 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) { }, { name: "thanos labels values request, partial response set to true", - req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values", PartialResponse: true}, + req: &ThanosLabelsRequest{Start: 123000, End: 456000, Path: "/api/v1/label/__name__/values", Label: "__name__", PartialResponse: true}, checkFunc: func(r *http.Request) bool { return r.URL.Query().Get(start) == startTime && r.URL.Query().Get(end) == endTime && @@ -255,8 +255,8 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) { name: "thanos series request with empty matchers", req: &ThanosSeriesRequest{Start: 123000, End: 456000, Path: "/api/v1/series"}, checkFunc: func(r *http.Request) bool { - return r.URL.Query().Get(start) == startTime && - r.URL.Query().Get(end) == endTime && + return r.FormValue(start) == startTime && + r.FormValue(end) == endTime && r.URL.Path == "/api/v1/series" }, }, @@ -269,9 +269,9 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) { Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "cluster", "test")}}, }, checkFunc: func(r *http.Request) bool { - return r.URL.Query().Get(start) == startTime && - r.URL.Query().Get(end) == endTime && - r.URL.Query().Get(queryv1.MatcherParam) == `{cluster="test"}` && + return r.FormValue(start) == startTime && + r.FormValue(end) == endTime && + r.FormValue(queryv1.MatcherParam) == `{cluster="test"}` && r.URL.Path == "/api/v1/series" }, }, @@ -284,9 +284,9 @@ func TestLabelsCodec_EncodeRequest(t *testing.T) { Dedup: true, }, checkFunc: func(r *http.Request) bool { - return r.URL.Query().Get(start) == startTime && - r.URL.Query().Get(end) == endTime && - r.URL.Query().Get(queryv1.DedupParam) == "true" && + return r.FormValue(start) == startTime && + r.FormValue(end) == endTime && + r.FormValue(queryv1.DedupParam) == "true" && r.URL.Path == "/api/v1/series" }, }, @@ -313,12 +313,29 @@ func TestLabelsCodec_DecodeResponse(t *testing.T) { labelsData, err := json.Marshal(labelResponse) testutil.Ok(t, err) + labelResponseWithHeaders := &ThanosLabelsResponse{ + Status: "success", + Data: []string{"__name__"}, + Headers: []*ResponseHeader{{Name: cacheControlHeader, Values: []string{noStoreValue}}}, + } + labelsDataWithHeaders, err := json.Marshal(labelResponseWithHeaders) + testutil.Ok(t, err) + seriesResponse := &ThanosSeriesResponse{ Status: "success", - Data: []labelpb.LabelSet{{Labels: []labelpb.Label{{Name: "foo", Value: "bar"}}}}, + Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}, } seriesData, err := json.Marshal(seriesResponse) testutil.Ok(t, err) + + seriesResponseWithHeaders := &ThanosSeriesResponse{ + Status: "success", + Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}, + Headers: []*ResponseHeader{{Name: cacheControlHeader, Values: []string{noStoreValue}}}, + } + seriesDataWithHeaders, err := json.Marshal(seriesResponseWithHeaders) + testutil.Ok(t, err) + for _, tc := range []struct { name string expectedError error @@ -344,12 +361,34 @@ func TestLabelsCodec_DecodeResponse(t *testing.T) { res: http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(labelsData))}, expectedResponse: labelResponse, }, + { + name: "thanos labels request with HTTP headers", + req: &ThanosLabelsRequest{}, + res: http.Response{ + StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(labelsDataWithHeaders)), + Header: map[string][]string{ + cacheControlHeader: {noStoreValue}, + }, + }, + expectedResponse: labelResponseWithHeaders, + }, { name: "thanos series request", req: &ThanosSeriesRequest{}, res: http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(seriesData))}, expectedResponse: seriesResponse, }, + { + name: "thanos series request with HTTP headers", + req: &ThanosSeriesRequest{}, + res: http.Response{ + StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer(seriesDataWithHeaders)), + Header: map[string][]string{ + cacheControlHeader: {noStoreValue}, + }, + }, + expectedResponse: seriesResponseWithHeaders, + }, } { t.Run(tc.name, func(t *testing.T) { // Default partial response value doesn't matter when encoding requests. @@ -364,3 +403,117 @@ func TestLabelsCodec_DecodeResponse(t *testing.T) { }) } } + +func TestLabelsCodec_MergeResponse(t *testing.T) { + for _, tc := range []struct { + name string + expectedError error + responses []queryrange.Response + expectedResponse queryrange.Response + }{ + { + name: "Prometheus range query response format, not valid", + responses: []queryrange.Response{ + &queryrange.PrometheusResponse{Status: "success"}, + }, + expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format"), + }, + { + name: "Empty response", + responses: nil, + expectedResponse: &ThanosLabelsResponse{Status: queryrange.StatusSuccess, Data: []string{}}, + }, + { + name: "One label response", + responses: []queryrange.Response{ + &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}}, + }, + expectedResponse: &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}}, + }, + { + name: "One label response and two empty responses", + responses: []queryrange.Response{ + &ThanosLabelsResponse{Status: queryrange.StatusSuccess, Data: []string{}}, + &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}}, + &ThanosLabelsResponse{Status: queryrange.StatusSuccess, Data: []string{}}, + }, + expectedResponse: &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}}, + }, + { + name: "Multiple duplicate label responses", + responses: []queryrange.Response{ + &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9090", "localhost:9091"}}, + &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9091", "localhost:9092"}}, + &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9092", "localhost:9093"}}, + }, + expectedResponse: &ThanosLabelsResponse{Status: "success", + Data: []string{"localhost:9090", "localhost:9091", "localhost:9092", "localhost:9093"}}, + }, + // This case shouldn't happen because the responses from Querier are sorted. + { + name: "Multiple unordered label responses", + responses: []queryrange.Response{ + &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9093", "localhost:9092"}}, + &ThanosLabelsResponse{Status: "success", Data: []string{"localhost:9091", "localhost:9090"}}, + }, + expectedResponse: &ThanosLabelsResponse{Status: "success", + Data: []string{"localhost:9090", "localhost:9091", "localhost:9092", "localhost:9093"}}, + }, + { + name: "One series response", + responses: []queryrange.Response{ + &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}}, + }, + expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}}, + }, + { + name: "One series response and two empty responses", + responses: []queryrange.Response{ + &ThanosSeriesResponse{Status: queryrange.StatusSuccess}, + &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}}, + &ThanosSeriesResponse{Status: queryrange.StatusSuccess}, + }, + expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}}, + }, + { + name: "Multiple duplicate series responses", + responses: []queryrange.Response{ + &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}}, + &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}}, + &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}}, + }, + expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}}}, + }, + { + name: "Multiple unordered series responses", + responses: []queryrange.Response{ + &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{ + {Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}, + {Labels: []labelpb.ZLabel{{Name: "test", Value: "aaa"}, {Name: "instance", Value: "localhost:9090"}}}, + }}, + &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{ + {Labels: []labelpb.ZLabel{{Name: "foo", Value: "aaa"}}}, + {Labels: []labelpb.ZLabel{{Name: "test", Value: "bbb"}, {Name: "instance", Value: "localhost:9091"}}}, + }}, + }, + expectedResponse: &ThanosSeriesResponse{Status: "success", Data: []labelpb.ZLabelSet{ + {Labels: []labelpb.ZLabel{{Name: "foo", Value: "aaa"}}}, + {Labels: []labelpb.ZLabel{{Name: "foo", Value: "bar"}}}, + {Labels: []labelpb.ZLabel{{Name: "test", Value: "aaa"}, {Name: "instance", Value: "localhost:9090"}}}, + {Labels: []labelpb.ZLabel{{Name: "test", Value: "bbb"}, {Name: "instance", Value: "localhost:9091"}}}, + }}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + // Default partial response value doesn't matter when encoding requests. + codec := NewThanosLabelsCodec(false, time.Hour*2) + r, err := codec.MergeResponse(tc.responses...) + if tc.expectedError != nil { + testutil.Equals(t, err, tc.expectedError) + } else { + testutil.Ok(t, err) + testutil.Equals(t, tc.expectedResponse, r) + } + }) + } +} diff --git a/pkg/queryfrontend/response.pb.go b/pkg/queryfrontend/response.pb.go index 861e144053..a64e3bc9c8 100644 --- a/pkg/queryfrontend/response.pb.go +++ b/pkg/queryfrontend/response.pb.go @@ -67,12 +67,11 @@ func (m *ThanosLabelsResponse) XXX_DiscardUnknown() { var xxx_messageInfo_ThanosLabelsResponse proto.InternalMessageInfo type ThanosSeriesResponse struct { - Status string `protobuf:"bytes,1,opt,name=Status,proto3" json:"status"` - // TODO(bwplotka): Experiment with ZLabelSet here. - Data []labelpb.LabelSet `protobuf:"bytes,2,rep,name=Data,proto3" json:"data"` - ErrorType string `protobuf:"bytes,3,opt,name=ErrorType,proto3" json:"errorType,omitempty"` - Error string `protobuf:"bytes,4,opt,name=Error,proto3" json:"error,omitempty"` - Headers []*ResponseHeader `protobuf:"bytes,5,rep,name=Headers,proto3" json:"-"` + Status string `protobuf:"bytes,1,opt,name=Status,proto3" json:"status"` + Data []labelpb.ZLabelSet `protobuf:"bytes,2,rep,name=Data,proto3" json:"data"` + ErrorType string `protobuf:"bytes,3,opt,name=ErrorType,proto3" json:"errorType,omitempty"` + Error string `protobuf:"bytes,4,opt,name=Error,proto3" json:"error,omitempty"` + Headers []*ResponseHeader `protobuf:"bytes,5,rep,name=Headers,proto3" json:"-"` } func (m *ThanosSeriesResponse) Reset() { *m = ThanosSeriesResponse{} } @@ -155,30 +154,30 @@ func init() { func init() { proto.RegisterFile("queryfrontend/response.proto", fileDescriptor_b882fa7024d92f38) } var fileDescriptor_b882fa7024d92f38 = []byte{ - // 362 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x92, 0x3f, 0x4f, 0xc2, 0x40, - 0x18, 0xc6, 0x5b, 0x28, 0x55, 0x0e, 0xff, 0xe5, 0x20, 0xb1, 0x10, 0x68, 0x09, 0x13, 0x26, 0xda, - 0x26, 0x18, 0x57, 0x87, 0x46, 0x13, 0x63, 0x8c, 0x43, 0x21, 0xee, 0x47, 0x78, 0x45, 0x12, 0xe8, - 0xd5, 0xbb, 0x63, 0xe8, 0xb7, 0xe0, 0x63, 0x31, 0x32, 0x3a, 0x35, 0x0a, 0x5b, 0x3f, 0x82, 0x93, - 0xe1, 0x7a, 0x8d, 0x30, 0x3a, 0xba, 0xdd, 0x3d, 0xcf, 0xef, 0x7d, 0x93, 0xe7, 0xc9, 0x8b, 0x9a, - 0xef, 0x73, 0x60, 0xf1, 0x2b, 0xa3, 0xa1, 0x80, 0x70, 0xe4, 0x31, 0xe0, 0x11, 0x0d, 0x39, 0xb8, - 0x11, 0xa3, 0x82, 0xe2, 0xe3, 0x3d, 0xb7, 0x51, 0x1b, 0xd3, 0x31, 0x95, 0x8e, 0xb7, 0x7d, 0x65, - 0x50, 0xa3, 0xce, 0x05, 0x65, 0xe0, 0x4d, 0xc9, 0x10, 0xa6, 0xd1, 0xd0, 0x13, 0x71, 0x04, 0x3c, - 0xb3, 0x3a, 0xdf, 0x3a, 0xaa, 0x0d, 0xde, 0x48, 0x48, 0xf9, 0xd3, 0xd6, 0xe5, 0x81, 0x5a, 0x8f, - 0x3b, 0xc8, 0xec, 0x0b, 0x22, 0xe6, 0xdc, 0xd2, 0xdb, 0x7a, 0xb7, 0xec, 0xa3, 0x34, 0x71, 0x4c, - 0x2e, 0x95, 0x40, 0x39, 0xb8, 0x89, 0x8c, 0x3b, 0x22, 0x88, 0x55, 0x68, 0x17, 0xbb, 0x65, 0xff, - 0x30, 0x4d, 0x1c, 0x63, 0x44, 0x04, 0x09, 0xa4, 0x8a, 0x6f, 0x50, 0xf9, 0x9e, 0x31, 0xca, 0x06, - 0x71, 0x04, 0x56, 0x51, 0x2e, 0x39, 0x4f, 0x13, 0xa7, 0x0a, 0xb9, 0x78, 0x49, 0x67, 0x13, 0x01, - 0xb3, 0x48, 0xc4, 0xc1, 0x2f, 0x89, 0x2f, 0x50, 0x49, 0x7e, 0x2c, 0x43, 0x8e, 0x54, 0xd3, 0xc4, - 0x39, 0x95, 0x23, 0x3b, 0x78, 0x46, 0xe0, 0x5b, 0x74, 0xf0, 0x00, 0x64, 0x04, 0x8c, 0x5b, 0xa5, - 0x76, 0xb1, 0x5b, 0xe9, 0xb5, 0xdc, 0xbd, 0x3a, 0xdc, 0x3c, 0x4d, 0x46, 0xf9, 0xa5, 0x34, 0x71, - 0xf4, 0xab, 0x20, 0x1f, 0xea, 0x2c, 0x0a, 0x79, 0xf8, 0x3e, 0xb0, 0x09, 0xfc, 0x2d, 0x7c, 0x6f, - 0x27, 0x7c, 0xa5, 0x77, 0xe6, 0x0a, 0xb9, 0xc8, 0x95, 0x35, 0xf6, 0x41, 0xf8, 0x47, 0xcb, 0xc4, - 0xd1, 0xfe, 0x5d, 0x25, 0x8f, 0xe8, 0x64, 0x9f, 0xc0, 0x75, 0x64, 0x3c, 0x93, 0x19, 0xa8, 0x26, - 0x14, 0x2f, 0x25, 0xdc, 0x42, 0xe6, 0x0b, 0x99, 0xce, 0x81, 0xab, 0x0b, 0x50, 0xa6, 0x12, 0xfd, - 0xe6, 0xf2, 0xcb, 0xd6, 0x96, 0x6b, 0x5b, 0x5f, 0xad, 0x6d, 0xfd, 0x73, 0x6d, 0xeb, 0x8b, 0x8d, - 0xad, 0xad, 0x36, 0xb6, 0xf6, 0xb1, 0xb1, 0xb5, 0xa1, 0x29, 0x0f, 0xf0, 0xfa, 0x27, 0x00, 0x00, - 0xff, 0xff, 0x68, 0xd1, 0x0b, 0xae, 0xe0, 0x02, 0x00, 0x00, + // 367 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x92, 0x41, 0x4f, 0xea, 0x40, + 0x14, 0x85, 0x5b, 0x28, 0x7d, 0x8f, 0xe1, 0x3d, 0x8d, 0x03, 0x89, 0x85, 0x40, 0x4b, 0x58, 0x61, + 0xa2, 0x6d, 0x02, 0x71, 0xeb, 0xa2, 0xd1, 0xc4, 0x18, 0xe3, 0xa2, 0x10, 0x17, 0xee, 0x86, 0x70, + 0x45, 0x12, 0xe8, 0xd4, 0x99, 0x61, 0xd1, 0x7f, 0xa1, 0xff, 0x8a, 0x25, 0x4b, 0x57, 0x8d, 0xc2, + 0xae, 0x3f, 0xc1, 0x95, 0x61, 0x3a, 0x8d, 0xb0, 0x74, 0xe9, 0x6e, 0xe6, 0x9c, 0xef, 0xde, 0xe4, + 0x9c, 0x5c, 0xd4, 0x7c, 0x5e, 0x00, 0x8b, 0x1f, 0x19, 0x0d, 0x05, 0x84, 0x63, 0x8f, 0x01, 0x8f, + 0x68, 0xc8, 0xc1, 0x8d, 0x18, 0x15, 0x14, 0xff, 0xdf, 0x73, 0x1b, 0xb5, 0x09, 0x9d, 0x50, 0xe9, + 0x78, 0xdb, 0x57, 0x06, 0x35, 0xea, 0x5c, 0x50, 0x06, 0xde, 0x8c, 0x8c, 0x60, 0x16, 0x8d, 0x3c, + 0x11, 0x47, 0xc0, 0x33, 0xab, 0xf3, 0xa9, 0xa3, 0xda, 0xf0, 0x89, 0x84, 0x94, 0xdf, 0x6e, 0x5d, + 0x1e, 0xa8, 0xf5, 0xb8, 0x83, 0xcc, 0x81, 0x20, 0x62, 0xc1, 0x2d, 0xbd, 0xad, 0x77, 0xcb, 0x3e, + 0x4a, 0x13, 0xc7, 0xe4, 0x52, 0x09, 0x94, 0x83, 0x9b, 0xc8, 0xb8, 0x24, 0x82, 0x58, 0x85, 0x76, + 0xb1, 0x5b, 0xf6, 0xff, 0xa6, 0x89, 0x63, 0x8c, 0x89, 0x20, 0x81, 0x54, 0xf1, 0x39, 0x2a, 0x5f, + 0x31, 0x46, 0xd9, 0x30, 0x8e, 0xc0, 0x2a, 0xca, 0x25, 0xc7, 0x69, 0xe2, 0x54, 0x21, 0x17, 0x4f, + 0xe9, 0x7c, 0x2a, 0x60, 0x1e, 0x89, 0x38, 0xf8, 0x26, 0xf1, 0x09, 0x2a, 0xc9, 0x8f, 0x65, 0xc8, + 0x91, 0x6a, 0x9a, 0x38, 0x87, 0x72, 0x64, 0x07, 0xcf, 0x08, 0x7c, 0x81, 0xfe, 0x5c, 0x03, 0x19, + 0x03, 0xe3, 0x56, 0xa9, 0x5d, 0xec, 0x56, 0x7a, 0x2d, 0x77, 0xaf, 0x0e, 0x37, 0x4f, 0x93, 0x51, + 0x7e, 0x29, 0x4d, 0x1c, 0xfd, 0x2c, 0xc8, 0x87, 0x3a, 0xaf, 0x85, 0x3c, 0xfc, 0x00, 0xd8, 0x14, + 0x7e, 0x16, 0xbe, 0xbf, 0x13, 0xbe, 0xd2, 0x3b, 0x72, 0x85, 0x5c, 0xe4, 0x3e, 0xc8, 0x1e, 0x07, + 0x20, 0xfc, 0x7f, 0xcb, 0xc4, 0xd1, 0x7e, 0x5d, 0x27, 0x37, 0xe8, 0x60, 0x9f, 0xc0, 0x75, 0x64, + 0xdc, 0x91, 0x39, 0xa8, 0x2a, 0x14, 0x2f, 0x25, 0xdc, 0x42, 0xe6, 0x3d, 0x99, 0x2d, 0x80, 0xab, + 0x13, 0x50, 0xa6, 0x12, 0xfd, 0xe6, 0xf2, 0xc3, 0xd6, 0x96, 0x6b, 0x5b, 0x5f, 0xad, 0x6d, 0xfd, + 0x7d, 0x6d, 0xeb, 0x2f, 0x1b, 0x5b, 0x5b, 0x6d, 0x6c, 0xed, 0x6d, 0x63, 0x6b, 0x23, 0x53, 0x5e, + 0x60, 0xff, 0x2b, 0x00, 0x00, 0xff, 0xff, 0x6d, 0x4c, 0xcd, 0xdf, 0xe1, 0x02, 0x00, 0x00, } func (m *ThanosLabelsResponse) Marshal() (dAtA []byte, err error) { @@ -766,7 +765,7 @@ func (m *ThanosSeriesResponse) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Data = append(m.Data, labelpb.LabelSet{}) + m.Data = append(m.Data, labelpb.ZLabelSet{}) if err := m.Data[len(m.Data)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } diff --git a/pkg/queryfrontend/response.proto b/pkg/queryfrontend/response.proto index 6a097217a5..69bf2c06e5 100644 --- a/pkg/queryfrontend/response.proto +++ b/pkg/queryfrontend/response.proto @@ -30,8 +30,7 @@ message ThanosLabelsResponse { message ThanosSeriesResponse { string Status = 1 [(gogoproto.jsontag) = "status"]; - // TODO(bwplotka): Experiment with ZLabelSet here. - repeated thanos.LabelSet Data = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "data"]; + repeated thanos.ZLabelSet Data = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "data"]; string ErrorType = 3 [(gogoproto.jsontag) = "errorType,omitempty"]; string Error = 4 [(gogoproto.jsontag) = "error,omitempty"]; repeated ResponseHeader Headers = 5 [(gogoproto.jsontag) = "-"]; diff --git a/pkg/queryfrontend/roundtrip_test.go b/pkg/queryfrontend/roundtrip_test.go index 68a752b3ca..9eaaad8f86 100644 --- a/pkg/queryfrontend/roundtrip_test.go +++ b/pkg/queryfrontend/roundtrip_test.go @@ -749,7 +749,7 @@ func seriesResults(fail bool) (*int, http.Handler) { var lock sync.Mutex q := ThanosSeriesResponse{ Status: "success", - Data: []labelpb.LabelSet{{Labels: []labelpb.Label{{Name: "__name__", Value: "up"}, {Name: "foo", Value: "bar"}}}}, + Data: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "__name__", Value: "up"}, {Name: "foo", Value: "bar"}}}}, } return &count, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go index 5638f69e5f..36c5bfc139 100644 --- a/pkg/store/labelpb/label.go +++ b/pkg/store/labelpb/label.go @@ -295,3 +295,27 @@ func DeepCopy(lbls []ZLabel) []ZLabel { } return ret } + +// ZLabelSets is a sortable list of ZLabelSet. It assumes the label pairs in each ZLabelSet element are already sorted. +type ZLabelSets []ZLabelSet + +func (z ZLabelSets) Len() int { return len(z) } + +func (z ZLabelSets) Swap(i, j int) { z[i], z[j] = z[j], z[i] } + +func (z ZLabelSets) Less(i, j int) bool { + l := 0 + r := 0 + var result int + for l < z[i].Size() && r < z[j].Size() { + result = z[i].Labels[l].Compare(z[j].Labels[r]) + if result == 0 { + l++ + r++ + continue + } + return result < 0 + } + + return l == z[i].Size() +} diff --git a/pkg/store/labelpb/label_test.go b/pkg/store/labelpb/label_test.go index 6656eea445..ab8543657b 100644 --- a/pkg/store/labelpb/label_test.go +++ b/pkg/store/labelpb/label_test.go @@ -5,6 +5,8 @@ package labelpb import ( "fmt" + "reflect" + "sort" "testing" "github.com/prometheus/prometheus/pkg/labels" @@ -104,3 +106,90 @@ func BenchmarkZLabelsMarshalUnmarshal(b *testing.B) { } }) } + +func TestSortZLabelSets(t *testing.T) { + expectedResult := ZLabelSets{ + { + Labels: ZLabelsFromPromLabels( + labels.FromMap(map[string]string{ + "__name__": "grpc_client_handled_total", + "cluster": "test", + "grpc_code": "OK", + "grpc_method": "Info", + }), + ), + }, + { + Labels: ZLabelsFromPromLabels( + labels.FromMap(map[string]string{ + "__name__": "grpc_client_handled_total", + "cluster": "test", + "grpc_code": "OK", + "grpc_method": "LabelNames", + }), + ), + }, + { + Labels: ZLabelsFromPromLabels( + labels.FromMap(map[string]string{ + "__name__": "grpc_server_handled_total", + "cluster": "test", + "grpc_code": "OK", + "grpc_method": "Info", + }), + ), + }, + { + Labels: ZLabelsFromPromLabels( + labels.FromMap(map[string]string{ + "__name__": "up", + "instance": "localhost:10908", + }), + ), + }, + } + + list := ZLabelSets{ + { + Labels: ZLabelsFromPromLabels( + labels.FromMap(map[string]string{ + "__name__": "up", + "instance": "localhost:10908", + }), + ), + }, + { + Labels: ZLabelsFromPromLabels( + labels.FromMap(map[string]string{ + "__name__": "grpc_server_handled_total", + "cluster": "test", + "grpc_code": "OK", + "grpc_method": "Info", + }), + ), + }, + { + Labels: ZLabelsFromPromLabels( + labels.FromMap(map[string]string{ + "__name__": "grpc_client_handled_total", + "cluster": "test", + "grpc_code": "OK", + "grpc_method": "LabelNames", + }), + ), + }, + { + Labels: ZLabelsFromPromLabels( + labels.FromMap(map[string]string{ + "__name__": "grpc_client_handled_total", + "cluster": "test", + "grpc_code": "OK", + "grpc_method": "Info", + }), + ), + }, + } + + sort.Sort(list) + reflect.DeepEqual(expectedResult, list) +} diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index 196365a36d..7ee97b4acf 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -5,6 +5,7 @@ package e2e_test import ( "context" + "reflect" "testing" "time" @@ -262,7 +263,7 @@ func TestQueryFrontend(t *testing.T) { t.Run("query frontend splitting works for labels values API", func(t *testing.T) { labelValues(t, ctx, queryFrontend.HTTPEndpoint(), "instance", timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { - return len(res) > 0 + return len(res) == 1 && res[0] == "localhost:9090" }) testutil.Ok(t, q.WaitSumMetricsWithOptions( e2e.Equals(1), @@ -281,7 +282,7 @@ func TestQueryFrontend(t *testing.T) { ) labelValues(t, ctx, queryFrontend.HTTPEndpoint(), "instance", timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { - return len(res) > 0 + return len(res) == 1 && res[0] == "localhost:9090" }) testutil.Ok(t, q.WaitSumMetricsWithOptions( e2e.Equals(3), @@ -309,7 +310,16 @@ func TestQueryFrontend(t *testing.T) { timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []map[string]string) bool { - return len(res) > 0 + if len(res) != 1 { + return false + } + + return reflect.DeepEqual(res[0], map[string]string{ + "__name__": "up", + "instance": "localhost:9090", + "job": "myself", + "prometheus": "test", + }) }, ) testutil.Ok(t, q.WaitSumMetricsWithOptions( @@ -336,7 +346,16 @@ func TestQueryFrontend(t *testing.T) { timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []map[string]string) bool { - return len(res) > 0 + if len(res) != 1 { + return false + } + + return reflect.DeepEqual(res[0], map[string]string{ + "__name__": "up", + "instance": "localhost:9090", + "job": "myself", + "prometheus": "test", + }) }, ) testutil.Ok(t, q.WaitSumMetricsWithOptions( diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 90db116f03..1658ebcfb6 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -328,7 +328,7 @@ func TestQueryLabelValues(t *testing.T) { now := time.Now() labelValues(t, ctx, q.HTTPEndpoint(), "instance", timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { - return len(res) > 0 + return len(res) == 1 && res[0] == "localhost:9090" }) // Outside time range. @@ -428,7 +428,7 @@ func labelNames(t *testing.T, ctx context.Context, addr string, start, end int64 logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) - testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { + testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { res, err := promclient.NewDefaultClient().LabelNamesInGRPC(ctx, mustURLParse(t, "http://"+addr), start, end) if err != nil { return err @@ -437,7 +437,7 @@ func labelNames(t *testing.T, ctx context.Context, addr string, start, end int64 return nil } - return errors.Errorf("unexpected results size %d", len(res)) + return errors.Errorf("unexpected results %v", res) })) } @@ -447,7 +447,7 @@ func labelValues(t *testing.T, ctx context.Context, addr, label string, start, e logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) - testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { + testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { res, err := promclient.NewDefaultClient().LabelValuesInGRPC(ctx, mustURLParse(t, "http://"+addr), label, start, end) if err != nil { return err @@ -456,7 +456,7 @@ func labelValues(t *testing.T, ctx context.Context, addr, label string, start, e return nil } - return errors.Errorf("unexpected results size %d", len(res)) + return errors.Errorf("unexpected results %v", res) })) } @@ -465,7 +465,7 @@ func series(t *testing.T, ctx context.Context, addr string, matchers []storepb.L logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) - testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { + testutil.Ok(t, runutil.RetryWithLog(logger, 2*time.Second, ctx.Done(), func() error { res, err := promclient.NewDefaultClient().SeriesInGRPC(ctx, mustURLParse(t, "http://"+addr), matchers, start, end) if err != nil { return err @@ -474,7 +474,7 @@ func series(t *testing.T, ctx context.Context, addr string, matchers []storepb.L return nil } - return errors.Errorf("unexpected results size %d", len(res)) + return errors.Errorf("unexpected results %v", res) })) }