diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 69ffb8ea32..20862295f2 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/thanos-io/promql-engine/api" + "github.com/thanos-io/promql-engine/engine" apiv1 "github.com/thanos-io/thanos/pkg/api/query" "github.com/thanos-io/thanos/pkg/api/query/querypb" @@ -663,18 +664,23 @@ func runQuery( prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)), ) - engineOpts := promql.EngineOpts{ - Logger: logger, - Reg: reg, - // TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703. - MaxSamples: math.MaxInt32, - Timeout: queryTimeout, - LookbackDelta: lookbackDelta, - NoStepSubqueryIntervalFn: func(int64) int64 { - return defaultEvaluationInterval.Milliseconds() + engineOpts := engine.Opts{ + EngineOpts: promql.EngineOpts{ + Logger: logger, + Reg: reg, + // TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703. + MaxSamples: math.MaxInt32, + Timeout: queryTimeout, + LookbackDelta: lookbackDelta, + NoStepSubqueryIntervalFn: func(int64) int64 { + return defaultEvaluationInterval.Milliseconds() + }, + EnableNegativeOffset: true, + EnableAtModifier: true, }, - EnableNegativeOffset: true, - EnableAtModifier: true, + EnablePartialResponses: enableQueryPartialResponse, + EnableXFunctions: extendedFunctionsEnabled, + EnableAnalysis: true, } // An active query tracker will be added only if the user specifies a non-default path. @@ -696,13 +702,9 @@ func runQuery( }) } - engineFactory := apiv1.NewQueryEngineFactory( - engineOpts, - remoteEngineEndpoints, - extendedFunctionsEnabled, - ) + engineFactory := apiv1.NewQueryEngineFactory(engineOpts, remoteEngineEndpoints) - lookbackDeltaCreator := LookbackDeltaFactory(engineOpts, dynamicLookbackDelta) + lookbackDeltaCreator := LookbackDeltaFactory(engineOpts.EngineOpts, dynamicLookbackDelta) // Start query API + UI HTTP server. { diff --git a/go.mod b/go.mod index 9a725cb167..39e37512d7 100644 --- a/go.mod +++ b/go.mod @@ -62,7 +62,7 @@ require ( github.com/sony/gobreaker v0.5.0 github.com/stretchr/testify v1.9.0 github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 - github.com/thanos-io/promql-engine v0.0.0-20241203103240-2f49f80c7c68 + github.com/thanos-io/promql-engine v0.0.0-20241217103156-9dbff30059cf github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/uber/jaeger-lib v2.4.1+incompatible // indirect github.com/vimeo/galaxycache v0.0.0-20210323154928-b7e5d71c067a diff --git a/go.sum b/go.sum index 9933abda40..07de39a908 100644 --- a/go.sum +++ b/go.sum @@ -2257,8 +2257,8 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1 github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM= github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1DkncwDHFvrpd12/2TLfgYNRmEQA48ikp+0= github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw= -github.com/thanos-io/promql-engine v0.0.0-20241203103240-2f49f80c7c68 h1:cChM/FbpXeYmrSmXO1/MmmSlONviLVxWAWCB0/g4JrY= -github.com/thanos-io/promql-engine v0.0.0-20241203103240-2f49f80c7c68/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00= +github.com/thanos-io/promql-engine v0.0.0-20241217103156-9dbff30059cf h1:JFh4PjC9yQidiFi4qMWbPddIgsLWPIsSEbXs75+tLxs= +github.com/thanos-io/promql-engine v0.0.0-20241217103156-9dbff30059cf/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0= diff --git a/pkg/api/query/grpc.go b/pkg/api/query/grpc.go index 093457dd06..fc6e132798 100644 --- a/pkg/api/query/grpc.go +++ b/pkg/api/query/grpc.go @@ -162,18 +162,21 @@ func (g *GRPCAPI) getQueryForEngine(ctx context.Context, request *querypb.QueryR } else { ts = time.Unix(request.TimeSeconds, 0) } + opts := &engine.QueryOpts{ + LookbackDeltaParam: lookbackDelta, + EnablePartialResponses: request.EnablePartialResponse, + } switch engineParam { case querypb.EngineType_prometheus: queryEngine := g.engineFactory.GetPrometheusEngine() - return queryEngine.NewInstantQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, ts) + return queryEngine.MakeInstantQuery(ctx, queryable, opts, request.Query, ts) case querypb.EngineType_thanos: queryEngine := g.engineFactory.GetThanosEngine() plan, err := logicalplan.Unmarshal(request.QueryPlan.GetJson()) if err != nil { - return queryEngine.NewInstantQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, ts) + return queryEngine.MakeInstantQuery(ctx, queryable, opts, request.Query, ts) } - - return queryEngine.NewInstantQueryFromPlan(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), plan, ts) + return queryEngine.MakeInstantQueryFromPlan(ctx, queryable, opts, plan, ts) default: return nil, status.Error(codes.InvalidArgument, "invalid engine parameter") } @@ -314,18 +317,22 @@ func (g *GRPCAPI) getRangeQueryForEngine( if request.LookbackDeltaSeconds > 0 { lookbackDelta = time.Duration(request.LookbackDeltaSeconds) * time.Second } + opts := &engine.QueryOpts{ + LookbackDeltaParam: lookbackDelta, + EnablePartialResponses: request.EnablePartialResponse, + } switch engineParam { case querypb.EngineType_prometheus: queryEngine := g.engineFactory.GetPrometheusEngine() - return queryEngine.NewRangeQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, startTime, endTime, interval) + return queryEngine.MakeRangeQuery(ctx, queryable, opts, request.Query, startTime, endTime, interval) case querypb.EngineType_thanos: thanosEngine := g.engineFactory.GetThanosEngine() plan, err := logicalplan.Unmarshal(request.QueryPlan.GetJson()) if err != nil { - return thanosEngine.NewRangeQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, startTime, endTime, interval) + return thanosEngine.MakeRangeQuery(ctx, queryable, opts, request.Query, startTime, endTime, interval) } - return thanosEngine.NewRangeQueryFromPlan(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), plan, startTime, endTime, interval) + return thanosEngine.MakeRangeQueryFromPlan(ctx, queryable, opts, plan, startTime, endTime, interval) default: return nil, status.Error(codes.InvalidArgument, "invalid engine parameter") } diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 49b40b1597..0eaec2de72 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -94,61 +94,75 @@ const ( PromqlEngineThanos PromqlEngineType = "thanos" ) -type ThanosEngine interface { - promql.QueryEngine - NewInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) - NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) +type Engine interface { + MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, qs string, ts time.Time) (promql.Query, error) + MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) + MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) + MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) +} + +type prometheusEngineAdapter struct { + engine promql.QueryEngine +} + +func (a *prometheusEngineAdapter) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, qs string, ts time.Time) (promql.Query, error) { + return a.engine.NewInstantQuery(ctx, q, opts, qs, ts) +} + +func (a *prometheusEngineAdapter) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) { + return a.engine.NewRangeQuery(ctx, q, opts, qs, start, end, step) +} + +func (a *prometheusEngineAdapter) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) { + return a.engine.NewInstantQuery(ctx, q, opts, plan.String(), ts) +} + +func (a *prometheusEngineAdapter) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *engine.QueryOpts, plan logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) { + return a.engine.NewRangeQuery(ctx, q, opts, plan.String(), start, end, step) } type QueryEngineFactory struct { - engineOpts promql.EngineOpts + engineOpts engine.Opts remoteEngineEndpoints promqlapi.RemoteEndpoints createPrometheusEngine sync.Once - prometheusEngine promql.QueryEngine + prometheus Engine createThanosEngine sync.Once - thanosEngine ThanosEngine - enableXFunctions bool + thanos Engine } -func (f *QueryEngineFactory) GetPrometheusEngine() promql.QueryEngine { +func (f *QueryEngineFactory) GetPrometheusEngine() Engine { f.createPrometheusEngine.Do(func() { - if f.prometheusEngine != nil { + if f.prometheus != nil { return } - f.prometheusEngine = promql.NewEngine(f.engineOpts) + f.prometheus = &prometheusEngineAdapter{engine: promql.NewEngine(f.engineOpts.EngineOpts)} }) - - return f.prometheusEngine + return f.prometheus } -func (f *QueryEngineFactory) GetThanosEngine() ThanosEngine { +func (f *QueryEngineFactory) GetThanosEngine() Engine { f.createThanosEngine.Do(func() { - opts := engine.Opts{ - EngineOpts: f.engineOpts, - Engine: f.GetPrometheusEngine(), - EnableAnalysis: true, - EnableXFunctions: f.enableXFunctions, - } - if f.thanosEngine != nil { + if f.thanos != nil { return } if f.remoteEngineEndpoints == nil { - f.thanosEngine = engine.New(opts) + f.thanos = engine.New(f.engineOpts) } else { - f.thanosEngine = engine.NewDistributedEngine(opts, f.remoteEngineEndpoints) + f.thanos = engine.NewDistributedEngine(f.engineOpts, f.remoteEngineEndpoints) } }) - - return f.thanosEngine + return f.thanos } -func NewQueryEngineFactory(engineOpts promql.EngineOpts, remoteEngineEndpoints promqlapi.RemoteEndpoints, enableExtendedFunctions bool) *QueryEngineFactory { +func NewQueryEngineFactory( + engineOpts engine.Opts, + remoteEngineEndpoints promqlapi.RemoteEndpoints, +) *QueryEngineFactory { return &QueryEngineFactory{ engineOpts: engineOpts, remoteEngineEndpoints: remoteEngineEndpoints, - enableXFunctions: enableExtendedFunctions, } } @@ -339,8 +353,8 @@ func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplicatio return enableDeduplication, nil } -func (qapi *QueryAPI) parseEngineParam(r *http.Request) (queryEngine promql.QueryEngine, e PromqlEngineType, _ *api.ApiError) { - var engine promql.QueryEngine +func (qapi *QueryAPI) parseEngineParam(r *http.Request) (queryEngine Engine, e PromqlEngineType, _ *api.ApiError) { + var engine Engine param := PromqlEngineType(r.FormValue("engine")) if param == "" { @@ -500,7 +514,7 @@ func processAnalysis(a *engine.AnalyzeOutputNode) queryTelemetry { } func (qapi *QueryAPI) queryExplain(r *http.Request) (interface{}, []error, *api.ApiError, func()) { - engine, engineParam, apiErr := qapi.parseEngineParam(r) + eng, engineParam, apiErr := qapi.parseEngineParam(r) if apiErr != nil { return nil, nil, apiErr, func() {} } @@ -574,7 +588,7 @@ func (qapi *QueryAPI) queryExplain(r *http.Request) (interface{}, []error, *api. ctx = context.WithValue(ctx, tenancy.TenantKey, tenant) var seriesStats []storepb.SeriesStatsCounter - qry, err := engine.NewInstantQuery( + qry, err := eng.MakeInstantQuery( ctx, qapi.queryableCreate( enableDedup, @@ -586,7 +600,10 @@ func (qapi *QueryAPI) queryExplain(r *http.Request) (interface{}, []error, *api. shardInfo, query.NewAggregateStatsReporter(&seriesStats), ), - promql.NewPrometheusQueryOpts(false, lookbackDelta), + &engine.QueryOpts{ + LookbackDeltaParam: lookbackDelta, + EnablePartialResponses: enablePartialResponse, + }, r.FormValue("query"), ts, ) @@ -651,7 +668,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro return nil, nil, apiErr, func() {} } - engine, _, apiErr := qapi.parseEngineParam(r) + eng, _, apiErr := qapi.parseEngineParam(r) if apiErr != nil { return nil, nil, apiErr, func() {} } @@ -677,7 +694,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro ) if err := tracing.DoInSpanWithErr(ctx, "instant_query_create", func(ctx context.Context) error { var err error - qry, err = engine.NewInstantQuery( + qry, err = eng.MakeInstantQuery( ctx, qapi.queryableCreate( enableDedup, @@ -689,7 +706,10 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro shardInfo, query.NewAggregateStatsReporter(&seriesStats), ), - promql.NewPrometheusQueryOpts(false, lookbackDelta), + &engine.QueryOpts{ + LookbackDeltaParam: lookbackDelta, + EnablePartialResponses: enablePartialResponse, + }, queryStr, ts, ) @@ -745,7 +765,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro } func (qapi *QueryAPI) queryRangeExplain(r *http.Request) (interface{}, []error, *api.ApiError, func()) { - engine, engineParam, apiErr := qapi.parseEngineParam(r) + eng, engineParam, apiErr := qapi.parseEngineParam(r) if apiErr != nil { return nil, nil, apiErr, func() {} } @@ -845,7 +865,7 @@ func (qapi *QueryAPI) queryRangeExplain(r *http.Request) (interface{}, []error, ctx = context.WithValue(ctx, tenancy.TenantKey, tenant) var seriesStats []storepb.SeriesStatsCounter - qry, err := engine.NewRangeQuery( + qry, err := eng.MakeRangeQuery( ctx, qapi.queryableCreate( enableDedup, @@ -857,7 +877,10 @@ func (qapi *QueryAPI) queryRangeExplain(r *http.Request) (interface{}, []error, shardInfo, query.NewAggregateStatsReporter(&seriesStats), ), - promql.NewPrometheusQueryOpts(false, lookbackDelta), + &engine.QueryOpts{ + LookbackDeltaParam: lookbackDelta, + EnablePartialResponses: enablePartialResponse, + }, r.FormValue("query"), start, end, @@ -949,7 +972,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap return nil, nil, apiErr, func() {} } - engine, _, apiErr := qapi.parseEngineParam(r) + eng, _, apiErr := qapi.parseEngineParam(r) if apiErr != nil { return nil, nil, apiErr, func() {} } @@ -978,7 +1001,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap ) if err := tracing.DoInSpanWithErr(ctx, "range_query_create", func(ctx context.Context) error { var err error - qry, err = engine.NewRangeQuery( + qry, err = eng.MakeRangeQuery( ctx, qapi.queryableCreate( enableDedup, @@ -990,7 +1013,10 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap shardInfo, query.NewAggregateStatsReporter(&seriesStats), ), - promql.NewPrometheusQueryOpts(false, lookbackDelta), + &engine.QueryOpts{ + LookbackDeltaParam: lookbackDelta, + EnablePartialResponses: enablePartialResponse, + }, queryStr, start, end,