Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disambiguate frontend types: rename frontend Request to MetricsQueryRequest #7737

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pkg/frontend/querymiddleware/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type queryBlockerMiddleware struct {
next Handler
next MetricsQueryHandler
limits Limits
logger log.Logger
blockedQueriesCounter *prometheus.CounterVec
Expand All @@ -25,12 +25,12 @@ func newQueryBlockerMiddleware(
limits Limits,
logger log.Logger,
registerer prometheus.Registerer,
) Middleware {
) MetricsQueryMiddleware {
blockedQueriesCounter := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_frontend_rejected_queries_total",
Help: "Number of queries that were rejected by the cluster administrator.",
}, []string{"user", "reason"})
return MiddlewareFunc(func(next Handler) Handler {
return MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler {
return &queryBlockerMiddleware{
next: next,
limits: limits,
Expand All @@ -40,7 +40,7 @@ func newQueryBlockerMiddleware(
})
}

func (qb *queryBlockerMiddleware) Do(ctx context.Context, req Request) (Response, error) {
func (qb *queryBlockerMiddleware) Do(ctx context.Context, req MetricsQueryRequest) (Response, error) {
tenants, err := tenant.TenantIDs(ctx)
if err != nil {
return qb.next.Do(ctx, req)
Expand All @@ -56,7 +56,7 @@ func (qb *queryBlockerMiddleware) Do(ctx context.Context, req Request) (Response
return qb.next.Do(ctx, req)
}

func (qb *queryBlockerMiddleware) isBlocked(tenant string, req Request) bool {
func (qb *queryBlockerMiddleware) isBlocked(tenant string, req MetricsQueryRequest) bool {
blocks := qb.limits.BlockedQueries(tenant)
if len(blocks) <= 0 {
return false
Expand Down
20 changes: 10 additions & 10 deletions pkg/frontend/querymiddleware/blocker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ import (
func Test_queryBlocker_Do(t *testing.T) {
tests := []struct {
name string
request Request
request MetricsQueryRequest
shouldContinue bool
limits mockLimits
}{
{
name: "doesn't block queries due to empty limits",
limits: mockLimits{},
shouldContinue: true,
request: Request(&PrometheusRangeQueryRequest{
request: MetricsQueryRequest(&PrometheusRangeQueryRequest{
Query: "rate(metric_counter[5m])",
}),
},
Expand All @@ -39,7 +39,7 @@ func Test_queryBlocker_Do(t *testing.T) {
{Pattern: "rate(metric_counter[5m])", Regex: false},
},
},
request: Request(&PrometheusRangeQueryRequest{
request: MetricsQueryRequest(&PrometheusRangeQueryRequest{
Query: "rate(metric_counter[5m])",
}),
},
Expand All @@ -52,7 +52,7 @@ func Test_queryBlocker_Do(t *testing.T) {
},
shouldContinue: true,

request: Request(&PrometheusRangeQueryRequest{
request: MetricsQueryRequest(&PrometheusRangeQueryRequest{
Query: "rate(metric_counter[15m])",
}),
},
Expand All @@ -64,7 +64,7 @@ func Test_queryBlocker_Do(t *testing.T) {
rate(other_counter[5m])`, Regex: false},
},
},
request: Request(&PrometheusRangeQueryRequest{
request: MetricsQueryRequest(&PrometheusRangeQueryRequest{
Query: `rate(metric_counter[5m])/
rate(other_counter[5m])`,
}),
Expand All @@ -78,7 +78,7 @@ rate(other_counter[5m])`,
},
shouldContinue: true,

request: Request(&PrometheusRangeQueryRequest{
request: MetricsQueryRequest(&PrometheusRangeQueryRequest{
Query: `rate(metric_counter[15m])/
rate(other_counter[15m])`,
}),
Expand All @@ -90,7 +90,7 @@ rate(other_counter[15m])`,
{Pattern: ".*metric_counter.*", Regex: true},
},
},
request: Request(&PrometheusRangeQueryRequest{
request: MetricsQueryRequest(&PrometheusRangeQueryRequest{
Query: "rate(metric_counter[5m])",
}),
},
Expand All @@ -102,7 +102,7 @@ rate(other_counter[15m])`,
{Pattern: "(?s).*metric_counter.*", Regex: true},
},
},
request: Request(&PrometheusRangeQueryRequest{
request: MetricsQueryRequest(&PrometheusRangeQueryRequest{
Query: `rate(other_counter[15m])/
rate(metric_counter[15m])`,
}),
Expand All @@ -116,7 +116,7 @@ rate(other_counter[15m])`,
},
shouldContinue: true,

request: Request(&PrometheusRangeQueryRequest{
request: MetricsQueryRequest(&PrometheusRangeQueryRequest{
Query: "rate(metric_counter[5m])",
}),
},
Expand Down Expand Up @@ -150,7 +150,7 @@ type mockNextHandler struct {
shouldContinue bool
}

func (h *mockNextHandler) Do(_ context.Context, _ Request) (Response, error) {
func (h *mockNextHandler) Do(_ context.Context, _ MetricsQueryRequest) (Response, error) {
if !h.shouldContinue {
h.t.Error("The next middleware should not be called.")
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/frontend/querymiddleware/cardinality.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@ const (
cacheErrorToleranceFraction = 0.1
)

// cardinalityEstimation is a Handler that caches estimates for a query's
// cardinalityEstimation is a MetricsQueryHandler that caches estimates for a query's
// cardinality based on similar queries seen previously.
type cardinalityEstimation struct {
cache cache.Cache
next Handler
next MetricsQueryHandler
logger log.Logger

estimationError prometheus.Histogram
}

func newCardinalityEstimationMiddleware(cache cache.Cache, logger log.Logger, registerer prometheus.Registerer) Middleware {
func newCardinalityEstimationMiddleware(cache cache.Cache, logger log.Logger, registerer prometheus.Registerer) MetricsQueryMiddleware {
estimationError := promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_query_frontend_cardinality_estimation_difference",
Help: "Difference between estimated and actual query cardinality",
Buckets: prometheus.ExponentialBuckets(100, 2, 10),
})
return MiddlewareFunc(func(next Handler) Handler {
return MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler {
return &cardinalityEstimation{
cache: cache,
next: next,
Expand All @@ -63,7 +63,7 @@ func newCardinalityEstimationMiddleware(cache cache.Cache, logger log.Logger, re

// Do injects a cardinality estimate into the query hints (if available) and
// caches the actual cardinality observed for this query.
func (c *cardinalityEstimation) Do(ctx context.Context, request Request) (Response, error) {
func (c *cardinalityEstimation) Do(ctx context.Context, request MetricsQueryRequest) (Response, error) {
spanLog := spanlogger.FromContext(ctx, c.logger)

tenants, err := tenant.TenantIDs(ctx)
Expand Down Expand Up @@ -162,7 +162,7 @@ func isCardinalitySimilar(actualCardinality, estimatedCardinality uint64) bool {
// with respect to both start time and range size. To avoid expiry of all
// estimates at the bucket boundary, an offset is added based on the hash of the
// query string.
func generateCardinalityEstimationCacheKey(userID string, r Request, bucketSize time.Duration) string {
func generateCardinalityEstimationCacheKey(userID string, r MetricsQueryRequest, bucketSize time.Duration) string {
hasher := fnv.New64a()
_, _ = hasher.Write([]byte(r.GetQuery()))

Expand Down
14 changes: 7 additions & 7 deletions pkg/frontend/querymiddleware/cardinality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func Test_cardinalityEstimateBucket_QueryRequest_keyFormat(t *testing.T) {
tests := []struct {
name string
userID string
r Request
r MetricsQueryRequest
expected string
}{
{
Expand Down Expand Up @@ -133,7 +133,7 @@ func Test_cardinalityEstimation_Do(t *testing.T) {
Query: "up",
}
addSeriesHandler := func(estimate, actual uint64) HandlerFunc {
return func(ctx context.Context, request Request) (Response, error) {
return func(ctx context.Context, request MetricsQueryRequest) (Response, error) {
require.NotNil(t, request.GetHints())
request.GetHints().GetCardinalityEstimate()
require.Equal(t, request.GetHints().GetEstimatedSeriesCount(), estimate)
Expand All @@ -158,7 +158,7 @@ func Test_cardinalityEstimation_Do(t *testing.T) {
{
name: "no tenantID",
tenantID: "",
downstreamHandler: func(_ context.Context, _ Request) (Response, error) {
downstreamHandler: func(_ context.Context, _ MetricsQueryRequest) (Response, error) {
return &PrometheusResponse{}, nil
},
expectedLoads: 0,
Expand All @@ -168,7 +168,7 @@ func Test_cardinalityEstimation_Do(t *testing.T) {
{
name: "downstream error",
tenantID: "1",
downstreamHandler: func(_ context.Context, _ Request) (Response, error) {
downstreamHandler: func(_ context.Context, _ MetricsQueryRequest) (Response, error) {
return nil, errors.New("test error")
},
expectedLoads: 1,
Expand Down Expand Up @@ -205,7 +205,7 @@ func Test_cardinalityEstimation_Do(t *testing.T) {
{
name: "with empty cache",
tenantID: "1",
downstreamHandler: func(ctx context.Context, request Request) (Response, error) {
downstreamHandler: func(ctx context.Context, request MetricsQueryRequest) (Response, error) {
queryStats := stats.FromContext(ctx)
queryStats.AddFetchedSeries(numSeries)
return &PrometheusResponse{}, nil
Expand Down Expand Up @@ -251,8 +251,8 @@ func Test_cardinalityEstimateBucket_QueryRequest_requestEquality(t *testing.T) {
name string
tenantA string
tenantB string
requestA Request
requestB Request
requestA MetricsQueryRequest
requestB MetricsQueryRequest
expectedEqual bool
}{
{
Expand Down
34 changes: 17 additions & 17 deletions pkg/frontend/querymiddleware/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,16 @@ const (
// Codec is used to encode/decode query requests and responses so they can be passed down to middlewares.
type Codec interface {
Merger
// DecodeRequest decodes a Request from an http request.
DecodeRequest(context.Context, *http.Request) (Request, error)
// DecodeMetricsQueryRequest decodes a MetricsQueryRequest from an http request.
DecodeMetricsQueryRequest(context.Context, *http.Request) (MetricsQueryRequest, error)
// DecodeLabelsQueryRequest decodes a LabelsQueryRequest from an http request.
DecodeLabelsQueryRequest(context.Context, *http.Request) (LabelsQueryRequest, error)
// DecodeResponse decodes a Response from an http response.
// The original request is also passed as a parameter this is useful for implementation that needs the request
// to merge result or build the result correctly.
DecodeResponse(context.Context, *http.Response, Request, log.Logger) (Response, error)
// EncodeRequest encodes a Request into an http request.
EncodeRequest(context.Context, Request) (*http.Request, error)
DecodeResponse(context.Context, *http.Response, MetricsQueryRequest, log.Logger) (Response, error)
// EncodeMetricsQueryRequest encodes a MetricsQueryRequest into an http request.
EncodeMetricsQueryRequest(context.Context, MetricsQueryRequest) (*http.Request, error)
// EncodeLabelsQueryRequest encodes a LabelsQueryRequest into an http request.
EncodeLabelsQueryRequest(context.Context, LabelsQueryRequest) (*http.Request, error)
// EncodeResponse encodes a Response into an http response.
Expand All @@ -87,8 +87,8 @@ type Merger interface {
MergeResponse(...Response) (Response, error)
}

// Request represents an instant or query range request that can be process by middlewares.
type Request interface {
// MetricsQueryRequest represents an instant or query range request that can be process by middlewares.
type MetricsQueryRequest interface {
// GetId returns the ID of the request used to correlate downstream requests and responses.
GetId() int64
// GetStart returns the start timestamp of the request in milliseconds.
Expand All @@ -105,15 +105,15 @@ type Request interface {
// These hints can be used to optimize the query execution.
GetHints() *Hints
// WithID clones the current request with the provided ID.
WithID(id int64) Request
WithID(id int64) MetricsQueryRequest
// WithStartEnd clone the current request with different start and end timestamp.
WithStartEnd(startTime int64, endTime int64) Request
WithStartEnd(startTime int64, endTime int64) MetricsQueryRequest
// WithQuery clone the current request with a different query.
WithQuery(string) Request
WithQuery(string) MetricsQueryRequest
// WithTotalQueriesHint adds the number of total queries to this request's Hints.
WithTotalQueriesHint(int32) Request
WithTotalQueriesHint(int32) MetricsQueryRequest
// WithEstimatedSeriesCountHint WithEstimatedCardinalityHint adds a cardinality estimate to this request's Hints.
WithEstimatedSeriesCountHint(uint64) Request
WithEstimatedSeriesCountHint(uint64) MetricsQueryRequest
proto.Message
// AddSpanTags writes information about this request to an OpenTracing span
AddSpanTags(opentracing.Span)
Expand Down Expand Up @@ -246,7 +246,7 @@ func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) {
}, nil
}

func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request) (Request, error) {
func (c prometheusCodec) DecodeMetricsQueryRequest(_ context.Context, r *http.Request) (MetricsQueryRequest, error) {
switch {
case IsRangeQuery(r.URL.Path):
return c.decodeRangeQueryRequest(r)
Expand All @@ -257,7 +257,7 @@ func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request) (Requ
}
}

func (prometheusCodec) decodeRangeQueryRequest(r *http.Request) (Request, error) {
func (prometheusCodec) decodeRangeQueryRequest(r *http.Request) (MetricsQueryRequest, error) {
var result PrometheusRangeQueryRequest
var err error
reqValues, err := util.ParseRequestFormWithoutConsumingBody(r)
Expand All @@ -276,7 +276,7 @@ func (prometheusCodec) decodeRangeQueryRequest(r *http.Request) (Request, error)
return &result, nil
}

func (c prometheusCodec) decodeInstantQueryRequest(r *http.Request) (Request, error) {
func (c prometheusCodec) decodeInstantQueryRequest(r *http.Request) (MetricsQueryRequest, error) {
var result PrometheusInstantQueryRequest
var err error
reqValues, err := util.ParseRequestFormWithoutConsumingBody(r)
Expand Down Expand Up @@ -460,7 +460,7 @@ func decodeCacheDisabledOption(r *http.Request) bool {
return false
}

func (c prometheusCodec) EncodeRequest(ctx context.Context, r Request) (*http.Request, error) {
func (c prometheusCodec) EncodeMetricsQueryRequest(ctx context.Context, r MetricsQueryRequest) (*http.Request, error) {
var u *url.URL
switch r := r.(type) {
case *PrometheusRangeQueryRequest:
Expand Down Expand Up @@ -594,7 +594,7 @@ func encodeOptions(req *http.Request, o Options) {
}
}

func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ Request, logger log.Logger) (Response, error) {
func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ MetricsQueryRequest, logger log.Logger) (Response, error) {
switch r.StatusCode {
case http.StatusServiceUnavailable:
return nil, apierror.New(apierror.TypeUnavailable, string(mustReadResponseBody(r)))
Expand Down
16 changes: 8 additions & 8 deletions pkg/frontend/querymiddleware/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestMetricsQueryRequest(t *testing.T) {

for i, tc := range []struct {
url string
expected Request
expected MetricsQueryRequest
expectedErr error
}{
{
Expand Down Expand Up @@ -97,14 +97,14 @@ func TestMetricsQueryRequest(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "1")
r = r.WithContext(ctx)

req, err := codec.DecodeRequest(ctx, r)
req, err := codec.DecodeMetricsQueryRequest(ctx, r)
if err != nil || tc.expectedErr != nil {
require.EqualValues(t, tc.expectedErr, err)
return
}
require.EqualValues(t, tc.expected, req)

rdash, err := codec.EncodeRequest(context.Background(), req)
rdash, err := codec.EncodeMetricsQueryRequest(context.Background(), req)
require.NoError(t, err)
require.EqualValues(t, tc.url, rdash.RequestURI)
})
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestPrometheusCodec_EncodeRequest_AcceptHeader(t *testing.T) {
t.Run(queryResultPayloadFormat, func(t *testing.T) {
codec := NewPrometheusCodec(prometheus.NewPedanticRegistry(), queryResultPayloadFormat)
req := PrometheusInstantQueryRequest{}
encodedRequest, err := codec.EncodeRequest(context.Background(), &req)
encodedRequest, err := codec.EncodeMetricsQueryRequest(context.Background(), &req)
require.NoError(t, err)

switch queryResultPayloadFormat {
Expand All @@ -302,7 +302,7 @@ func TestPrometheusCodec_EncodeRequest_ReadConsistency(t *testing.T) {
t.Run(consistencyLevel, func(t *testing.T) {
codec := NewPrometheusCodec(prometheus.NewPedanticRegistry(), formatProtobuf)
ctx := api.ContextWithReadConsistency(context.Background(), consistencyLevel)
encodedRequest, err := codec.EncodeRequest(ctx, &PrometheusInstantQueryRequest{})
encodedRequest, err := codec.EncodeMetricsQueryRequest(ctx, &PrometheusInstantQueryRequest{})
require.NoError(t, err)
require.Equal(t, consistencyLevel, encodedRequest.Header.Get(api.ReadConsistencyHeader))
})
Expand Down Expand Up @@ -1133,14 +1133,14 @@ func TestPrometheusCodec_DecodeEncode(t *testing.T) {
expected.Header = make(http.Header)
}

// This header is set by EncodeRequest according to the codec's config, so we
// This header is set by EncodeMetricsQueryRequest according to the codec's config, so we
// should always expect it to be present on the re-encoded request.
expected.Header.Set("Accept", "application/json")

ctx := context.Background()
decoded, err := codec.DecodeRequest(ctx, expected)
decoded, err := codec.DecodeMetricsQueryRequest(ctx, expected)
require.NoError(t, err)
encoded, err := codec.EncodeRequest(ctx, decoded)
encoded, err := codec.EncodeMetricsQueryRequest(ctx, decoded)
require.NoError(t, err)

assert.Equal(t, expected.URL, encoded.URL)
Expand Down
Loading
Loading