Skip to content

Commit

Permalink
api: bump promql engine and fix fallout
Browse files Browse the repository at this point in the history
* bump to new promql-engine version and fix fallout
* new promql-engine makes it possible to provide more options at runtime

Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
Michael Hoffmann committed Dec 17, 2024
1 parent 683cf17 commit 9856716
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 68 deletions.
36 changes: 19 additions & 17 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/thanos-io/promql-engine/api"
"github.com/thanos-io/promql-engine/engine"

apiv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/api/query/querypb"
Expand Down Expand Up @@ -663,18 +664,23 @@ func runQuery(
prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
)

engineOpts := promql.EngineOpts{
Logger: logger,
Reg: reg,
// TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703.
MaxSamples: math.MaxInt32,
Timeout: queryTimeout,
LookbackDelta: lookbackDelta,
NoStepSubqueryIntervalFn: func(int64) int64 {
return defaultEvaluationInterval.Milliseconds()
engineOpts := engine.Opts{
EngineOpts: promql.EngineOpts{
Logger: logger,
Reg: reg,
// TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703.
MaxSamples: math.MaxInt32,
Timeout: queryTimeout,
LookbackDelta: lookbackDelta,
NoStepSubqueryIntervalFn: func(int64) int64 {
return defaultEvaluationInterval.Milliseconds()
},
EnableNegativeOffset: true,
EnableAtModifier: true,
},
EnableNegativeOffset: true,
EnableAtModifier: true,
EnablePartialResponses: enableQueryPartialResponse,
EnableXFunctions: extendedFunctionsEnabled,
EnableAnalysis: true,
}

// An active query tracker will be added only if the user specifies a non-default path.
Expand All @@ -696,13 +702,9 @@ func runQuery(
})
}

engineFactory := apiv1.NewQueryEngineFactory(
engineOpts,
remoteEngineEndpoints,
extendedFunctionsEnabled,
)
engineFactory := apiv1.NewQueryEngineFactory(engineOpts, remoteEngineEndpoints)

lookbackDeltaCreator := LookbackDeltaFactory(engineOpts, dynamicLookbackDelta)
lookbackDeltaCreator := LookbackDeltaFactory(engineOpts.EngineOpts, dynamicLookbackDelta)

// Start query API + UI HTTP server.
{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ require (
github.com/sony/gobreaker v0.5.0
github.com/stretchr/testify v1.9.0
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97
github.com/thanos-io/promql-engine v0.0.0-20241203103240-2f49f80c7c68
github.com/thanos-io/promql-engine v0.0.0-20241217103156-9dbff30059cf
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/vimeo/galaxycache v0.0.0-20210323154928-b7e5d71c067a
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2257,8 +2257,8 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1DkncwDHFvrpd12/2TLfgYNRmEQA48ikp+0=
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw=
github.com/thanos-io/promql-engine v0.0.0-20241203103240-2f49f80c7c68 h1:cChM/FbpXeYmrSmXO1/MmmSlONviLVxWAWCB0/g4JrY=
github.com/thanos-io/promql-engine v0.0.0-20241203103240-2f49f80c7c68/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00=
github.com/thanos-io/promql-engine v0.0.0-20241217103156-9dbff30059cf h1:JFh4PjC9yQidiFi4qMWbPddIgsLWPIsSEbXs75+tLxs=
github.com/thanos-io/promql-engine v0.0.0-20241217103156-9dbff30059cf/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0=
Expand Down
21 changes: 14 additions & 7 deletions pkg/api/query/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,21 @@ func (g *GRPCAPI) getQueryForEngine(ctx context.Context, request *querypb.QueryR
} else {
ts = time.Unix(request.TimeSeconds, 0)
}
opts := &engine.QueryOpts{
LookbackDeltaParam: lookbackDelta,
EnablePartialResponses: request.EnablePartialResponse,
}
switch engineParam {
case querypb.EngineType_prometheus:
queryEngine := g.engineFactory.GetPrometheusEngine()
return queryEngine.NewInstantQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, ts)
return queryEngine.MakeInstantQuery(ctx, queryable, opts, request.Query, ts)
case querypb.EngineType_thanos:
queryEngine := g.engineFactory.GetThanosEngine()
plan, err := logicalplan.Unmarshal(request.QueryPlan.GetJson())
if err != nil {
return queryEngine.NewInstantQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, ts)
return queryEngine.MakeInstantQuery(ctx, queryable, opts, request.Query, ts)
}

return queryEngine.NewInstantQueryFromPlan(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), plan, ts)
return queryEngine.MakeInstantQueryFromPlan(ctx, queryable, opts, plan, ts)
default:
return nil, status.Error(codes.InvalidArgument, "invalid engine parameter")
}
Expand Down Expand Up @@ -314,18 +317,22 @@ func (g *GRPCAPI) getRangeQueryForEngine(
if request.LookbackDeltaSeconds > 0 {
lookbackDelta = time.Duration(request.LookbackDeltaSeconds) * time.Second
}
opts := &engine.QueryOpts{
LookbackDeltaParam: lookbackDelta,
EnablePartialResponses: request.EnablePartialResponse,
}

switch engineParam {
case querypb.EngineType_prometheus:
queryEngine := g.engineFactory.GetPrometheusEngine()
return queryEngine.NewRangeQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, startTime, endTime, interval)
return queryEngine.MakeRangeQuery(ctx, queryable, opts, request.Query, startTime, endTime, interval)
case querypb.EngineType_thanos:
thanosEngine := g.engineFactory.GetThanosEngine()
plan, err := logicalplan.Unmarshal(request.QueryPlan.GetJson())
if err != nil {
return thanosEngine.NewRangeQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, startTime, endTime, interval)
return thanosEngine.MakeRangeQuery(ctx, queryable, opts, request.Query, startTime, endTime, interval)
}
return thanosEngine.NewRangeQueryFromPlan(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), plan, startTime, endTime, interval)
return thanosEngine.MakeRangeQueryFromPlan(ctx, queryable, opts, plan, startTime, endTime, interval)
default:
return nil, status.Error(codes.InvalidArgument, "invalid engine parameter")
}
Expand Down
108 changes: 67 additions & 41 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,61 +94,75 @@ const (
PromqlEngineThanos PromqlEngineType = "thanos"
)

type ThanosEngine interface {
promql.QueryEngine
NewInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error)
NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error)
type Engine interface {
MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, qs string, ts time.Time) (promql.Query, error)
MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error)
MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error)
MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error)
}

type prometheusEngineAdapter struct {
engine promql.QueryEngine
}

func (a *prometheusEngineAdapter) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
return a.engine.NewInstantQuery(ctx, q, opts, qs, ts)
}

func (a *prometheusEngineAdapter) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
return a.engine.NewRangeQuery(ctx, q, opts, qs, start, end, step)
}

func (a *prometheusEngineAdapter) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) {
return a.engine.NewInstantQuery(ctx, q, opts, plan.String(), ts)
}

func (a *prometheusEngineAdapter) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, plan logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) {
return a.engine.NewRangeQuery(ctx, q, opts, plan.String(), start, end, step)
}

type QueryEngineFactory struct {
engineOpts promql.EngineOpts
engineOpts engine.Opts
remoteEngineEndpoints promqlapi.RemoteEndpoints

createPrometheusEngine sync.Once
prometheusEngine promql.QueryEngine
prometheus Engine

createThanosEngine sync.Once
thanosEngine ThanosEngine
enableXFunctions bool
thanos Engine
}

func (f *QueryEngineFactory) GetPrometheusEngine() promql.QueryEngine {
func (f *QueryEngineFactory) GetPrometheusEngine() Engine {
f.createPrometheusEngine.Do(func() {
if f.prometheusEngine != nil {
if f.prometheus != nil {
return
}
f.prometheusEngine = promql.NewEngine(f.engineOpts)
f.prometheus = &prometheusEngineAdapter{engine: promql.NewEngine(f.engineOpts.EngineOpts)}
})

return f.prometheusEngine
return f.prometheus
}

func (f *QueryEngineFactory) GetThanosEngine() ThanosEngine {
func (f *QueryEngineFactory) GetThanosEngine() Engine {
f.createThanosEngine.Do(func() {
opts := engine.Opts{
EngineOpts: f.engineOpts,
Engine: f.GetPrometheusEngine(),
EnableAnalysis: true,
EnableXFunctions: f.enableXFunctions,
}
if f.thanosEngine != nil {
if f.thanos != nil {
return
}
if f.remoteEngineEndpoints == nil {
f.thanosEngine = engine.New(opts)
f.thanos = engine.New(f.engineOpts)
} else {
f.thanosEngine = engine.NewDistributedEngine(opts, f.remoteEngineEndpoints)
f.thanos = engine.NewDistributedEngine(f.engineOpts, f.remoteEngineEndpoints)
}
})

return f.thanosEngine
return f.thanos
}

func NewQueryEngineFactory(engineOpts promql.EngineOpts, remoteEngineEndpoints promqlapi.RemoteEndpoints, enableExtendedFunctions bool) *QueryEngineFactory {
func NewQueryEngineFactory(
engineOpts engine.Opts,
remoteEngineEndpoints promqlapi.RemoteEndpoints,
) *QueryEngineFactory {
return &QueryEngineFactory{
engineOpts: engineOpts,
remoteEngineEndpoints: remoteEngineEndpoints,
enableXFunctions: enableExtendedFunctions,
}
}

Expand Down Expand Up @@ -339,8 +353,8 @@ func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplicatio
return enableDeduplication, nil
}

func (qapi *QueryAPI) parseEngineParam(r *http.Request) (queryEngine promql.QueryEngine, e PromqlEngineType, _ *api.ApiError) {
var engine promql.QueryEngine
func (qapi *QueryAPI) parseEngineParam(r *http.Request) (queryEngine Engine, e PromqlEngineType, _ *api.ApiError) {
var engine Engine

param := PromqlEngineType(r.FormValue("engine"))
if param == "" {
Expand Down Expand Up @@ -500,7 +514,7 @@ func processAnalysis(a *engine.AnalyzeOutputNode) queryTelemetry {
}

func (qapi *QueryAPI) queryExplain(r *http.Request) (interface{}, []error, *api.ApiError, func()) {
engine, engineParam, apiErr := qapi.parseEngineParam(r)
eng, engineParam, apiErr := qapi.parseEngineParam(r)
if apiErr != nil {
return nil, nil, apiErr, func() {}
}
Expand Down Expand Up @@ -574,7 +588,7 @@ func (qapi *QueryAPI) queryExplain(r *http.Request) (interface{}, []error, *api.
ctx = context.WithValue(ctx, tenancy.TenantKey, tenant)

var seriesStats []storepb.SeriesStatsCounter
qry, err := engine.NewInstantQuery(
qry, err := eng.MakeInstantQuery(
ctx,
qapi.queryableCreate(
enableDedup,
Expand All @@ -586,7 +600,10 @@ func (qapi *QueryAPI) queryExplain(r *http.Request) (interface{}, []error, *api.
shardInfo,
query.NewAggregateStatsReporter(&seriesStats),
),
promql.NewPrometheusQueryOpts(false, lookbackDelta),
&engine.QueryOpts{
LookbackDeltaParam: lookbackDelta,
EnablePartialResponses: enablePartialResponse,
},
r.FormValue("query"),
ts,
)
Expand Down Expand Up @@ -651,7 +668,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
return nil, nil, apiErr, func() {}
}

engine, _, apiErr := qapi.parseEngineParam(r)
eng, _, apiErr := qapi.parseEngineParam(r)
if apiErr != nil {
return nil, nil, apiErr, func() {}
}
Expand All @@ -677,7 +694,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
)
if err := tracing.DoInSpanWithErr(ctx, "instant_query_create", func(ctx context.Context) error {
var err error
qry, err = engine.NewInstantQuery(
qry, err = eng.MakeInstantQuery(
ctx,
qapi.queryableCreate(
enableDedup,
Expand All @@ -689,7 +706,10 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
shardInfo,
query.NewAggregateStatsReporter(&seriesStats),
),
promql.NewPrometheusQueryOpts(false, lookbackDelta),
&engine.QueryOpts{
LookbackDeltaParam: lookbackDelta,
EnablePartialResponses: enablePartialResponse,
},
queryStr,
ts,
)
Expand Down Expand Up @@ -745,7 +765,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
}

func (qapi *QueryAPI) queryRangeExplain(r *http.Request) (interface{}, []error, *api.ApiError, func()) {
engine, engineParam, apiErr := qapi.parseEngineParam(r)
eng, engineParam, apiErr := qapi.parseEngineParam(r)
if apiErr != nil {
return nil, nil, apiErr, func() {}
}
Expand Down Expand Up @@ -845,7 +865,7 @@ func (qapi *QueryAPI) queryRangeExplain(r *http.Request) (interface{}, []error,
ctx = context.WithValue(ctx, tenancy.TenantKey, tenant)

var seriesStats []storepb.SeriesStatsCounter
qry, err := engine.NewRangeQuery(
qry, err := eng.MakeRangeQuery(
ctx,
qapi.queryableCreate(
enableDedup,
Expand All @@ -857,7 +877,10 @@ func (qapi *QueryAPI) queryRangeExplain(r *http.Request) (interface{}, []error,
shardInfo,
query.NewAggregateStatsReporter(&seriesStats),
),
promql.NewPrometheusQueryOpts(false, lookbackDelta),
&engine.QueryOpts{
LookbackDeltaParam: lookbackDelta,
EnablePartialResponses: enablePartialResponse,
},
r.FormValue("query"),
start,
end,
Expand Down Expand Up @@ -949,7 +972,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr, func() {}
}

engine, _, apiErr := qapi.parseEngineParam(r)
eng, _, apiErr := qapi.parseEngineParam(r)
if apiErr != nil {
return nil, nil, apiErr, func() {}
}
Expand Down Expand Up @@ -978,7 +1001,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
)
if err := tracing.DoInSpanWithErr(ctx, "range_query_create", func(ctx context.Context) error {
var err error
qry, err = engine.NewRangeQuery(
qry, err = eng.MakeRangeQuery(
ctx,
qapi.queryableCreate(
enableDedup,
Expand All @@ -990,7 +1013,10 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
shardInfo,
query.NewAggregateStatsReporter(&seriesStats),
),
promql.NewPrometheusQueryOpts(false, lookbackDelta),
&engine.QueryOpts{
LookbackDeltaParam: lookbackDelta,
EnablePartialResponses: enablePartialResponse,
},
queryStr,
start,
end,
Expand Down

0 comments on commit 9856716

Please sign in to comment.