diff --git a/pkg/frontend/transport/roundtripper.go b/pkg/frontend/transport/roundtripper.go index e996d8abb3b..ccbd8d4cc51 100644 --- a/pkg/frontend/transport/roundtripper.go +++ b/pkg/frontend/transport/roundtripper.go @@ -6,6 +6,7 @@ import ( "io" "net/http" "net/url" + "time" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/httpgrpc/server" @@ -13,7 +14,7 @@ import ( // GrpcRoundTripper is similar to http.RoundTripper, but works with HTTP requests converted to protobuf messages. type GrpcRoundTripper interface { - RoundTripGRPC(context.Context, url.Values, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) + RoundTripGRPC(context.Context, url.Values, time.Time, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) } func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper { @@ -40,7 +41,7 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er return nil, err } - resp, err := a.roundTripper.RoundTripGRPC(r.Context(), r.Form, req) + resp, err := a.roundTripper.RoundTripGRPC(r.Context(), r.Form, time.Now(), req) if err != nil { return nil, err } diff --git a/pkg/frontend/v1/frontend.go b/pkg/frontend/v1/frontend.go index 64e19f35cb3..499790f3af7 100644 --- a/pkg/frontend/v1/frontend.go +++ b/pkg/frontend/v1/frontend.go @@ -177,7 +177,7 @@ func (f *Frontend) cleanupInactiveUserMetrics(user string) { } // RoundTripGRPC round trips a proto (instead of a HTTP request). -func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { +func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, timestamp time.Time, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { // Propagate trace context in gRPC too - this will be ignored if using HTTP. tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx) if tracer != nil && span != nil { @@ -198,7 +198,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, request := request{ request: req, originalCtx: ctx, - isHighPriority: util_query.IsHighPriority(requestParams, f.limits.HighPriorityQueries(userID)), + isHighPriority: util_query.IsHighPriority(requestParams, timestamp, f.limits.HighPriorityQueries(userID)), // Buffer of 1 to ensure response can be written by the server side // of the Process stream, even if this goroutine goes away due to diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index 3f0b8d33729..447f797a430 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -171,7 +171,7 @@ func (f *Frontend) stopping(_ error) error { } // RoundTripGRPC round trips a proto (instead of a HTTP request). -func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { +func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, timestamp time.Time, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { if s := f.State(); s != services.Running { return nil, fmt.Errorf("frontend not running: %v", s) } @@ -200,7 +200,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, request: req, userID: userID, statsEnabled: stats.IsEnabled(ctx), - isHighPriority: util_query.IsHighPriority(requestParams, f.limits.HighPriorityQueries(userID)), + isHighPriority: util_query.IsHighPriority(requestParams, timestamp, f.limits.HighPriorityQueries(userID)), cancel: cancel, diff --git a/pkg/frontend/v2/frontend_test.go b/pkg/frontend/v2/frontend_test.go index 68023f9f570..3b7b9337eb4 100644 --- a/pkg/frontend/v2/frontend_test.go +++ b/pkg/frontend/v2/frontend_test.go @@ -113,7 +113,7 @@ func TestFrontendBasicWorkflow(t *testing.T) { return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} }, 0) - resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, &httpgrpc.HTTPRequest{}) + resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{}) require.NoError(t, err) require.Equal(t, int32(200), resp.Code) require.Equal(t, []byte(body), resp.Body) @@ -143,7 +143,7 @@ func TestFrontendRetryRequest(t *testing.T) { return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} }, 3) - res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, &httpgrpc.HTTPRequest{}) + res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{}) require.NoError(t, err) require.Equal(t, int32(200), res.Code) } @@ -170,7 +170,7 @@ func TestFrontendRetryEnqueue(t *testing.T) { return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} }, 0) - _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, &httpgrpc.HTTPRequest{}) + _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{}) require.NoError(t, err) } @@ -179,7 +179,7 @@ func TestFrontendEnqueueFailure(t *testing.T) { return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN} }, 0) - _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), url.Values{}, &httpgrpc.HTTPRequest{}) + _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{}) require.Error(t, err) require.True(t, strings.Contains(err.Error(), "failed to enqueue request")) } @@ -190,7 +190,7 @@ func TestFrontendCancellation(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() - resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), url.Values{}, &httpgrpc.HTTPRequest{}) + resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{}) require.EqualError(t, err, context.DeadlineExceeded.Error()) require.Nil(t, resp) @@ -239,7 +239,7 @@ func TestFrontendFailedCancellation(t *testing.T) { }() // send request - resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), url.Values{}, &httpgrpc.HTTPRequest{}) + resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{}) require.EqualError(t, err, context.Canceled.Error()) require.Nil(t, resp) diff --git a/pkg/util/query/priority.go b/pkg/util/query/priority.go index 7347ce04d13..45d36f4270f 100644 --- a/pkg/util/query/priority.go +++ b/pkg/util/query/priority.go @@ -9,7 +9,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) -func IsHighPriority(requestParams url.Values, highPriorityQueries []validation.HighPriorityQuery) bool { +func IsHighPriority(requestParams url.Values, timestamp time.Time, highPriorityQueries []validation.HighPriorityQuery) bool { queryParam := requestParams.Get("query") timeParam := requestParams.Get("time") startParam := requestParams.Get("start") @@ -22,12 +22,11 @@ func IsHighPriority(requestParams url.Values, highPriorityQueries []validation.H continue } - now := time.Now() - startTimeThreshold := now.Add(-1 * highPriorityQuery.StartTime).UnixMilli() - endTimeThreshold := now.Add(-1 * highPriorityQuery.EndTime).UnixMilli() + startTimeThreshold := timestamp.Add(-1 * highPriorityQuery.StartTime.Abs()).UnixMilli() + endTimeThreshold := timestamp.Add(-1 * highPriorityQuery.EndTime.Abs()).UnixMilli() - if time, err := strconv.ParseInt(timeParam, 10, 64); err == nil { - if isBetweenThresholds(time, time, startTimeThreshold, endTimeThreshold) { + if instantTime, err := strconv.ParseInt(timeParam, 10, 64); err == nil { + if isBetweenThresholds(instantTime, instantTime, startTimeThreshold, endTimeThreshold) { return true } } diff --git a/pkg/util/query/priority_test.go b/pkg/util/query/priority_test.go index 2e7a414487c..dd1ac9c78b3 100644 --- a/pkg/util/query/priority_test.go +++ b/pkg/util/query/priority_test.go @@ -11,22 +11,6 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) -func Test_IsHighPriority_DefaultValues(t *testing.T) { - now := time.Now() - config := []validation.HighPriorityQuery{ - {}, // By default, it should match all queries happened at "now" - } - - assert.True(t, IsHighPriority(url.Values{ - "query": []string{"count(up)"}, - "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, - }, config)) - assert.False(t, IsHighPriority(url.Values{ - "query": []string{"count(up)"}, - "time": []string{strconv.FormatInt(now.Add(-1*time.Second).UnixMilli(), 10)}, - }, config)) -} - func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) { now := time.Now() config := []validation.HighPriorityQuery{ @@ -38,11 +22,11 @@ func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) { assert.True(t, IsHighPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, - }, config)) + }, now, config)) assert.False(t, IsHighPriority(url.Values{ "query": []string{"count(up)"}, "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, - }, config)) + }, now, config)) config = []validation.HighPriorityQuery{ { @@ -53,11 +37,11 @@ func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) { assert.True(t, IsHighPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, - }, config)) + }, now, config)) assert.True(t, IsHighPriority(url.Values{ "query": []string{"count(up)"}, "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, - }, config)) + }, now, config)) config = []validation.HighPriorityQuery{ { @@ -71,11 +55,11 @@ func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) { assert.True(t, IsHighPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, - }, config)) + }, now, config)) assert.True(t, IsHighPriority(url.Values{ "query": []string{"count(up)"}, "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, - }, config)) + }, now, config)) config = []validation.HighPriorityQuery{ { @@ -89,11 +73,11 @@ func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) { assert.False(t, IsHighPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, - }, config)) + }, now, config)) assert.False(t, IsHighPriority(url.Values{ "query": []string{"count(up)"}, "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, - }, config)) + }, now, config)) config = []validation.HighPriorityQuery{ { @@ -104,11 +88,26 @@ func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) { assert.True(t, IsHighPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, - }, config)) + }, now, config)) + assert.True(t, IsHighPriority(url.Values{ + "query": []string{"count(up)"}, + "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, + }, now, config)) + + config = []validation.HighPriorityQuery{ + { + Regex: "", + }, + } + + assert.True(t, IsHighPriority(url.Values{ + "query": []string{"sum(up)"}, + "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, + }, now, config)) assert.True(t, IsHighPriority(url.Values{ "query": []string{"count(up)"}, "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, - }, config)) + }, now, config)) } func Test_IsHighPriority_ShouldBeBetweenStartAndEndTime(t *testing.T) { @@ -123,32 +122,32 @@ func Test_IsHighPriority_ShouldBeBetweenStartAndEndTime(t *testing.T) { assert.False(t, IsHighPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.Add(-2*time.Hour).UnixMilli(), 10)}, - }, config)) + }, now, config)) assert.True(t, IsHighPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.Add(-1*time.Hour).UnixMilli(), 10)}, - }, config)) + }, now, config)) assert.True(t, IsHighPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.Add(-30*time.Minute).UnixMilli(), 10)}, - }, config)) + }, now, config)) assert.False(t, IsHighPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.Add(-1*time.Minute).UnixMilli(), 10)}, - }, config)) + }, now, config)) assert.False(t, IsHighPriority(url.Values{ "query": []string{"sum(up)"}, "start": []string{strconv.FormatInt(now.Add(-2*time.Hour).UnixMilli(), 10)}, "end": []string{strconv.FormatInt(now.Add(-30*time.Minute).UnixMilli(), 10)}, - }, config)) + }, now, config)) assert.True(t, IsHighPriority(url.Values{ "query": []string{"sum(up)"}, "start": []string{strconv.FormatInt(now.Add(-1*time.Hour).UnixMilli(), 10)}, "end": []string{strconv.FormatInt(now.Add(-30*time.Minute).UnixMilli(), 10)}, - }, config)) + }, now, config)) assert.False(t, IsHighPriority(url.Values{ "query": []string{"sum(up)"}, "start": []string{strconv.FormatInt(now.Add(-1*time.Hour).UnixMilli(), 10)}, "end": []string{strconv.FormatInt(now.UnixMilli(), 10)}, - }, config)) + }, now, config)) }