Skip to content

Commit

Permalink
Pass timestamp as param
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Jung <[email protected]>
  • Loading branch information
justinjung04 committed Oct 18, 2023
1 parent 1a03fa5 commit fc1260d
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 51 deletions.
5 changes: 3 additions & 2 deletions pkg/frontend/transport/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"io"
"net/http"
"net/url"
"time"

"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"
)

// GrpcRoundTripper is similar to http.RoundTripper, but works with HTTP requests converted to protobuf messages.
type GrpcRoundTripper interface {
RoundTripGRPC(context.Context, url.Values, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
RoundTripGRPC(context.Context, url.Values, time.Time, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
}

func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper {
Expand All @@ -40,7 +41,7 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er
return nil, err
}

resp, err := a.roundTripper.RoundTripGRPC(r.Context(), r.Form, req)
resp, err := a.roundTripper.RoundTripGRPC(r.Context(), r.Form, time.Now(), req)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (f *Frontend) cleanupInactiveUserMetrics(user string) {
}

// RoundTripGRPC round trips a proto (instead of a HTTP request).
func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, timestamp time.Time, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
// Propagate trace context in gRPC too - this will be ignored if using HTTP.
tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx)
if tracer != nil && span != nil {
Expand All @@ -198,7 +198,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values,
request := request{
request: req,
originalCtx: ctx,
isHighPriority: util_query.IsHighPriority(requestParams, f.limits.HighPriorityQueries(userID)),
isHighPriority: util_query.IsHighPriority(requestParams, timestamp, f.limits.HighPriorityQueries(userID)),

// Buffer of 1 to ensure response can be written by the server side
// of the Process stream, even if this goroutine goes away due to
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (f *Frontend) stopping(_ error) error {
}

// RoundTripGRPC round trips a proto (instead of a HTTP request).
func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values, timestamp time.Time, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
if s := f.State(); s != services.Running {
return nil, fmt.Errorf("frontend not running: %v", s)
}
Expand Down Expand Up @@ -200,7 +200,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, requestParams url.Values,
request: req,
userID: userID,
statsEnabled: stats.IsEnabled(ctx),
isHighPriority: util_query.IsHighPriority(requestParams, f.limits.HighPriorityQueries(userID)),
isHighPriority: util_query.IsHighPriority(requestParams, timestamp, f.limits.HighPriorityQueries(userID)),

cancel: cancel,

Expand Down
12 changes: 6 additions & 6 deletions pkg/frontend/v2/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestFrontendBasicWorkflow(t *testing.T) {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
}, 0)

resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, &httpgrpc.HTTPRequest{})
resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{})
require.NoError(t, err)
require.Equal(t, int32(200), resp.Code)
require.Equal(t, []byte(body), resp.Body)
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestFrontendRetryRequest(t *testing.T) {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
}, 3)

res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, &httpgrpc.HTTPRequest{})
res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{})
require.NoError(t, err)
require.Equal(t, int32(200), res.Code)
}
Expand All @@ -170,7 +170,7 @@ func TestFrontendRetryEnqueue(t *testing.T) {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
}, 0)

_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, &httpgrpc.HTTPRequest{})
_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{})
require.NoError(t, err)
}

Expand All @@ -179,7 +179,7 @@ func TestFrontendEnqueueFailure(t *testing.T) {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN}
}, 0)

_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), url.Values{}, &httpgrpc.HTTPRequest{})
_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{})
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "failed to enqueue request"))
}
Expand All @@ -190,7 +190,7 @@ func TestFrontendCancellation(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()

resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), url.Values{}, &httpgrpc.HTTPRequest{})
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{})
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Nil(t, resp)

Expand Down Expand Up @@ -239,7 +239,7 @@ func TestFrontendFailedCancellation(t *testing.T) {
}()

// send request
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), url.Values{}, &httpgrpc.HTTPRequest{})
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), url.Values{}, time.Now(), &httpgrpc.HTTPRequest{})
require.EqualError(t, err, context.Canceled.Error())
require.Nil(t, resp)

Expand Down
11 changes: 5 additions & 6 deletions pkg/util/query/priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/cortexproject/cortex/pkg/util/validation"
)

func IsHighPriority(requestParams url.Values, highPriorityQueries []validation.HighPriorityQuery) bool {
func IsHighPriority(requestParams url.Values, timestamp time.Time, highPriorityQueries []validation.HighPriorityQuery) bool {
queryParam := requestParams.Get("query")
timeParam := requestParams.Get("time")
startParam := requestParams.Get("start")
Expand All @@ -22,12 +22,11 @@ func IsHighPriority(requestParams url.Values, highPriorityQueries []validation.H
continue
}

now := time.Now()
startTimeThreshold := now.Add(-1 * highPriorityQuery.StartTime).UnixMilli()
endTimeThreshold := now.Add(-1 * highPriorityQuery.EndTime).UnixMilli()
startTimeThreshold := timestamp.Add(-1 * highPriorityQuery.StartTime.Abs()).UnixMilli()
endTimeThreshold := timestamp.Add(-1 * highPriorityQuery.EndTime.Abs()).UnixMilli()

if time, err := strconv.ParseInt(timeParam, 10, 64); err == nil {
if isBetweenThresholds(time, time, startTimeThreshold, endTimeThreshold) {
if instantTime, err := strconv.ParseInt(timeParam, 10, 64); err == nil {
if isBetweenThresholds(instantTime, instantTime, startTimeThreshold, endTimeThreshold) {
return true
}
}
Expand Down
65 changes: 32 additions & 33 deletions pkg/util/query/priority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,6 @@ import (
"github.com/cortexproject/cortex/pkg/util/validation"
)

func Test_IsHighPriority_DefaultValues(t *testing.T) {
now := time.Now()
config := []validation.HighPriorityQuery{
{}, // By default, it should match all queries happened at "now"
}

assert.True(t, IsHighPriority(url.Values{
"query": []string{"count(up)"},
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
}, config))
assert.False(t, IsHighPriority(url.Values{
"query": []string{"count(up)"},
"time": []string{strconv.FormatInt(now.Add(-1*time.Second).UnixMilli(), 10)},
}, config))
}

func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) {
now := time.Now()
config := []validation.HighPriorityQuery{
Expand All @@ -38,11 +22,11 @@ func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) {
assert.True(t, IsHighPriority(url.Values{
"query": []string{"sum(up)"},
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
}, config))
}, now, config))
assert.False(t, IsHighPriority(url.Values{
"query": []string{"count(up)"},
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
}, config))
}, now, config))

config = []validation.HighPriorityQuery{
{
Expand All @@ -53,11 +37,11 @@ func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) {
assert.True(t, IsHighPriority(url.Values{
"query": []string{"sum(up)"},
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
}, config))
}, now, config))
assert.True(t, IsHighPriority(url.Values{
"query": []string{"count(up)"},
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
}, config))
}, now, config))

config = []validation.HighPriorityQuery{
{
Expand All @@ -71,11 +55,11 @@ func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) {
assert.True(t, IsHighPriority(url.Values{
"query": []string{"sum(up)"},
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
}, config))
}, now, config))
assert.True(t, IsHighPriority(url.Values{
"query": []string{"count(up)"},
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
}, config))
}, now, config))

config = []validation.HighPriorityQuery{
{
Expand All @@ -89,11 +73,11 @@ func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) {
assert.False(t, IsHighPriority(url.Values{
"query": []string{"sum(up)"},
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
}, config))
}, now, config))
assert.False(t, IsHighPriority(url.Values{
"query": []string{"count(up)"},
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
}, config))
}, now, config))

config = []validation.HighPriorityQuery{
{
Expand All @@ -104,11 +88,26 @@ func Test_IsHighPriority_ShouldMatchRegex(t *testing.T) {
assert.True(t, IsHighPriority(url.Values{
"query": []string{"sum(up)"},
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
}, config))
}, now, config))
assert.True(t, IsHighPriority(url.Values{
"query": []string{"count(up)"},
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
}, now, config))

config = []validation.HighPriorityQuery{
{
Regex: "",
},
}

assert.True(t, IsHighPriority(url.Values{
"query": []string{"sum(up)"},
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
}, now, config))
assert.True(t, IsHighPriority(url.Values{
"query": []string{"count(up)"},
"time": []string{strconv.FormatInt(now.UnixMilli(), 10)},
}, config))
}, now, config))
}

func Test_IsHighPriority_ShouldBeBetweenStartAndEndTime(t *testing.T) {
Expand All @@ -123,32 +122,32 @@ func Test_IsHighPriority_ShouldBeBetweenStartAndEndTime(t *testing.T) {
assert.False(t, IsHighPriority(url.Values{
"query": []string{"sum(up)"},
"time": []string{strconv.FormatInt(now.Add(-2*time.Hour).UnixMilli(), 10)},
}, config))
}, now, config))
assert.True(t, IsHighPriority(url.Values{
"query": []string{"sum(up)"},
"time": []string{strconv.FormatInt(now.Add(-1*time.Hour).UnixMilli(), 10)},
}, config))
}, now, config))
assert.True(t, IsHighPriority(url.Values{
"query": []string{"sum(up)"},
"time": []string{strconv.FormatInt(now.Add(-30*time.Minute).UnixMilli(), 10)},
}, config))
}, now, config))
assert.False(t, IsHighPriority(url.Values{
"query": []string{"sum(up)"},
"time": []string{strconv.FormatInt(now.Add(-1*time.Minute).UnixMilli(), 10)},
}, config))
}, now, config))
assert.False(t, IsHighPriority(url.Values{
"query": []string{"sum(up)"},
"start": []string{strconv.FormatInt(now.Add(-2*time.Hour).UnixMilli(), 10)},
"end": []string{strconv.FormatInt(now.Add(-30*time.Minute).UnixMilli(), 10)},
}, config))
}, now, config))
assert.True(t, IsHighPriority(url.Values{
"query": []string{"sum(up)"},
"start": []string{strconv.FormatInt(now.Add(-1*time.Hour).UnixMilli(), 10)},
"end": []string{strconv.FormatInt(now.Add(-30*time.Minute).UnixMilli(), 10)},
}, config))
}, now, config))
assert.False(t, IsHighPriority(url.Values{
"query": []string{"sum(up)"},
"start": []string{strconv.FormatInt(now.Add(-1*time.Hour).UnixMilli(), 10)},
"end": []string{strconv.FormatInt(now.UnixMilli(), 10)},
}, config))
}, now, config))
}

0 comments on commit fc1260d

Please sign in to comment.