Skip to content

Commit

Permalink
api: bump promql engine and fix fallout (#8000)
Browse files Browse the repository at this point in the history
* bump to new promql-engine version and fix fallout
* new promql-engine makes it possible to provide more options at runtime

Signed-off-by: Michael Hoffmann <[email protected]>
Co-authored-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann and Michael Hoffmann authored Dec 17, 2024
1 parent 683cf17 commit 1ca8292
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 90 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure gRPC service config for the receivers.
- [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-keys` flag to mark posting group as lazy if it exceeds number of keys limit. Added `thanos_bucket_store_lazy_expanded_posting_groups_total` for total number of lazy posting groups and corresponding reasons.
- [#8000](https://github.com/thanos-io/thanos/pull/8000) Query: Bump promql-engine, pass partial response through options

### Changed

Expand Down
36 changes: 19 additions & 17 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/thanos-io/promql-engine/api"
"github.com/thanos-io/promql-engine/engine"

apiv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/api/query/querypb"
Expand Down Expand Up @@ -663,18 +664,23 @@ func runQuery(
prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
)

engineOpts := promql.EngineOpts{
Logger: logger,
Reg: reg,
// TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703.
MaxSamples: math.MaxInt32,
Timeout: queryTimeout,
LookbackDelta: lookbackDelta,
NoStepSubqueryIntervalFn: func(int64) int64 {
return defaultEvaluationInterval.Milliseconds()
engineOpts := engine.Opts{
EngineOpts: promql.EngineOpts{
Logger: logger,
Reg: reg,
// TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703.
MaxSamples: math.MaxInt32,
Timeout: queryTimeout,
LookbackDelta: lookbackDelta,
NoStepSubqueryIntervalFn: func(int64) int64 {
return defaultEvaluationInterval.Milliseconds()
},
EnableNegativeOffset: true,
EnableAtModifier: true,
},
EnableNegativeOffset: true,
EnableAtModifier: true,
EnablePartialResponses: enableQueryPartialResponse,
EnableXFunctions: extendedFunctionsEnabled,
EnableAnalysis: true,
}

// An active query tracker will be added only if the user specifies a non-default path.
Expand All @@ -696,13 +702,9 @@ func runQuery(
})
}

engineFactory := apiv1.NewQueryEngineFactory(
engineOpts,
remoteEngineEndpoints,
extendedFunctionsEnabled,
)
engineFactory := apiv1.NewQueryEngineFactory(engineOpts, remoteEngineEndpoints)

lookbackDeltaCreator := LookbackDeltaFactory(engineOpts, dynamicLookbackDelta)
lookbackDeltaCreator := LookbackDeltaFactory(engineOpts.EngineOpts, dynamicLookbackDelta)

// Start query API + UI HTTP server.
{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ require (
github.com/sony/gobreaker v0.5.0
github.com/stretchr/testify v1.9.0
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97
github.com/thanos-io/promql-engine v0.0.0-20241203103240-2f49f80c7c68
github.com/thanos-io/promql-engine v0.0.0-20241217103156-9dbff30059cf
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/vimeo/galaxycache v0.0.0-20210323154928-b7e5d71c067a
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2257,8 +2257,8 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1DkncwDHFvrpd12/2TLfgYNRmEQA48ikp+0=
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw=
github.com/thanos-io/promql-engine v0.0.0-20241203103240-2f49f80c7c68 h1:cChM/FbpXeYmrSmXO1/MmmSlONviLVxWAWCB0/g4JrY=
github.com/thanos-io/promql-engine v0.0.0-20241203103240-2f49f80c7c68/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00=
github.com/thanos-io/promql-engine v0.0.0-20241217103156-9dbff30059cf h1:JFh4PjC9yQidiFi4qMWbPddIgsLWPIsSEbXs75+tLxs=
github.com/thanos-io/promql-engine v0.0.0-20241217103156-9dbff30059cf/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0=
Expand Down
21 changes: 14 additions & 7 deletions pkg/api/query/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,21 @@ func (g *GRPCAPI) getQueryForEngine(ctx context.Context, request *querypb.QueryR
} else {
ts = time.Unix(request.TimeSeconds, 0)
}
opts := &engine.QueryOpts{
LookbackDeltaParam: lookbackDelta,
EnablePartialResponses: request.EnablePartialResponse,
}
switch engineParam {
case querypb.EngineType_prometheus:
queryEngine := g.engineFactory.GetPrometheusEngine()
return queryEngine.NewInstantQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, ts)
return queryEngine.MakeInstantQuery(ctx, queryable, opts, request.Query, ts)
case querypb.EngineType_thanos:
queryEngine := g.engineFactory.GetThanosEngine()
plan, err := logicalplan.Unmarshal(request.QueryPlan.GetJson())
if err != nil {
return queryEngine.NewInstantQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, ts)
return queryEngine.MakeInstantQuery(ctx, queryable, opts, request.Query, ts)
}

return queryEngine.NewInstantQueryFromPlan(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), plan, ts)
return queryEngine.MakeInstantQueryFromPlan(ctx, queryable, opts, plan, ts)
default:
return nil, status.Error(codes.InvalidArgument, "invalid engine parameter")
}
Expand Down Expand Up @@ -314,18 +317,22 @@ func (g *GRPCAPI) getRangeQueryForEngine(
if request.LookbackDeltaSeconds > 0 {
lookbackDelta = time.Duration(request.LookbackDeltaSeconds) * time.Second
}
opts := &engine.QueryOpts{
LookbackDeltaParam: lookbackDelta,
EnablePartialResponses: request.EnablePartialResponse,
}

switch engineParam {
case querypb.EngineType_prometheus:
queryEngine := g.engineFactory.GetPrometheusEngine()
return queryEngine.NewRangeQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, startTime, endTime, interval)
return queryEngine.MakeRangeQuery(ctx, queryable, opts, request.Query, startTime, endTime, interval)
case querypb.EngineType_thanos:
thanosEngine := g.engineFactory.GetThanosEngine()
plan, err := logicalplan.Unmarshal(request.QueryPlan.GetJson())
if err != nil {
return thanosEngine.NewRangeQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, startTime, endTime, interval)
return thanosEngine.MakeRangeQuery(ctx, queryable, opts, request.Query, startTime, endTime, interval)
}
return thanosEngine.NewRangeQueryFromPlan(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), plan, startTime, endTime, interval)
return thanosEngine.MakeRangeQueryFromPlan(ctx, queryable, opts, plan, startTime, endTime, interval)
default:
return nil, status.Error(codes.InvalidArgument, "invalid engine parameter")
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/api/query/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
"github.com/thanos-io/promql-engine/engine"
"github.com/thanos-io/promql-engine/logicalplan"
equery "github.com/thanos-io/promql-engine/query"

Expand All @@ -32,7 +33,7 @@ func TestGRPCQueryAPIWithQueryPlan(t *testing.T) {
queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute)
lookbackDeltaFunc := func(i int64) time.Duration { return 5 * time.Minute }
engineFactory := &QueryEngineFactory{
thanosEngine: &engineStub{},
thanos: &engineStub{},
}
api := NewGRPCAPI(time.Now, nil, queryableCreator, engineFactory, querypb.EngineType_thanos, lookbackDeltaFunc, 0)

Expand Down Expand Up @@ -97,7 +98,7 @@ func TestGRPCQueryAPIErrorHandling(t *testing.T) {

for _, test := range tests {
engineFactory := &QueryEngineFactory{
prometheusEngine: test.engine,
prometheus: test.engine,
}
api := NewGRPCAPI(time.Now, nil, queryableCreator, engineFactory, querypb.EngineType_prometheus, lookbackDeltaFunc, 0)
t.Run("range_query", func(t *testing.T) {
Expand Down Expand Up @@ -153,19 +154,19 @@ type engineStub struct {
warns annotations.Annotations
}

func (e engineStub) NewInstantQuery(_ context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
func (e engineStub) MakeInstantQuery(_ context.Context, q storage.Queryable, opts *engine.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
return &queryStub{err: e.err, warns: e.warns}, nil
}

func (e engineStub) NewRangeQuery(_ context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
func (e engineStub) MakeRangeQuery(_ context.Context, q storage.Queryable, opts *engine.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
return &queryStub{err: e.err, warns: e.warns}, nil
}

func (e engineStub) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) {
func (e engineStub) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) {
return &queryStub{err: e.err, warns: e.warns}, nil
}

func (e engineStub) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) {
func (e engineStub) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) {
return &queryStub{err: e.err, warns: e.warns}, nil
}

Expand Down
Loading

0 comments on commit 1ca8292

Please sign in to comment.