Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Jung <[email protected]>
  • Loading branch information
justinjung04 committed Nov 17, 2023
1 parent 013fab4 commit 96ff019
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 146 deletions.
35 changes: 25 additions & 10 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/url"
"reflect"
"sync"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -81,10 +82,11 @@ type Frontend struct {
activeUsers *util.ActiveUsersCleanupService

// Used to check whether query priority config has changed
queryPriority validation.QueryPriority
queryPriority map[string]validation.QueryPriority
queryPriorityMtx map[string]*sync.RWMutex

// Populate and reuse compiled regex until query priority config changes
compiledQueryPriority validation.QueryPriority
compiledQueryPriority map[string]validation.QueryPriority

// Subservices manager.
subservices *services.Manager
Expand Down Expand Up @@ -217,16 +219,29 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest,
response: make(chan *httpgrpc.HTTPResponse, 1),
}

queryPriority := f.limits.QueryPriority(userID)
if reqParams != nil {
queryPriority := f.limits.QueryPriority(userID)

if reqParams != nil && queryPriority.Enabled {
queryPriorityChanged := !reflect.DeepEqual(f.queryPriority, queryPriority)
if queryPriorityChanged {
f.queryPriority = queryPriority
f.compiledQueryPriority = queryPriority
}
if queryPriority.Enabled {
if _, exists := f.queryPriorityMtx[userID]; !exists {
f.queryPriorityMtx[userID] = &sync.RWMutex{}
}

f.queryPriorityMtx[userID].RLock()
queryPriorityChanged := !reflect.DeepEqual(f.queryPriority[userID], queryPriority)
f.queryPriorityMtx[userID].RUnlock()

request.priority = util_query.GetPriority(reqParams, ts, &f.compiledQueryPriority, queryPriorityChanged)
if queryPriorityChanged {
f.queryPriorityMtx[userID].Lock()
f.queryPriority[userID] = queryPriority
f.compiledQueryPriority[userID] = util_query.GetCompileQueryPriority(queryPriority)
f.queryPriorityMtx[userID].Unlock()
}

f.queryPriorityMtx[userID].RLock()
request.priority = util_query.GetPriority(reqParams, ts, f.compiledQueryPriority[userID])
f.queryPriorityMtx[userID].Unlock()
}
}

if err := f.queueRequest(ctx, &request); err != nil {
Expand Down
34 changes: 24 additions & 10 deletions pkg/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ type Frontend struct {
lastQueryID atomic.Uint64

// Used to check whether query priority config has changed
queryPriority validation.QueryPriority
queryPriority map[string]validation.QueryPriority
queryPriorityMtx map[string]*sync.RWMutex

// Populate and reuse compiled regex until query priority config changes
compiledQueryPriority validation.QueryPriority
compiledQueryPriority map[string]validation.QueryPriority

// frontend workers will read from this channel, and send request to scheduler.
requestsCh chan *frontendRequest
Expand Down Expand Up @@ -218,16 +219,29 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest,
retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1,
}

queryPriority := f.limits.QueryPriority(userID)
if reqParams != nil {
queryPriority := f.limits.QueryPriority(userID)

if reqParams != nil && queryPriority.Enabled {
queryPriorityChanged := !reflect.DeepEqual(f.queryPriority, queryPriority)
if queryPriorityChanged {
f.queryPriority = queryPriority
f.compiledQueryPriority = queryPriority
}
if queryPriority.Enabled {
if _, exists := f.queryPriorityMtx[userID]; !exists {
f.queryPriorityMtx[userID] = &sync.RWMutex{}
}

f.queryPriorityMtx[userID].RLock()
queryPriorityChanged := !reflect.DeepEqual(f.queryPriority[userID], queryPriority)
f.queryPriorityMtx[userID].RUnlock()

freq.priority = util_query.GetPriority(reqParams, ts, &f.compiledQueryPriority, queryPriorityChanged)
if queryPriorityChanged {
f.queryPriorityMtx[userID].Lock()
f.queryPriority[userID] = queryPriority
f.compiledQueryPriority[userID] = util_query.GetCompileQueryPriority(queryPriority)
f.queryPriorityMtx[userID].Unlock()
}

f.queryPriorityMtx[userID].RLock()
freq.priority = util_query.GetPriority(reqParams, ts, f.compiledQueryPriority[userID])
f.queryPriorityMtx[userID].Unlock()
}
}

f.requests.put(freq)
Expand Down
3 changes: 1 addition & 2 deletions pkg/frontend/v2/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/scheduler/queue"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
Expand All @@ -22,6 +20,7 @@ import (
"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/scheduler/queue"
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down
24 changes: 4 additions & 20 deletions pkg/scheduler/queue/user_request_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,14 @@ func NewFIFORequestQueue(queue chan Request, userID string, queueLength *prometh
func (f *FIFORequestQueue) enqueueRequest(r Request) {
f.queue <- r
if f.queueLength != nil {
f.queueLength.With(prometheus.Labels{
"user": f.userID,
"priority": strconv.FormatInt(r.Priority(), 10),
"type": "fifo",
}).Inc()
f.queueLength.WithLabelValues(f.userID, strconv.FormatInt(r.Priority(), 10), "fifo").Inc()
}
}

func (f *FIFORequestQueue) dequeueRequest(_ int64, _ bool) Request {
r := <-f.queue
if f.queueLength != nil {
f.queueLength.With(prometheus.Labels{
"user": f.userID,
"priority": strconv.FormatInt(r.Priority(), 10),
"type": "fifo",
}).Dec()
f.queueLength.WithLabelValues(f.userID, strconv.FormatInt(r.Priority(), 10), "fifo").Dec()
}
return r
}
Expand All @@ -64,11 +56,7 @@ func NewPriorityRequestQueue(queue *util.PriorityQueue, userID string, queueLeng
func (f *PriorityRequestQueue) enqueueRequest(r Request) {
f.queue.Enqueue(r)
if f.queueLength != nil {
f.queueLength.With(prometheus.Labels{
"user": f.userID,
"priority": strconv.FormatInt(r.Priority(), 10),
"type": "priority",
}).Inc()
f.queueLength.WithLabelValues(f.userID, strconv.FormatInt(r.Priority(), 10), "priority").Inc()
}
}

Expand All @@ -78,11 +66,7 @@ func (f *PriorityRequestQueue) dequeueRequest(priority int64, matchPriority bool
}
r := f.queue.Dequeue()
if f.queueLength != nil {
f.queueLength.With(prometheus.Labels{
"user": f.userID,
"priority": strconv.FormatInt(r.Priority(), 10),
"type": "priority",
}).Dec()
f.queueLength.WithLabelValues(f.userID, strconv.FormatInt(r.Priority(), 10), "priority").Dec()
}
return r
}
Expand Down
64 changes: 26 additions & 38 deletions pkg/util/query/priority.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
package query

import (
"math"
"net/url"
"regexp"
"strconv"
"time"

"github.com/pkg/errors"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
)

func GetPriority(requestParams url.Values, now time.Time, queryPriority *validation.QueryPriority, queryPriorityChanged bool) int64 {
func GetCompileQueryPriority(queryPriority validation.QueryPriority) validation.QueryPriority {
compiledQueryPriority := queryPriority
for i, priority := range compiledQueryPriority.Priorities {
for j, attribute := range priority.QueryAttributes {
compiledRegex, err := regexp.Compile(attribute.Regex)
if err != nil {
continue
}

attribute.CompiledRegex = compiledRegex
compiledQueryPriority.Priorities[i].QueryAttributes[j] = attribute
}
}

return compiledQueryPriority
}

func GetPriority(requestParams url.Values, now time.Time, queryPriority validation.QueryPriority) int64 {
queryParam := requestParams.Get("query")
timeParam := requestParams.Get("time")
startParam := requestParams.Get("start")
Expand All @@ -22,36 +36,25 @@ func GetPriority(requestParams url.Values, now time.Time, queryPriority *validat
return queryPriority.DefaultPriority
}

for i, priority := range queryPriority.Priorities {
for j, attribute := range priority.QueryAttributes {
// If query priority config changed, re-populate the compiled regex
if queryPriorityChanged {
compiledRegex, err := regexp.Compile(attribute.Regex)
if err != nil {
continue
}

attribute.CompiledRegex = compiledRegex
queryPriority.Priorities[i].QueryAttributes[j] = attribute
}

for _, priority := range queryPriority.Priorities {
for _, attribute := range priority.QueryAttributes {
if attribute.CompiledRegex != nil && !attribute.CompiledRegex.MatchString(queryParam) {
continue
}

startTimeThreshold := now.Add(-1 * attribute.StartTime.Abs()).Truncate(time.Second).UTC()
endTimeThreshold := now.Add(-1 * attribute.EndTime.Abs()).Add(1 * time.Second).Truncate(time.Second).UTC()

if startTime, err := parseTime(startParam); err == nil {
if endTime, err := parseTime(endParam); err == nil {
if isBetweenThresholds(startTime, endTime, startTimeThreshold, endTimeThreshold) {
if startTime, err := util.ParseTime(startParam); err == nil {
if endTime, err := util.ParseTime(endParam); err == nil {
if isBetweenThresholds(util.TimeFromMillis(startTime), util.TimeFromMillis(endTime), startTimeThreshold, endTimeThreshold) {
return priority.Priority
}
}
}

if instantTime, err := parseTime(timeParam); err == nil {
if isBetweenThresholds(instantTime, instantTime, startTimeThreshold, endTimeThreshold) {
if instantTime, err := util.ParseTime(timeParam); err == nil {
if isBetweenThresholds(util.TimeFromMillis(instantTime), util.TimeFromMillis(instantTime), startTimeThreshold, endTimeThreshold) {
return priority.Priority
}
}
Expand All @@ -67,21 +70,6 @@ func GetPriority(requestParams url.Values, now time.Time, queryPriority *validat
return queryPriority.DefaultPriority
}

func parseTime(s string) (time.Time, error) {
if s != "" {
if t, err := strconv.ParseFloat(s, 64); err == nil {
s, ns := math.Modf(t)
ns = math.Round(ns*1000) / 1000
return time.Unix(int64(s), int64(ns*float64(time.Second))).UTC(), nil
}
if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
return t, nil
}
}

return time.Time{}, errors.Errorf("cannot parse %q to a valid timestamp", s)
}

func isBetweenThresholds(start, end, startThreshold, endThreshold time.Time) bool {
return (start.Equal(startThreshold) || start.After(startThreshold)) && (end.Equal(endThreshold) || end.Before(endThreshold))
}
Loading

0 comments on commit 96ff019

Please sign in to comment.