From 96ff019ddba591815be280b4f9c9aacf63733a42 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Fri, 17 Nov 2023 08:58:58 -0800 Subject: [PATCH] Address comments Signed-off-by: Justin Jung --- pkg/frontend/v1/frontend.go | 35 +++++--- pkg/frontend/v2/frontend.go | 34 +++++--- pkg/frontend/v2/frontend_test.go | 3 +- pkg/scheduler/queue/user_request_queue.go | 24 +---- pkg/util/query/priority.go | 64 ++++++-------- pkg/util/query/priority_test.go | 101 ++++++++-------------- 6 files changed, 115 insertions(+), 146 deletions(-) diff --git a/pkg/frontend/v1/frontend.go b/pkg/frontend/v1/frontend.go index 54e8ada594b..8ea66fbd588 100644 --- a/pkg/frontend/v1/frontend.go +++ b/pkg/frontend/v1/frontend.go @@ -7,6 +7,7 @@ import ( "net/http" "net/url" "reflect" + "sync" "time" "github.com/go-kit/log" @@ -81,10 +82,11 @@ type Frontend struct { activeUsers *util.ActiveUsersCleanupService // Used to check whether query priority config has changed - queryPriority validation.QueryPriority + queryPriority map[string]validation.QueryPriority + queryPriorityMtx map[string]*sync.RWMutex // Populate and reuse compiled regex until query priority config changes - compiledQueryPriority validation.QueryPriority + compiledQueryPriority map[string]validation.QueryPriority // Subservices manager. subservices *services.Manager @@ -217,16 +219,29 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest, response: make(chan *httpgrpc.HTTPResponse, 1), } - queryPriority := f.limits.QueryPriority(userID) + if reqParams != nil { + queryPriority := f.limits.QueryPriority(userID) - if reqParams != nil && queryPriority.Enabled { - queryPriorityChanged := !reflect.DeepEqual(f.queryPriority, queryPriority) - if queryPriorityChanged { - f.queryPriority = queryPriority - f.compiledQueryPriority = queryPriority - } + if queryPriority.Enabled { + if _, exists := f.queryPriorityMtx[userID]; !exists { + f.queryPriorityMtx[userID] = &sync.RWMutex{} + } + + f.queryPriorityMtx[userID].RLock() + queryPriorityChanged := !reflect.DeepEqual(f.queryPriority[userID], queryPriority) + f.queryPriorityMtx[userID].RUnlock() - request.priority = util_query.GetPriority(reqParams, ts, &f.compiledQueryPriority, queryPriorityChanged) + if queryPriorityChanged { + f.queryPriorityMtx[userID].Lock() + f.queryPriority[userID] = queryPriority + f.compiledQueryPriority[userID] = util_query.GetCompileQueryPriority(queryPriority) + f.queryPriorityMtx[userID].Unlock() + } + + f.queryPriorityMtx[userID].RLock() + request.priority = util_query.GetPriority(reqParams, ts, f.compiledQueryPriority[userID]) + f.queryPriorityMtx[userID].Unlock() + } } if err := f.queueRequest(ctx, &request); err != nil { diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index b05af6277f5..41dd4e77496 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -77,10 +77,11 @@ type Frontend struct { lastQueryID atomic.Uint64 // Used to check whether query priority config has changed - queryPriority validation.QueryPriority + queryPriority map[string]validation.QueryPriority + queryPriorityMtx map[string]*sync.RWMutex // Populate and reuse compiled regex until query priority config changes - compiledQueryPriority validation.QueryPriority + compiledQueryPriority map[string]validation.QueryPriority // frontend workers will read from this channel, and send request to scheduler. requestsCh chan *frontendRequest @@ -218,16 +219,29 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest, retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1, } - queryPriority := f.limits.QueryPriority(userID) + if reqParams != nil { + queryPriority := f.limits.QueryPriority(userID) - if reqParams != nil && queryPriority.Enabled { - queryPriorityChanged := !reflect.DeepEqual(f.queryPriority, queryPriority) - if queryPriorityChanged { - f.queryPriority = queryPriority - f.compiledQueryPriority = queryPriority - } + if queryPriority.Enabled { + if _, exists := f.queryPriorityMtx[userID]; !exists { + f.queryPriorityMtx[userID] = &sync.RWMutex{} + } + + f.queryPriorityMtx[userID].RLock() + queryPriorityChanged := !reflect.DeepEqual(f.queryPriority[userID], queryPriority) + f.queryPriorityMtx[userID].RUnlock() - freq.priority = util_query.GetPriority(reqParams, ts, &f.compiledQueryPriority, queryPriorityChanged) + if queryPriorityChanged { + f.queryPriorityMtx[userID].Lock() + f.queryPriority[userID] = queryPriority + f.compiledQueryPriority[userID] = util_query.GetCompileQueryPriority(queryPriority) + f.queryPriorityMtx[userID].Unlock() + } + + f.queryPriorityMtx[userID].RLock() + freq.priority = util_query.GetPriority(reqParams, ts, f.compiledQueryPriority[userID]) + f.queryPriorityMtx[userID].Unlock() + } } f.requests.put(freq) diff --git a/pkg/frontend/v2/frontend_test.go b/pkg/frontend/v2/frontend_test.go index a9cd226f594..4c40d88b663 100644 --- a/pkg/frontend/v2/frontend_test.go +++ b/pkg/frontend/v2/frontend_test.go @@ -10,8 +10,6 @@ import ( "testing" "time" - "github.com/cortexproject/cortex/pkg/scheduler/queue" - "github.com/go-kit/log" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" @@ -22,6 +20,7 @@ import ( "github.com/cortexproject/cortex/pkg/frontend/transport" "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/scheduler/queue" "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" diff --git a/pkg/scheduler/queue/user_request_queue.go b/pkg/scheduler/queue/user_request_queue.go index 5325e94d9b1..828c9246fd4 100644 --- a/pkg/scheduler/queue/user_request_queue.go +++ b/pkg/scheduler/queue/user_request_queue.go @@ -27,22 +27,14 @@ func NewFIFORequestQueue(queue chan Request, userID string, queueLength *prometh func (f *FIFORequestQueue) enqueueRequest(r Request) { f.queue <- r if f.queueLength != nil { - f.queueLength.With(prometheus.Labels{ - "user": f.userID, - "priority": strconv.FormatInt(r.Priority(), 10), - "type": "fifo", - }).Inc() + f.queueLength.WithLabelValues(f.userID, strconv.FormatInt(r.Priority(), 10), "fifo").Inc() } } func (f *FIFORequestQueue) dequeueRequest(_ int64, _ bool) Request { r := <-f.queue if f.queueLength != nil { - f.queueLength.With(prometheus.Labels{ - "user": f.userID, - "priority": strconv.FormatInt(r.Priority(), 10), - "type": "fifo", - }).Dec() + f.queueLength.WithLabelValues(f.userID, strconv.FormatInt(r.Priority(), 10), "fifo").Dec() } return r } @@ -64,11 +56,7 @@ func NewPriorityRequestQueue(queue *util.PriorityQueue, userID string, queueLeng func (f *PriorityRequestQueue) enqueueRequest(r Request) { f.queue.Enqueue(r) if f.queueLength != nil { - f.queueLength.With(prometheus.Labels{ - "user": f.userID, - "priority": strconv.FormatInt(r.Priority(), 10), - "type": "priority", - }).Inc() + f.queueLength.WithLabelValues(f.userID, strconv.FormatInt(r.Priority(), 10), "priority").Inc() } } @@ -78,11 +66,7 @@ func (f *PriorityRequestQueue) dequeueRequest(priority int64, matchPriority bool } r := f.queue.Dequeue() if f.queueLength != nil { - f.queueLength.With(prometheus.Labels{ - "user": f.userID, - "priority": strconv.FormatInt(r.Priority(), 10), - "type": "priority", - }).Dec() + f.queueLength.WithLabelValues(f.userID, strconv.FormatInt(r.Priority(), 10), "priority").Dec() } return r } diff --git a/pkg/util/query/priority.go b/pkg/util/query/priority.go index 79e08c1ad8a..503736f2087 100644 --- a/pkg/util/query/priority.go +++ b/pkg/util/query/priority.go @@ -1,18 +1,32 @@ package query import ( - "math" "net/url" "regexp" - "strconv" "time" - "github.com/pkg/errors" - + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/validation" ) -func GetPriority(requestParams url.Values, now time.Time, queryPriority *validation.QueryPriority, queryPriorityChanged bool) int64 { +func GetCompileQueryPriority(queryPriority validation.QueryPriority) validation.QueryPriority { + compiledQueryPriority := queryPriority + for i, priority := range compiledQueryPriority.Priorities { + for j, attribute := range priority.QueryAttributes { + compiledRegex, err := regexp.Compile(attribute.Regex) + if err != nil { + continue + } + + attribute.CompiledRegex = compiledRegex + compiledQueryPriority.Priorities[i].QueryAttributes[j] = attribute + } + } + + return compiledQueryPriority +} + +func GetPriority(requestParams url.Values, now time.Time, queryPriority validation.QueryPriority) int64 { queryParam := requestParams.Get("query") timeParam := requestParams.Get("time") startParam := requestParams.Get("start") @@ -22,19 +36,8 @@ func GetPriority(requestParams url.Values, now time.Time, queryPriority *validat return queryPriority.DefaultPriority } - for i, priority := range queryPriority.Priorities { - for j, attribute := range priority.QueryAttributes { - // If query priority config changed, re-populate the compiled regex - if queryPriorityChanged { - compiledRegex, err := regexp.Compile(attribute.Regex) - if err != nil { - continue - } - - attribute.CompiledRegex = compiledRegex - queryPriority.Priorities[i].QueryAttributes[j] = attribute - } - + for _, priority := range queryPriority.Priorities { + for _, attribute := range priority.QueryAttributes { if attribute.CompiledRegex != nil && !attribute.CompiledRegex.MatchString(queryParam) { continue } @@ -42,16 +45,16 @@ func GetPriority(requestParams url.Values, now time.Time, queryPriority *validat startTimeThreshold := now.Add(-1 * attribute.StartTime.Abs()).Truncate(time.Second).UTC() endTimeThreshold := now.Add(-1 * attribute.EndTime.Abs()).Add(1 * time.Second).Truncate(time.Second).UTC() - if startTime, err := parseTime(startParam); err == nil { - if endTime, err := parseTime(endParam); err == nil { - if isBetweenThresholds(startTime, endTime, startTimeThreshold, endTimeThreshold) { + if startTime, err := util.ParseTime(startParam); err == nil { + if endTime, err := util.ParseTime(endParam); err == nil { + if isBetweenThresholds(util.TimeFromMillis(startTime), util.TimeFromMillis(endTime), startTimeThreshold, endTimeThreshold) { return priority.Priority } } } - if instantTime, err := parseTime(timeParam); err == nil { - if isBetweenThresholds(instantTime, instantTime, startTimeThreshold, endTimeThreshold) { + if instantTime, err := util.ParseTime(timeParam); err == nil { + if isBetweenThresholds(util.TimeFromMillis(instantTime), util.TimeFromMillis(instantTime), startTimeThreshold, endTimeThreshold) { return priority.Priority } } @@ -67,21 +70,6 @@ func GetPriority(requestParams url.Values, now time.Time, queryPriority *validat return queryPriority.DefaultPriority } -func parseTime(s string) (time.Time, error) { - if s != "" { - if t, err := strconv.ParseFloat(s, 64); err == nil { - s, ns := math.Modf(t) - ns = math.Round(ns*1000) / 1000 - return time.Unix(int64(s), int64(ns*float64(time.Second))).UTC(), nil - } - if t, err := time.Parse(time.RFC3339Nano, s); err == nil { - return t, nil - } - } - - return time.Time{}, errors.Errorf("cannot parse %q to a valid timestamp", s) -} - func isBetweenThresholds(start, end, startThreshold, endThreshold time.Time) bool { return (start.Equal(startThreshold) || start.After(startThreshold)) && (end.Equal(endThreshold) || end.Before(endThreshold)) } diff --git a/pkg/util/query/priority_test.go b/pkg/util/query/priority_test.go index f1c9f4d1854..646aed17c17 100644 --- a/pkg/util/query/priority_test.go +++ b/pkg/util/query/priority_test.go @@ -2,6 +2,7 @@ package query import ( "net/url" + "regexp" "strconv" "testing" "time" @@ -18,9 +19,10 @@ func Test_GetPriorityShouldReturnDefaultPriorityIfNotEnabledOrEmptyQueryString(t Priority: 1, QueryAttributes: []validation.QueryAttribute{ { - Regex: ".*", - StartTime: 2 * time.Hour, - EndTime: 0 * time.Hour, + Regex: ".*", + CompiledRegex: regexp.MustCompile(".*"), + StartTime: 2 * time.Hour, + EndTime: 0 * time.Hour, }, }, }, @@ -32,13 +34,13 @@ func Test_GetPriorityShouldReturnDefaultPriorityIfNotEnabledOrEmptyQueryString(t assert.Equal(t, int64(0), GetPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, &queryPriority, true)) + }, now, queryPriority)) queryPriority.Enabled = true assert.Equal(t, int64(0), GetPriority(url.Values{ "query": []string{""}, "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, &queryPriority, true)) + }, now, queryPriority)) } func Test_GetPriorityShouldConsiderRegex(t *testing.T) { @@ -48,9 +50,10 @@ func Test_GetPriorityShouldConsiderRegex(t *testing.T) { Priority: 1, QueryAttributes: []validation.QueryAttribute{ { - Regex: "sum", - StartTime: 2 * time.Hour, - EndTime: 0 * time.Hour, + Regex: "sum", + CompiledRegex: regexp.MustCompile("sum"), + StartTime: 2 * time.Hour, + EndTime: 0 * time.Hour, }, }, }, @@ -60,96 +63,62 @@ func Test_GetPriorityShouldConsiderRegex(t *testing.T) { Priorities: priorities, } - assert.Nil(t, queryPriority.Priorities[0].QueryAttributes[0].CompiledRegex) assert.Equal(t, int64(1), GetPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, &queryPriority, true)) - assert.NotNil(t, queryPriority.Priorities[0].QueryAttributes[0].CompiledRegex) + }, now, queryPriority)) assert.Equal(t, int64(0), GetPriority(url.Values{ "query": []string{"count(up)"}, "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, &queryPriority, false)) + }, now, queryPriority)) queryPriority.Priorities[0].QueryAttributes[0].Regex = "(^sum$|c(.+)t)" + queryPriority.Priorities[0].QueryAttributes[0].CompiledRegex = regexp.MustCompile("(^sum$|c(.+)t)") assert.Equal(t, int64(0), GetPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, &queryPriority, true)) + }, now, queryPriority)) assert.Equal(t, int64(1), GetPriority(url.Values{ "query": []string{"count(up)"}, "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, &queryPriority, false)) + }, now, queryPriority)) queryPriority.Priorities[0].QueryAttributes[0].Regex = ".*" + queryPriority.Priorities[0].QueryAttributes[0].CompiledRegex = regexp.MustCompile(".*") assert.Equal(t, int64(1), GetPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, &queryPriority, true)) + }, now, queryPriority)) assert.Equal(t, int64(1), GetPriority(url.Values{ "query": []string{"count(up)"}, "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, &queryPriority, false)) + }, now, queryPriority)) queryPriority.Priorities[0].QueryAttributes[0].Regex = ".+" + queryPriority.Priorities[0].QueryAttributes[0].CompiledRegex = regexp.MustCompile(".+") assert.Equal(t, int64(1), GetPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, &queryPriority, true)) + }, now, queryPriority)) assert.Equal(t, int64(1), GetPriority(url.Values{ "query": []string{"count(up)"}, "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, &queryPriority, false)) + }, now, queryPriority)) queryPriority.Priorities[0].QueryAttributes[0].Regex = "" + queryPriority.Priorities[0].QueryAttributes[0].CompiledRegex = regexp.MustCompile("") assert.Equal(t, int64(1), GetPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, &queryPriority, true)) + }, now, queryPriority)) assert.Equal(t, int64(1), GetPriority(url.Values{ "query": []string{"count(up)"}, "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, &queryPriority, false)) -} - -func Test_GetPriorityShouldNotRecompileRegexIfQueryPriorityChangedIsTrue(t *testing.T) { - now := time.Now() - priorities := []validation.PriorityDef{ - { - Priority: 1, - QueryAttributes: []validation.QueryAttribute{ - { - Regex: "sum", - StartTime: 2 * time.Hour, - EndTime: 0 * time.Hour, - }, - }, - }, - } - queryPriority := validation.QueryPriority{ - Enabled: true, - Priorities: priorities, - } - - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, &queryPriority, true)) - - queryPriority.Priorities[0].QueryAttributes[0].Regex = "count" - - assert.Equal(t, int64(0), GetPriority(url.Values{ - "query": []string{"count(up)"}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, &queryPriority, false)) - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"count(up)"}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, &queryPriority, true)) + }, now, queryPriority)) } func Test_GetPriorityShouldConsiderStartAndEndTime(t *testing.T) { @@ -173,47 +142,47 @@ func Test_GetPriorityShouldConsiderStartAndEndTime(t *testing.T) { assert.Equal(t, int64(0), GetPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, &queryPriority, true)) + }, now, queryPriority)) assert.Equal(t, int64(1), GetPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.Add(-30*time.Minute).Unix(), 10)}, - }, now, &queryPriority, false)) + }, now, queryPriority)) assert.Equal(t, int64(0), GetPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.Add(-60*time.Minute).Unix(), 10)}, - }, now, &queryPriority, false)) + }, now, queryPriority)) assert.Equal(t, int64(1), GetPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.Add(-45*time.Minute).Unix(), 10)}, - }, now, &queryPriority, false)) + }, now, queryPriority)) assert.Equal(t, int64(1), GetPriority(url.Values{ "query": []string{"sum(up)"}, "time": []string{strconv.FormatInt(now.Add(-15*time.Minute).Unix(), 10)}, - }, now, &queryPriority, false)) + }, now, queryPriority)) assert.Equal(t, int64(1), GetPriority(url.Values{ "query": []string{"sum(up)"}, "start": []string{strconv.FormatInt(now.Add(-45*time.Minute).Unix(), 10)}, "end": []string{strconv.FormatInt(now.Add(-15*time.Minute).Unix(), 10)}, - }, now, &queryPriority, false)) + }, now, queryPriority)) assert.Equal(t, int64(0), GetPriority(url.Values{ "query": []string{"sum(up)"}, "start": []string{strconv.FormatInt(now.Add(-50*time.Minute).Unix(), 10)}, "end": []string{strconv.FormatInt(now.Add(-15*time.Minute).Unix(), 10)}, - }, now, &queryPriority, false)) + }, now, queryPriority)) assert.Equal(t, int64(0), GetPriority(url.Values{ "query": []string{"sum(up)"}, "start": []string{strconv.FormatInt(now.Add(-45*time.Minute).Unix(), 10)}, "end": []string{strconv.FormatInt(now.Add(-10*time.Minute).Unix(), 10)}, - }, now, &queryPriority, false)) + }, now, queryPriority)) assert.Equal(t, int64(0), GetPriority(url.Values{ "query": []string{"sum(up)"}, "start": []string{strconv.FormatInt(now.Add(-60*time.Minute).Unix(), 10)}, "end": []string{strconv.FormatInt(now.Add(-45*time.Minute).Unix(), 10)}, - }, now, &queryPriority, false)) + }, now, queryPriority)) assert.Equal(t, int64(0), GetPriority(url.Values{ "query": []string{"sum(up)"}, "start": []string{strconv.FormatInt(now.Add(-15*time.Minute).Unix(), 10)}, "end": []string{strconv.FormatInt(now.Add(-1*time.Minute).Unix(), 10)}, - }, now, &queryPriority, false)) + }, now, queryPriority)) }