Skip to content

Commit

Permalink
Implement IsPriorityQuery
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Jung <[email protected]>
  • Loading branch information
justinjung04 committed Oct 18, 2023
1 parent 28977ea commit 6ec2ada
Show file tree
Hide file tree
Showing 12 changed files with 265 additions and 36 deletions.
2 changes: 1 addition & 1 deletion docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4989,7 +4989,7 @@ otel:
# If query range falls between the start_time and end_time (on top of meeting
# all other criteria), query is treated as a high priority.
[start_time: <duration> | default = 1h]
[start_time: <duration> | default = 0s]
# If query range falls between the start_time and end_time (on top of meeting
# all other criteria), query is treated as a high priority.
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort i
cfg.FrontendV2.Port = grpcListenPort
}

fr, err := v2.NewFrontend(cfg.FrontendV2, log, reg, retry)
fr, err := v2.NewFrontend(cfg.FrontendV2, limits, log, reg, retry)
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err

default:
Expand Down
5 changes: 3 additions & 2 deletions pkg/frontend/transport/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"context"
"io"
"net/http"
"net/url"

"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"
)

// GrpcRoundTripper is similar to http.RoundTripper, but works with HTTP requests converted to protobuf messages.
type GrpcRoundTripper interface {
RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
RoundTripGRPC(context.Context, url.Values, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
}

func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper {
Expand All @@ -39,7 +40,7 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er
return nil, err
}

resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req)
resp, err := a.roundTripper.RoundTripGRPC(r.Context(), r.Form, req)
if err != nil {
return nil, err
}
Expand Down
20 changes: 16 additions & 4 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"net/http"
"net/url"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -53,13 +54,18 @@ type Limits interface {
// MockLimits implements the Limits interface. Used in tests only.
type MockLimits struct {
Queriers float64
Queries []validation.HighPriorityQuery
queue.MockLimits
}

func (l MockLimits) MaxQueriersPerUser(_ string) float64 {
return l.Queriers
}

func (l MockLimits) HighPriorityQueries(_ string) []validation.HighPriorityQuery {
return l.Queries
}

// Frontend queues HTTP requests, dispatches them to backends, and handles retries
// for requests which failed.
type Frontend struct {
Expand Down Expand Up @@ -171,7 +177,7 @@ func (f *Frontend) cleanupInactiveUserMetrics(user string) {
}

// RoundTripGRPC round trips a proto (instead of a HTTP request).
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
// Propagate trace context in gRPC too - this will be ignored if using HTTP.
tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx)
if tracer != nil && span != nil {
Expand All @@ -182,10 +188,17 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
}
}

tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}
userID := tenant.JoinTenantIDs(tenantIDs)

return f.retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) {
request := request{
request: req,
originalCtx: ctx,
request: req,
originalCtx: ctx,
isHighPriority: util_query.IsHighPriority(requestParams, f.limits.HighPriorityQueries(userID)),

// Buffer of 1 to ensure response can be written by the server side
// of the Process stream, even if this goroutine goes away due to
Expand Down Expand Up @@ -347,7 +360,6 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error {
now := time.Now()
req.enqueueTime = now
req.queueSpan, _ = opentracing.StartSpanFromContext(ctx, "queued")
req.isHighPriority = util_query.IsHighPriority()

// aggregate the max queriers limit in the case of a multi tenant query
maxQueriers := validation.SmallestPositiveNonZeroFloat64PerTenant(tenantIDs, f.limits.MaxQueriersPerUser)
Expand Down
18 changes: 11 additions & 7 deletions pkg/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import (
"fmt"
"math/rand"
"net/http"
"net/url"
"sync"
"time"

"github.com/cortexproject/cortex/pkg/scheduler"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -65,10 +68,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
type Frontend struct {
services.Service

cfg Config
log log.Logger

retry *transport.Retry
cfg Config
log log.Logger
limits scheduler.Limits
retry *transport.Retry

lastQueryID atomic.Uint64

Expand Down Expand Up @@ -114,7 +117,7 @@ type enqueueResult struct {
}

// NewFrontend creates a new frontend.
func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer, retry *transport.Retry) (*Frontend, error) {
func NewFrontend(cfg Config, limits scheduler.Limits, log log.Logger, reg prometheus.Registerer, retry *transport.Retry) (*Frontend, error) {
requestsCh := make(chan *frontendRequest)

schedulerWorkers, err := newFrontendSchedulerWorkers(cfg, fmt.Sprintf("%s:%d", cfg.Addr, cfg.Port), requestsCh, log)
Expand All @@ -124,6 +127,7 @@ func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer, retry *t

f := &Frontend{
cfg: cfg,
limits: limits,
log: log,
requestsCh: requestsCh,
schedulerWorkers: schedulerWorkers,
Expand Down Expand Up @@ -167,7 +171,7 @@ func (f *Frontend) stopping(_ error) error {
}

// RoundTripGRPC round trips a proto (instead of a HTTP request).
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
if s := f.State(); s != services.Running {
return nil, fmt.Errorf("frontend not running: %v", s)
}
Expand Down Expand Up @@ -196,7 +200,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
request: req,
userID: userID,
statsEnabled: stats.IsEnabled(ctx),
isHighPriority: util_query.IsHighPriority(),
isHighPriority: util_query.IsHighPriority(requestParams, f.limits.HighPriorityQueries(userID)),

cancel: cancel,

Expand Down
17 changes: 10 additions & 7 deletions pkg/frontend/v2/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package v2
import (
"context"
"net"
"net/url"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/scheduler/queue"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
Expand Down Expand Up @@ -48,7 +51,7 @@ func setupFrontend(t *testing.T, schedulerReplyFunc func(f *Frontend, msg *sched

//logger := log.NewLogfmtLogger(os.Stdout)
logger := log.NewNopLogger()
f, err := NewFrontend(cfg, logger, nil, transport.NewRetry(maxRetries, nil))
f, err := NewFrontend(cfg, queue.MockLimits{}, logger, nil, transport.NewRetry(maxRetries, nil))
require.NoError(t, err)

frontendv2pb.RegisterFrontendForQuerierServer(server, f)
Expand Down Expand Up @@ -110,7 +113,7 @@ func TestFrontendBasicWorkflow(t *testing.T) {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
}, 0)

resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, &httpgrpc.HTTPRequest{})
require.NoError(t, err)
require.Equal(t, int32(200), resp.Code)
require.Equal(t, []byte(body), resp.Body)
Expand Down Expand Up @@ -140,7 +143,7 @@ func TestFrontendRetryRequest(t *testing.T) {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
}, 3)

res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, &httpgrpc.HTTPRequest{})
require.NoError(t, err)
require.Equal(t, int32(200), res.Code)
}
Expand All @@ -167,7 +170,7 @@ func TestFrontendRetryEnqueue(t *testing.T) {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
}, 0)

_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, &httpgrpc.HTTPRequest{})
require.NoError(t, err)
}

Expand All @@ -176,7 +179,7 @@ func TestFrontendEnqueueFailure(t *testing.T) {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN}
}, 0)

_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), &httpgrpc.HTTPRequest{})
_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), url.Values{}, &httpgrpc.HTTPRequest{})
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "failed to enqueue request"))
}
Expand All @@ -187,7 +190,7 @@ func TestFrontendCancellation(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()

resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), url.Values{}, &httpgrpc.HTTPRequest{})
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Nil(t, resp)

Expand Down Expand Up @@ -236,7 +239,7 @@ func TestFrontendFailedCancellation(t *testing.T) {
}()

// send request
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), url.Values{}, &httpgrpc.HTTPRequest{})
require.EqualError(t, err, context.Canceled.Error())
require.Nil(t, resp)

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func TestRequestQueue_ReservedQueriersShouldOnlyGetHighPriorityQueries(t *testin
queue := NewRequestQueue(0, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
MockLimits{MaxOutstanding: 3, ReservedQueriers: 1},
MockLimits{MaxOutstanding: 3, reservedHighPriorityQueriers: 1},
nil,
)
ctx := context.Background()
Expand Down
21 changes: 18 additions & 3 deletions pkg/scheduler/queue/user_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sort"
"time"

"github.com/cortexproject/cortex/pkg/util/validation"

"github.com/cortexproject/cortex/pkg/util"
)

Expand All @@ -21,6 +23,9 @@ type Limits interface {
// If ReservedHighPriorityQueriers is capped by MaxQueriersPerUser.
// If less than 1, it will be applied as a percentage of MaxQueriersPerUser.
ReservedHighPriorityQueriers(user string) float64

// HighPriorityQueries returns list of definitions for high priority query.
HighPriorityQueries(user string) []validation.HighPriorityQuery
}

// querier holds information about a querier registered in the queue.
Expand Down Expand Up @@ -374,14 +379,24 @@ func getNumOfReservedQueriers(queriersToSelect int, totalNumOfQueriers int, rese

// MockLimits implements the Limits interface. Used in tests only.
type MockLimits struct {
MaxOutstanding int
ReservedQueriers float64
MaxOutstanding int
maxQueriersPerUser float64
reservedHighPriorityQueriers float64
highPriorityQueries []validation.HighPriorityQuery
}

func (l MockLimits) MaxQueriersPerUser(user string) float64 {
return l.maxQueriersPerUser
}

func (l MockLimits) MaxOutstandingPerTenant(_ string) int {
return l.MaxOutstanding
}

func (l MockLimits) ReservedHighPriorityQueriers(_ string) float64 {
return l.ReservedQueriers
return l.reservedHighPriorityQueriers
}

func (l MockLimits) HighPriorityQueries(_ string) []validation.HighPriorityQuery {
return l.highPriorityQueries
}
49 changes: 49 additions & 0 deletions pkg/util/query/priority.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package query

import (
"net/url"
"regexp"
"strconv"
"time"

"github.com/cortexproject/cortex/pkg/util/validation"
)

func IsHighPriority(requestParams url.Values, highPriorityQueries []validation.HighPriorityQuery) bool {
queryParam := requestParams.Get("query")
timeParam := requestParams.Get("time")
startParam := requestParams.Get("start")
endParam := requestParams.Get("end")

for _, highPriorityQuery := range highPriorityQueries {
regex := highPriorityQuery.Regex

if match, err := regexp.MatchString(regex, queryParam); !match || err != nil {
continue
}

now := time.Now()
startTimeThreshold := now.Add(-1 * highPriorityQuery.StartTime).UnixMilli()
endTimeThreshold := now.Add(-1 * highPriorityQuery.EndTime).UnixMilli()

if time, err := strconv.ParseInt(timeParam, 10, 64); err == nil {
if isBetweenThresholds(time, time, startTimeThreshold, endTimeThreshold) {
return true
}
}

if startTime, err := strconv.ParseInt(startParam, 10, 64); err == nil {
if endTime, err := strconv.ParseInt(endParam, 10, 64); err == nil {
if isBetweenThresholds(startTime, endTime, startTimeThreshold, endTimeThreshold) {
return true
}
}
}
}

return false
}

func isBetweenThresholds(start, end, startThreshold, endThreshold int64) bool {
return start >= startThreshold && end <= endThreshold
}
Loading

0 comments on commit 6ec2ada

Please sign in to comment.