diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 9c8a20057a6..00fd0d7b5d3 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4989,7 +4989,7 @@ otel: # If query range falls between the start_time and end_time (on top of meeting # all other criteria), query is treated as a high priority. -[start_time: | default = 1h] +[start_time: | default = 0s] # If query range falls between the start_time and end_time (on top of meeting # all other criteria), query is treated as a high priority. diff --git a/pkg/frontend/config.go b/pkg/frontend/config.go index 8ef8fa36030..03dff13980e 100644 --- a/pkg/frontend/config.go +++ b/pkg/frontend/config.go @@ -59,7 +59,7 @@ func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort i cfg.FrontendV2.Port = grpcListenPort } - fr, err := v2.NewFrontend(cfg.FrontendV2, log, reg, retry) + fr, err := v2.NewFrontend(cfg.FrontendV2, limits, log, reg, retry) return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err default: diff --git a/pkg/frontend/transport/roundtripper.go b/pkg/frontend/transport/roundtripper.go index 583fc22d04a..e996d8abb3b 100644 --- a/pkg/frontend/transport/roundtripper.go +++ b/pkg/frontend/transport/roundtripper.go @@ -5,6 +5,7 @@ import ( "context" "io" "net/http" + "net/url" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/httpgrpc/server" @@ -12,7 +13,7 @@ import ( // GrpcRoundTripper is similar to http.RoundTripper, but works with HTTP requests converted to protobuf messages. type GrpcRoundTripper interface { - RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) + RoundTripGRPC(context.Context, url.Values, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) } func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper { @@ -39,7 +40,7 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er return nil, err } - resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req) + resp, err := a.roundTripper.RoundTripGRPC(r.Context(), r.Form, req) if err != nil { return nil, err } diff --git a/pkg/frontend/v1/frontend.go b/pkg/frontend/v1/frontend.go index 9eae59255b9..64e19f35cb3 100644 --- a/pkg/frontend/v1/frontend.go +++ b/pkg/frontend/v1/frontend.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "net/http" + "net/url" "time" "github.com/go-kit/log" @@ -53,6 +54,7 @@ type Limits interface { // MockLimits implements the Limits interface. Used in tests only. type MockLimits struct { Queriers float64 + Queries []validation.HighPriorityQuery queue.MockLimits } @@ -60,6 +62,10 @@ func (l MockLimits) MaxQueriersPerUser(_ string) float64 { return l.Queriers } +func (l MockLimits) HighPriorityQueries(_ string) []validation.HighPriorityQuery { + return l.Queries +} + // Frontend queues HTTP requests, dispatches them to backends, and handles retries // for requests which failed. type Frontend struct { @@ -171,7 +177,7 @@ func (f *Frontend) cleanupInactiveUserMetrics(user string) { } // RoundTripGRPC round trips a proto (instead of a HTTP request). -func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { +func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { // Propagate trace context in gRPC too - this will be ignored if using HTTP. tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx) if tracer != nil && span != nil { @@ -182,10 +188,17 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) } } + tenantIDs, err := tenant.TenantIDs(ctx) + if err != nil { + return nil, err + } + userID := tenant.JoinTenantIDs(tenantIDs) + return f.retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) { request := request{ - request: req, - originalCtx: ctx, + request: req, + originalCtx: ctx, + isHighPriority: util_query.IsHighPriority(requestParams, f.limits.HighPriorityQueries(userID)), // Buffer of 1 to ensure response can be written by the server side // of the Process stream, even if this goroutine goes away due to @@ -347,7 +360,6 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error { now := time.Now() req.enqueueTime = now req.queueSpan, _ = opentracing.StartSpanFromContext(ctx, "queued") - req.isHighPriority = util_query.IsHighPriority() // aggregate the max queriers limit in the case of a multi tenant query maxQueriers := validation.SmallestPositiveNonZeroFloat64PerTenant(tenantIDs, f.limits.MaxQueriersPerUser) diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index e01b9562e87..3f0b8d33729 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -6,9 +6,12 @@ import ( "fmt" "math/rand" "net/http" + "net/url" "sync" "time" + "github.com/cortexproject/cortex/pkg/scheduler" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/opentracing/opentracing-go" @@ -65,10 +68,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { type Frontend struct { services.Service - cfg Config - log log.Logger - - retry *transport.Retry + cfg Config + log log.Logger + limits scheduler.Limits + retry *transport.Retry lastQueryID atomic.Uint64 @@ -114,7 +117,7 @@ type enqueueResult struct { } // NewFrontend creates a new frontend. -func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer, retry *transport.Retry) (*Frontend, error) { +func NewFrontend(cfg Config, limits scheduler.Limits, log log.Logger, reg prometheus.Registerer, retry *transport.Retry) (*Frontend, error) { requestsCh := make(chan *frontendRequest) schedulerWorkers, err := newFrontendSchedulerWorkers(cfg, fmt.Sprintf("%s:%d", cfg.Addr, cfg.Port), requestsCh, log) @@ -124,6 +127,7 @@ func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer, retry *t f := &Frontend{ cfg: cfg, + limits: limits, log: log, requestsCh: requestsCh, schedulerWorkers: schedulerWorkers, @@ -167,7 +171,7 @@ func (f *Frontend) stopping(_ error) error { } // RoundTripGRPC round trips a proto (instead of a HTTP request). -func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { +func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { if s := f.State(); s != services.Running { return nil, fmt.Errorf("frontend not running: %v", s) } @@ -196,7 +200,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) request: req, userID: userID, statsEnabled: stats.IsEnabled(ctx), - isHighPriority: util_query.IsHighPriority(), + isHighPriority: util_query.IsHighPriority(requestParams, f.limits.HighPriorityQueries(userID)), cancel: cancel, diff --git a/pkg/frontend/v2/frontend_test.go b/pkg/frontend/v2/frontend_test.go index 59729e1757a..68023f9f570 100644 --- a/pkg/frontend/v2/frontend_test.go +++ b/pkg/frontend/v2/frontend_test.go @@ -3,12 +3,15 @@ package v2 import ( "context" "net" + "net/url" "strconv" "strings" "sync" "testing" "time" + "github.com/cortexproject/cortex/pkg/scheduler/queue" + "github.com/go-kit/log" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" @@ -48,7 +51,7 @@ func setupFrontend(t *testing.T, schedulerReplyFunc func(f *Frontend, msg *sched //logger := log.NewLogfmtLogger(os.Stdout) logger := log.NewNopLogger() - f, err := NewFrontend(cfg, logger, nil, transport.NewRetry(maxRetries, nil)) + f, err := NewFrontend(cfg, queue.MockLimits{}, logger, nil, transport.NewRetry(maxRetries, nil)) require.NoError(t, err) frontendv2pb.RegisterFrontendForQuerierServer(server, f) @@ -110,7 +113,7 @@ func TestFrontendBasicWorkflow(t *testing.T) { return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} }, 0) - resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}) + resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, &httpgrpc.HTTPRequest{}) require.NoError(t, err) require.Equal(t, int32(200), resp.Code) require.Equal(t, []byte(body), resp.Body) @@ -140,7 +143,7 @@ func TestFrontendRetryRequest(t *testing.T) { return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} }, 3) - res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}) + res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, &httpgrpc.HTTPRequest{}) require.NoError(t, err) require.Equal(t, int32(200), res.Code) } @@ -167,7 +170,7 @@ func TestFrontendRetryEnqueue(t *testing.T) { return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} }, 0) - _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}) + _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, &httpgrpc.HTTPRequest{}) require.NoError(t, err) } @@ -176,7 +179,7 @@ func TestFrontendEnqueueFailure(t *testing.T) { return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN} }, 0) - _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), &httpgrpc.HTTPRequest{}) + _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), url.Values{}, &httpgrpc.HTTPRequest{}) require.Error(t, err) require.True(t, strings.Contains(err.Error(), "failed to enqueue request")) } @@ -187,7 +190,7 @@ func TestFrontendCancellation(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() - resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{}) + resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), url.Values{}, &httpgrpc.HTTPRequest{}) require.EqualError(t, err, context.DeadlineExceeded.Error()) require.Nil(t, resp) @@ -236,7 +239,7 @@ func TestFrontendFailedCancellation(t *testing.T) { }() // send request - resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{}) + resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), url.Values{}, &httpgrpc.HTTPRequest{}) require.EqualError(t, err, context.Canceled.Error()) require.Nil(t, resp) diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index 5e260323ecf..283eb9df4f4 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -200,7 +200,7 @@ func TestRequestQueue_ReservedQueriersShouldOnlyGetHighPriorityQueries(t *testin queue := NewRequestQueue(0, 0, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), - MockLimits{MaxOutstanding: 3, ReservedQueriers: 1}, + MockLimits{MaxOutstanding: 3, reservedHighPriorityQueriers: 1}, nil, ) ctx := context.Background() diff --git a/pkg/scheduler/queue/user_queues.go b/pkg/scheduler/queue/user_queues.go index fa6ac4296d9..29194b613bf 100644 --- a/pkg/scheduler/queue/user_queues.go +++ b/pkg/scheduler/queue/user_queues.go @@ -6,6 +6,8 @@ import ( "sort" "time" + "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/cortexproject/cortex/pkg/util" ) @@ -21,6 +23,9 @@ type Limits interface { // If ReservedHighPriorityQueriers is capped by MaxQueriersPerUser. // If less than 1, it will be applied as a percentage of MaxQueriersPerUser. ReservedHighPriorityQueriers(user string) float64 + + // HighPriorityQueries returns list of definitions for high priority query. + HighPriorityQueries(user string) []validation.HighPriorityQuery } // querier holds information about a querier registered in the queue. @@ -374,8 +379,14 @@ func getNumOfReservedQueriers(queriersToSelect int, totalNumOfQueriers int, rese // MockLimits implements the Limits interface. Used in tests only. type MockLimits struct { - MaxOutstanding int - ReservedQueriers float64 + MaxOutstanding int + maxQueriersPerUser float64 + reservedHighPriorityQueriers float64 + highPriorityQueries []validation.HighPriorityQuery +} + +func (l MockLimits) MaxQueriersPerUser(user string) float64 { + return l.maxQueriersPerUser } func (l MockLimits) MaxOutstandingPerTenant(_ string) int { @@ -383,5 +394,9 @@ func (l MockLimits) MaxOutstandingPerTenant(_ string) int { } func (l MockLimits) ReservedHighPriorityQueriers(_ string) float64 { - return l.ReservedQueriers + return l.reservedHighPriorityQueriers +} + +func (l MockLimits) HighPriorityQueries(_ string) []validation.HighPriorityQuery { + return l.highPriorityQueries } diff --git a/pkg/util/query/priority.go b/pkg/util/query/priority.go new file mode 100644 index 00000000000..7347ce04d13 --- /dev/null +++ b/pkg/util/query/priority.go @@ -0,0 +1,49 @@ +package query + +import ( + "net/url" + "regexp" + "strconv" + "time" + + "github.com/cortexproject/cortex/pkg/util/validation" +) + +func IsHighPriority(requestParams url.Values, highPriorityQueries []validation.HighPriorityQuery) bool { + queryParam := requestParams.Get("query") + timeParam := requestParams.Get("time") + startParam := requestParams.Get("start") + endParam := requestParams.Get("end") + + for _, highPriorityQuery := range highPriorityQueries { + regex := highPriorityQuery.Regex + + if match, err := regexp.MatchString(regex, queryParam); !match || err != nil { + continue + } + + now := time.Now() + startTimeThreshold := now.Add(-1 * highPriorityQuery.StartTime).UnixMilli() + endTimeThreshold := now.Add(-1 * highPriorityQuery.EndTime).UnixMilli() + + if time, err := strconv.ParseInt(timeParam, 10, 64); err == nil { + if isBetweenThresholds(time, time, startTimeThreshold, endTimeThreshold) { + return true + } + } + + if startTime, err := strconv.ParseInt(startParam, 10, 64); err == nil { + if endTime, err := strconv.ParseInt(endParam, 10, 64); err == nil { + if isBetweenThresholds(startTime, endTime, startTimeThreshold, endTimeThreshold) { + return true + } + } + } + } + + return false +} + +func isBetweenThresholds(start, end, startThreshold, endThreshold int64) bool { + return start >= startThreshold && end <= endThreshold +} diff --git a/pkg/util/query/priority_test.go b/pkg/util/query/priority_test.go new file mode 100644 index 00000000000..2e7a414487c --- /dev/null +++ b/pkg/util/query/priority_test.go @@ -0,0 +1,154 @@ +package query + +import ( + "net/url" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/cortexproject/cortex/pkg/util/validation" +) + +func Test_IsHighPriority_DefaultValues(t *testing.T) { + now := time.Now() + config := []validation.HighPriorityQuery{ + {}, // By default, it should match all queries happened at "now" + } + + assert.True(t, IsHighPriority(url.Values{ + "query": []string{"count(up)"}, + "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, + }, config)) + assert.False(t, IsHighPriority(url.Values{ + "query": []string{"count(up)"}, + "time": []string{strconv.FormatInt(now.Add(-1*time.Second).UnixMilli(), 10)}, + }, config)) +} + +func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) { + now := time.Now() + config := []validation.HighPriorityQuery{ + { + Regex: "sum", + }, + } + + assert.True(t, IsHighPriority(url.Values{ + "query": []string{"sum(up)"}, + "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, + }, config)) + assert.False(t, IsHighPriority(url.Values{ + "query": []string{"count(up)"}, + "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, + }, config)) + + config = []validation.HighPriorityQuery{ + { + Regex: "up", + }, + } + + assert.True(t, IsHighPriority(url.Values{ + "query": []string{"sum(up)"}, + "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, + }, config)) + assert.True(t, IsHighPriority(url.Values{ + "query": []string{"count(up)"}, + "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, + }, config)) + + config = []validation.HighPriorityQuery{ + { + Regex: "sum", + }, + { + Regex: "c(.+)t", + }, + } + + assert.True(t, IsHighPriority(url.Values{ + "query": []string{"sum(up)"}, + "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, + }, config)) + assert.True(t, IsHighPriority(url.Values{ + "query": []string{"count(up)"}, + "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, + }, config)) + + config = []validation.HighPriorityQuery{ + { + Regex: "doesnotexist", + }, + { + Regex: "^sum$", + }, + } + + assert.False(t, IsHighPriority(url.Values{ + "query": []string{"sum(up)"}, + "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, + }, config)) + assert.False(t, IsHighPriority(url.Values{ + "query": []string{"count(up)"}, + "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, + }, config)) + + config = []validation.HighPriorityQuery{ + { + Regex: ".*", + }, + } + + assert.True(t, IsHighPriority(url.Values{ + "query": []string{"sum(up)"}, + "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, + }, config)) + assert.True(t, IsHighPriority(url.Values{ + "query": []string{"count(up)"}, + "time": []string{strconv.FormatInt(now.UnixMilli(), 10)}, + }, config)) +} + +func Test_IsHighPriority_ShouldBeBetweenStartAndEndTime(t *testing.T) { + now := time.Now() + config := []validation.HighPriorityQuery{ + { + StartTime: 1 * time.Hour, + EndTime: 30 * time.Minute, + }, + } + + assert.False(t, IsHighPriority(url.Values{ + "query": []string{"sum(up)"}, + "time": []string{strconv.FormatInt(now.Add(-2*time.Hour).UnixMilli(), 10)}, + }, config)) + assert.True(t, IsHighPriority(url.Values{ + "query": []string{"sum(up)"}, + "time": []string{strconv.FormatInt(now.Add(-1*time.Hour).UnixMilli(), 10)}, + }, config)) + assert.True(t, IsHighPriority(url.Values{ + "query": []string{"sum(up)"}, + "time": []string{strconv.FormatInt(now.Add(-30*time.Minute).UnixMilli(), 10)}, + }, config)) + assert.False(t, IsHighPriority(url.Values{ + "query": []string{"sum(up)"}, + "time": []string{strconv.FormatInt(now.Add(-1*time.Minute).UnixMilli(), 10)}, + }, config)) + assert.False(t, IsHighPriority(url.Values{ + "query": []string{"sum(up)"}, + "start": []string{strconv.FormatInt(now.Add(-2*time.Hour).UnixMilli(), 10)}, + "end": []string{strconv.FormatInt(now.Add(-30*time.Minute).UnixMilli(), 10)}, + }, config)) + assert.True(t, IsHighPriority(url.Values{ + "query": []string{"sum(up)"}, + "start": []string{strconv.FormatInt(now.Add(-1*time.Hour).UnixMilli(), 10)}, + "end": []string{strconv.FormatInt(now.Add(-30*time.Minute).UnixMilli(), 10)}, + }, config)) + assert.False(t, IsHighPriority(url.Values{ + "query": []string{"sum(up)"}, + "start": []string{strconv.FormatInt(now.Add(-1*time.Hour).UnixMilli(), 10)}, + "end": []string{strconv.FormatInt(now.UnixMilli(), 10)}, + }, config)) +} diff --git a/pkg/util/query/query_priority.go b/pkg/util/query/query_priority.go deleted file mode 100644 index 6e18fe61dec..00000000000 --- a/pkg/util/query/query_priority.go +++ /dev/null @@ -1,9 +0,0 @@ -package query - -func IsHighPriority() bool { - // TODO: Implement - // query string <> regex - // query param <> start_time, end_time - - return false -} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 145ad317aee..301e945ea80 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -48,7 +48,7 @@ type DisabledRuleGroups []DisabledRuleGroup type HighPriorityQuery struct { Regex string `yaml:"regex" doc:"nocli|description=Query string regex. If evaluated true (on top of meeting all other criteria), query is treated as a high priority."` - StartTime time.Duration `yaml:"start_time" doc:"nocli|description=If query range falls between the start_time and end_time (on top of meeting all other criteria), query is treated as a high priority.|default=1h"` + StartTime time.Duration `yaml:"start_time" doc:"nocli|description=If query range falls between the start_time and end_time (on top of meeting all other criteria), query is treated as a high priority.|default=0s"` EndTime time.Duration `yaml:"end_time" doc:"nocli|description=If query range falls between the start_time and end_time (on top of meeting all other criteria), query is treated as a high priority.|default=0s"` }