Embedded Query sharding in QFE #586

Open · wants to merge 16 commits into master
164 changes: 109 additions & 55 deletions integration/query_fuzz_test.go
@@ -1,6 +1,3 @@
//go:build integration_query_fuzz
// +build integration_query_fuzz

package integration

import (
@@ -17,7 +14,9 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/querysharding"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
@@ -64,6 +63,7 @@ func TestVerticalShardingFuzz(t *testing.T) {
cortex1 := e2ecortex.NewSingleBinary("cortex-1", flags1, "")
// Enable vertical sharding for the second Cortex instance.
flags2 := mergeFlags(flags, map[string]string{
"-querier.embed": "true",
"-frontend.query-vertical-shard-size": "2",
"-blocks-storage.filesystem.dir": path2,
"-consul.hostname": consul2.NetworkHTTPEndpoint(),
@@ -84,7 +84,7 @@ func TestVerticalShardingFuzz(t *testing.T) {
// Push some series to Cortex.
start := now.Add(-time.Minute * 10)
end := now.Add(-time.Minute * 1)
numSeries := 3
numSeries := 5
numSamples := 20
lbls := make([]labels.Labels, numSeries*2)
serieses := make([]prompb.TimeSeries, numSeries*2)
@@ -131,8 +131,8 @@ func TestVerticalShardingFuzz(t *testing.T) {

rnd := rand.New(rand.NewSource(now.Unix()))
opts := []promqlsmith.Option{
promqlsmith.WithEnableOffset(true),
promqlsmith.WithEnableAtModifier(true),
promqlsmith.WithEnableOffset(false),
promqlsmith.WithEnableAtModifier(false),
}
ps := promqlsmith.New(rnd, lbls, opts...)

@@ -144,24 +144,27 @@ func TestVerticalShardingFuzz(t *testing.T) {
}

now = time.Now()
cases := make([]*testCase, 0, 200)
for i := 0; i < 100; i++ {
expr := ps.WalkInstantQuery()
query := expr.Pretty(0)
res1, err1 := c1.Query(query, now)
res2, err2 := c2.Query(query, now)
cases = append(cases, &testCase{
query: query,
res1: res1,
res2: res2,
err1: err1,
err2: err2,
instantQuery: true,
})
}
cases := make([]*testCase, 0, 1000)
analyzer := querysharding.NewQueryAnalyzer()

for i := 0; i < 100; i++ {
expr := ps.WalkRangeQuery()
for i := 0; i < 1000; i++ {
var expr parser.Expr
// Make sure we generate only aggregation expressions, since these are
// the main target of this test.
for {
expr = ps.WalkRangeQuery()
if a, ok := expr.(*parser.AggregateExpr); ok && len(a.Grouping) == 0 && !a.Without {
if a.Op == parser.TOPK || a.Op == parser.BOTTOMK {
continue
}
qa, err := analyzer.Analyze(a.Expr.String())
require.NoError(t, err)
// Focus on the case where the outer query is not shardable but the inner query is.
if qa.IsShardable() {
break
}
}
}
query := expr.Pretty(0)
res1, err1 := c1.QueryRange(query, start, end, scrapeInterval)
res2, err2 := c2.QueryRange(query, start, end, scrapeInterval)
@@ -186,7 +189,7 @@ func TestVerticalShardingFuzz(t *testing.T) {
t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, qt, tc.query, tc.err1, tc.err2)
failures++
}
} else if !sameModelValue(tc.res1, tc.res2) {
} else if !cmp.Equal(tc.res1, tc.res2, comparer) {
t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
failures++
}
@@ -196,37 +199,88 @@ func TestVerticalShardingFuzz(t *testing.T) {
}
}

func sameModelValue(a model.Value, b model.Value) bool {
if a.Type() != b.Type() {
// comparer should be used to compare promql results between engines.
var comparer = cmp.Comparer(func(x, y model.Value) bool {
if x.Type() != y.Type() {
return false
}
// We allow a margin for comparing floats.
opts := []cmp.Option{cmpopts.EquateNaNs(), cmpopts.EquateApprox(0, 1e-6)}
switch a.Type() {
case model.ValMatrix:
s1, _ := a.(model.Matrix)
s2, _ := b.(model.Matrix)
// Sort to make sure we are not affected by series order.
sort.Sort(s1)
sort.Sort(s2)
return cmp.Equal(s1, s2, opts...)
case model.ValVector:
s1, _ := a.(model.Vector)
s2, _ := b.(model.Vector)
// Sort to make sure we are not affected by series order.
sort.Sort(s1)
sort.Sort(s2)
return cmp.Equal(s1, s2, opts...)
case model.ValScalar:
s1, _ := a.(*model.Scalar)
s2, _ := b.(*model.Scalar)
return cmp.Equal(s1, s2, opts...)
case model.ValString:
s1, _ := a.(*model.String)
s2, _ := b.(*model.String)
return cmp.Equal(s1, s2, opts...)
default:
// model.ValNone is impossible.
return false
compareFloats := func(l, r float64) bool {
const epsilon = 1e-6
return cmp.Equal(l, r, cmpopts.EquateNaNs(), cmpopts.EquateApprox(0, epsilon))
}
}
compareMetrics := func(l, r model.Metric) bool {
return l.Equal(r)
}

vx, xvec := x.(model.Vector)
vy, yvec := y.(model.Vector)

if xvec && yvec {
if len(vx) != len(vy) {
return false
}

// Sort vector before comparing.
sort.Sort(vx)
sort.Sort(vy)

for i := 0; i < len(vx); i++ {
if !compareMetrics(vx[i].Metric, vy[i].Metric) {
return false
}
if vx[i].Timestamp != vy[i].Timestamp {
return false
}
if !compareFloats(float64(vx[i].Value), float64(vy[i].Value)) {
return false
}
}
return true
}

mx, xmat := x.(model.Matrix)
my, ymat := y.(model.Matrix)

if xmat && ymat {
if len(mx) != len(my) {
return false
}
// Sort matrix before comparing.
sort.Sort(mx)
sort.Sort(my)
for i := 0; i < len(mx); i++ {
mxs := mx[i]
mys := my[i]

if !compareMetrics(mxs.Metric, mys.Metric) {
return false
}

xps := mxs.Values
yps := mys.Values

if len(xps) != len(yps) {
return false
}
for j := 0; j < len(xps); j++ {
if xps[j].Timestamp != yps[j].Timestamp {
return false
}
if !compareFloats(float64(xps[j].Value), float64(yps[j].Value)) {
return false
}
}
}
return true
}

sx, xscalar := x.(*model.Scalar)
sy, yscalar := y.(*model.Scalar)
if xscalar && yscalar {
if sx.Timestamp != sy.Timestamp {
return false
}
return compareFloats(float64(sx.Value), float64(sy.Value))
}
return false
})
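For reference, here is a minimal standalone sketch of the approximate float comparison the new comparer relies on, using go-cmp's EquateApprox and EquateNaNs options; the package name and sample values are illustrative only.

package main

import (
	"fmt"
	"math"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
)

// compareFloats mirrors the helper inside the comparer above: two values are
// treated as equal when they differ by less than an absolute epsilon of 1e-6,
// or when both are NaN.
func compareFloats(l, r float64) bool {
	const epsilon = 1e-6
	return cmp.Equal(l, r, cmpopts.EquateNaNs(), cmpopts.EquateApprox(0, epsilon))
}

func main() {
	fmt.Println(compareFloats(1.0000001, 1.0000002)) // true: difference is below epsilon
	fmt.Println(compareFloats(1.0, 1.1))             // false: difference exceeds epsilon
	fmt.Println(compareFloats(math.NaN(), math.NaN())) // true: NaNs compare equal
}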
19 changes: 19 additions & 0 deletions pkg/cortex/modules.go
@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"net/http"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -456,6 +457,23 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
// ShardedPrometheusCodec is the same as PrometheusCodec but is used on the sharded queries (it sums up the stats)
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true)

var queryEngine v1.QueryEngine
opts := promql.EngineOpts{
Logger: util_log.Logger,
Reg: nil,
ActiveQueryTracker: nil,
MaxSamples: 5000000000,
Timeout: time.Minute,
LookbackDelta: 0,
EnablePerStepStats: t.Cfg.Querier.EnablePerStepStats,
EnableAtModifier: false,
EnableNegativeOffset: false,
NoStepSubqueryIntervalFn: func(int64) int64 {
return 0
},
}
queryEngine = promql.NewEngine(opts)

queryRangeMiddlewares, cache, err := queryrange.Middlewares(
t.Cfg.QueryRange,
util_log.Logger,
@@ -466,6 +484,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
queryAnalyzer,
prometheusCodec,
shardedPrometheusCodec,
queryEngine,
)
if err != nil {
return nil, err
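The engine built here lives in the query frontend so the sharding tripperware can evaluate the non-shardable outer part of a query locally while the shardable legs are sent downstream. Below is a rough sketch of how such a frontend-side engine is exercised against a storage.Queryable; the queryable and query string are placeholders, the NewInstantQuery signature shown matches recent Prometheus releases and may differ in older ones, and in this PR the engine would instead be driven over the RemoteQueryable added in pkg/querier/tripperware/queryable.go.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/storage"
)

func main() {
	engine := promql.NewEngine(promql.EngineOpts{
		Logger:     log.NewNopLogger(),
		MaxSamples: 50_000_000,
		Timeout:    time.Minute,
	})

	// Placeholder queryable returning no series; the PR instead wires in a
	// RemoteQueryable that forwards embedded queries to downstream queriers.
	queryable := storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
		return storage.NoopQuerier(), nil
	})

	ctx := context.Background()
	q, err := engine.NewInstantQuery(ctx, queryable, nil, `sum(up)`, time.Now())
	if err != nil {
		panic(err)
	}
	defer q.Close()

	res := q.Exec(ctx)
	fmt.Println(res.Value, res.Err)
}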
20 changes: 20 additions & 0 deletions pkg/querier/lazyquery/lazyquery.go
@@ -8,6 +8,26 @@ import (
"github.com/prometheus/prometheus/util/annotations"
)

// LazyQueryable wraps a storage.Queryable
type LazyQueryable struct {
q storage.Queryable
}

// Querier implements storage.Queryable
func (lq LazyQueryable) Querier(mint, maxt int64) (storage.Querier, error) {
q, err := lq.q.Querier(mint, maxt)
if err != nil {
return nil, err
}

return NewLazyQuerier(q), nil
}

// NewLazyQueryable returns a lazily wrapped queryable
func NewLazyQueryable(q storage.Queryable) storage.Queryable {
return LazyQueryable{q}
}

// LazyQuerier is a lazy-loaded adapter for a storage.Querier
type LazyQuerier struct {
next storage.Querier
2 changes: 1 addition & 1 deletion pkg/querier/tripperware/instantquery/instant_query.go
@@ -250,7 +250,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res
return &resp, nil
}

func (instantQueryCodec) MergeResponse(ctx context.Context, req tripperware.Request, responses ...tripperware.Response) (tripperware.Response, error) {
func (instantQueryCodec) MergeResponse(ctx context.Context, deduplicate bool, req tripperware.Request, responses ...tripperware.Response) (tripperware.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "PrometheusInstantQueryResponse.MergeResponse")
sp.SetTag("response_count", len(responses))
defer sp.Finish()
2 changes: 1 addition & 1 deletion pkg/querier/tripperware/instantquery/instant_query_test.go
@@ -407,7 +407,7 @@ func TestMergeResponse(t *testing.T) {
if tc.cancelBeforeMerge {
cancelCtx()
}
resp, err := InstantQueryCodec.MergeResponse(ctx, tc.req, resps...)
resp, err := InstantQueryCodec.MergeResponse(ctx, true, tc.req, resps...)
assert.Equal(t, tc.expectedErr, err)
if err != nil {
cancelCtx()
2 changes: 1 addition & 1 deletion pkg/querier/tripperware/query.go
@@ -50,7 +50,7 @@ type Codec interface {
// Merger is used by middlewares making multiple requests to merge back all responses into a single one.
type Merger interface {
// MergeResponse merges responses from multiple requests into a single Response; the bool argument controls whether duplicate series are deduplicated during the merge
MergeResponse(context.Context, Request, ...Response) (Response, error)
MergeResponse(context.Context, bool, Request, ...Response) (Response, error)
}

// Response represents a query range response.
66 changes: 66 additions & 0 deletions pkg/querier/tripperware/queryable.go
@@ -0,0 +1,66 @@
package tripperware

import (
"context"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
)

const (
// QueryLabel is a reserved label containing an embedded query
QueryLabel = "__cortex_queries__"
)

type RemoteQueryable struct {
Req Request
Next Handler

RespToSeriesSetFunc func(sortSeries bool, resp Response) storage.SeriesSet
}

func (q *RemoteQueryable) Querier(mint, maxt int64) (storage.Querier, error) {
return &RemoteQuerier{
mint: mint,
maxt: maxt,
next: q.Next,
req: q.Req,
respToSeriesSetFunc: q.RespToSeriesSetFunc,
}, nil
}

type RemoteQuerier struct {
mint, maxt int64
next Handler
req Request

respToSeriesSetFunc func(sortSeries bool, resp Response) storage.SeriesSet
}

func (q *RemoteQuerier) Select(ctx context.Context, sortSeries bool, _ *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
req := q.req
if len(matchers) == 1 && matchers[0].Type == labels.MatchEqual && matchers[0].Name == QueryLabel {
req = req.WithQuery(matchers[0].Value)
}
resp, err := q.next.Do(ctx, req)
if err != nil {
return storage.ErrSeriesSet(err)
}
return q.respToSeriesSetFunc(sortSeries, resp)
}

// LabelValues returns all potential values for a label name.
func (q *RemoteQuerier) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, errors.Errorf("unimplemented")
}

// LabelNames returns all the unique label names present in the block in sorted order.
func (q *RemoteQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, errors.Errorf("unimplemented")
}

// Close releases the resources of the Querier.
func (q *RemoteQuerier) Close() error {
return nil
}
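To make the intended flow concrete: the sharding middleware is expected to rewrite the shardable legs of a query into selectors whose only matcher is the reserved __cortex_queries__ label carrying the original sub-query. When the frontend-side engine evaluates the outer expression, RemoteQuerier.Select sees that single MatchEqual matcher and swaps the embedded query into the downstream request via req.WithQuery. A hedged sketch of that embedded form at the PromQL level follows; the outer and inner queries are illustrative, and the rewrite itself is performed by middleware not shown in this diff.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/promql/parser"
)

func main() {
	// A shardable inner leg, as the sharding middleware might produce it.
	inner := `sum by (pod) (rate(http_requests_total[5m]))`

	// Outer expression evaluated by the frontend engine; the inner query rides
	// along as the value of the reserved __cortex_queries__ label, so Select
	// receives exactly one MatchEqual matcher on that label.
	outer := fmt.Sprintf(`quantile(0.9, {__cortex_queries__=%q})`, inner)

	expr, err := parser.ParseExpr(outer)
	if err != nil {
		panic(err)
	}
	fmt.Println(expr.Pretty(0))
}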