Skip to content

Commit

Permalink
QueryFrontend|Query: Create new arg to enable extended functions (#7028)
Browse files Browse the repository at this point in the history
* Adding new parameter for extended functions in querier

Signed-off-by: Pedro Tanaka <[email protected]>

* Adding new flag for QFE

Signed-off-by: Pedro Tanaka <[email protected]>

* improve argument passing in query side

Signed-off-by: Pedro Tanaka <[email protected]>

* Adding changelog

Signed-off-by: Pedro Tanaka <[email protected]>

* Adding e2e test for query

Signed-off-by: Pedro Tanaka <[email protected]>

* undoing uneeded changes

Signed-off-by: Pedro Tanaka <[email protected]>

* fixing docs

Signed-off-by: Pedro Tanaka <[email protected]>

* fixing e2e tests

Signed-off-by: Pedro Tanaka <[email protected]>

* Fixing backward compat test

Signed-off-by: Pedro Tanaka <[email protected]>

* changes from CR

Signed-off-by: Pedro Tanaka <[email protected]>

---------

Signed-off-by: Pedro Tanaka <[email protected]>
Signed-off-by: Pedro Tanaka <[email protected]>
  • Loading branch information
pedro-stanaka authored Jan 3, 2024
1 parent 7aad562 commit 08a14f3
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6984](https://github.com/thanos-io/thanos/pull/6984) Store Gateway: Added `--store.index-header-lazy-download-strategy` to specify how to lazily download index headers when lazy mmap is enabled.

- [#6887](https://github.com/thanos-io/thanos/pull/6887) Query Frontend: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing custom dashboard queries to be incorrect due to the added label.
- [#7028](https://github.com/thanos-io/thanos/pull/7028) Query|Query Frontend: Add new `--query-frontend.enable-x-functions` flag to enable experimental extended functions.

### Changed

Expand Down
5 changes: 4 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func registerQuery(app *extkingpin.App) {

defaultEngine := cmd.Flag("query.promql-engine", "Default PromQL engine to use.").Default(string(apiv1.PromqlEnginePrometheus)).
Enum(string(apiv1.PromqlEnginePrometheus), string(apiv1.PromqlEngineThanos))

extendedFunctionsEnabled := cmd.Flag("query.enable-x-functions", "Whether to enable extended rate functions (xrate, xincrease and xdelta). Only has effect when used with Thanos engine.").Default("false").Bool()
promqlQueryMode := cmd.Flag("query.mode", "PromQL query mode. One of: local, distributed.").
Hidden().
Default(string(queryModeLocal)).
Expand Down Expand Up @@ -342,6 +342,7 @@ func registerQuery(app *extkingpin.App) {
*queryTelemetrySeriesQuantiles,
*defaultEngine,
storeRateLimits,
*extendedFunctionsEnabled,
queryMode(*promqlQueryMode),
*tenantHeader,
*defaultTenant,
Expand Down Expand Up @@ -421,6 +422,7 @@ func runQuery(
queryTelemetrySeriesQuantiles []float64,
defaultEngine string,
storeRateLimits store.SeriesSelectLimits,
extendedFunctionsEnabled bool,
queryMode queryMode,
tenantHeader string,
defaultTenant string,
Expand Down Expand Up @@ -652,6 +654,7 @@ func runQuery(
engineFactory := apiv1.NewQueryEngineFactory(
engineOpts,
remoteEngineEndpoints,
extendedFunctionsEnabled,
)

lookbackDeltaCreator := LookbackDeltaFactory(engineOpts, dynamicLookbackDelta)
Expand Down
11 changes: 11 additions & 0 deletions cmd/thanos/query_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/promql-engine/execution/parse"
"github.com/weaveworks/common/user"
"gopkg.in/yaml.v2"

Expand Down Expand Up @@ -92,6 +94,9 @@ func registerQueryFrontend(app *extkingpin.App) {
cmd.Flag("query-range.max-retries-per-request", "Maximum number of retries for a single query range request; beyond this, the downstream error is returned.").
Default("5").IntVar(&cfg.QueryRangeConfig.MaxRetries)

cmd.Flag("query-frontend.enable-x-functions", "Enable experimental x- functions in query-frontend. --no-query-frontend.enable-x-functions for disabling.").
Default("false").BoolVar(&cfg.EnableXFunctions)

cmd.Flag("query-range.max-query-length", "Limit the query time range (end - start time) in the query-frontend, 0 disables it.").
Default("0").DurationVar((*time.Duration)(&cfg.QueryRangeConfig.Limits.MaxQueryLength))

Expand Down Expand Up @@ -285,6 +290,12 @@ func runQueryFrontend(
return errors.Wrap(err, "error validating the config")
}

if cfg.EnableXFunctions {
for fname, v := range parse.XFunctions {
parser.Functions[fname] = v
}
}

tripperWare, err := queryfrontend.NewTripperware(cfg.Config, reg, logger)
if err != nil {
return errors.Wrap(err, "setup tripperwares")
Expand Down
5 changes: 5 additions & 0 deletions docs/components/query-frontend.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ Flags:
--query-frontend.downstream-url="http://localhost:9090"
URL of downstream Prometheus Query compatible
API.
--query-frontend.enable-x-functions
Enable experimental x-
functions in query-frontend.
--no-query-frontend.enable-x-functions for
disabling.
--query-frontend.forward-header=<http-header-name> ...
List of headers forwarded by the query-frontend
to downstream queriers, default is empty
Expand Down
4 changes: 4 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ Flags:
--query.default-tenant-id="default-tenant"
Default tenant ID to use if tenant header is
not present
--query.enable-x-functions
Whether to enable extended rate functions
(xrate, xincrease and xdelta). Only has effect
when used with Thanos engine.
--query.lookback-delta=QUERY.LOOKBACK-DELTA
The maximum lookback duration for retrieving
metrics during expression evaluations.
Expand Down
9 changes: 4 additions & 5 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type QueryEngineFactory struct {

createThanosEngine sync.Once
thanosEngine v1.QueryEngine
enableXFunctions bool
}

func (f *QueryEngineFactory) GetPrometheusEngine() v1.QueryEngine {
Expand All @@ -118,7 +119,7 @@ func (f *QueryEngineFactory) GetThanosEngine() v1.QueryEngine {
return
}
if f.remoteEngineEndpoints == nil {
f.thanosEngine = engine.New(engine.Opts{EngineOpts: f.engineOpts, Engine: f.GetPrometheusEngine(), EnableAnalysis: true})
f.thanosEngine = engine.New(engine.Opts{EngineOpts: f.engineOpts, Engine: f.GetPrometheusEngine(), EnableAnalysis: true, EnableXFunctions: f.enableXFunctions})
} else {
f.thanosEngine = engine.NewDistributedEngine(engine.Opts{EngineOpts: f.engineOpts, Engine: f.GetPrometheusEngine(), EnableAnalysis: true}, f.remoteEngineEndpoints)
}
Expand All @@ -127,13 +128,11 @@ func (f *QueryEngineFactory) GetThanosEngine() v1.QueryEngine {
return f.thanosEngine
}

func NewQueryEngineFactory(
engineOpts promql.EngineOpts,
remoteEngineEndpoints promqlapi.RemoteEndpoints,
) *QueryEngineFactory {
func NewQueryEngineFactory(engineOpts promql.EngineOpts, remoteEngineEndpoints promqlapi.RemoteEndpoints, enableExtendedFunctions bool) *QueryEngineFactory {
return &QueryEngineFactory{
engineOpts: engineOpts,
remoteEngineEndpoints: remoteEngineEndpoints,
enableXFunctions: enableExtendedFunctions,
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func TestQueryEndpoints(t *testing.T) {
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
}, nil)
}, nil, false)
api := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
Expand Down Expand Up @@ -643,7 +643,7 @@ func TestQueryExplainEndpoints(t *testing.T) {
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
}, nil)
}, nil, false)
api := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
Expand Down Expand Up @@ -707,7 +707,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) {
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
}, nil)
}, nil, false)
api := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
Expand Down Expand Up @@ -881,7 +881,7 @@ func TestMetadataEndpoints(t *testing.T) {
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
}, nil)
}, nil, false)
api := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
Expand Down
1 change: 1 addition & 0 deletions pkg/queryfrontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ type Config struct {
TenantHeader string
DefaultTenant string
TenantCertField string
EnableXFunctions bool
}

// QueryRangeConfig holds the config for query range tripperware.
Expand Down
20 changes: 17 additions & 3 deletions test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/thanos-io/objstore/exthttp"

"github.com/thanos-io/thanos/pkg/alert"
apiv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/clientconfig"
"github.com/thanos-io/thanos/pkg/queryfrontend"
"github.com/thanos-io/thanos/pkg/receive"
Expand Down Expand Up @@ -253,8 +254,9 @@ type QuerierBuilder struct {
endpoints []string
strictEndpoints []string

engine string
queryMode string
engine apiv1.PromqlEngineType
queryMode string
enableXFunctions bool

replicaLabels []string
tracingConfig string
Expand Down Expand Up @@ -362,7 +364,7 @@ func (q *QuerierBuilder) WithDisablePartialResponses(disable bool) *QuerierBuild
return q
}

func (q *QuerierBuilder) WithEngine(engine string) *QuerierBuilder {
func (q *QuerierBuilder) WithEngine(engine apiv1.PromqlEngineType) *QuerierBuilder {
q.engine = engine
return q
}
Expand All @@ -372,6 +374,11 @@ func (q *QuerierBuilder) WithQueryMode(mode string) *QuerierBuilder {
return q
}

func (q *QuerierBuilder) WithEnableXFunctions() *QuerierBuilder {
q.enableXFunctions = true
return q
}

func (q *QuerierBuilder) WithEnvVars(envVars map[string]string) *QuerierBuilder {
q.envVars = envVars
return q
Expand Down Expand Up @@ -484,6 +491,13 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) {
for _, bucket := range q.telemetrySeriesQuantiles {
args = append(args, "--query.telemetry.request-series-seconds-quantiles="+strconv.FormatFloat(bucket, 'f', -1, 64))
}
if q.enableXFunctions {
args = append(args, "--query.enable-x-functions")
}
if q.engine != "" {
args = append(args, "--query.promql-engine="+string(q.engine))
}

return args, nil
}

Expand Down
43 changes: 43 additions & 0 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,49 @@ func TestQuery(t *testing.T) {
})
}

func TestQueryWithExtendedFunctions(t *testing.T) {
t.Parallel()

e, err := e2e.New(e2e.WithName("e2e-qry-xfunc"))
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

// create prom + sidecar
prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "prom", e2ethanos.DefaultPromConfig("prom", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "", "remote-write-receiver")
testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar))

// create querier
q := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).WithEngine("thanos").WithEnableXFunctions().Init()
testutil.Ok(t, e2e.StartAndWaitReady(q))

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
t.Cleanup(cancel)

// send series to prom
samples := []seriesWithLabels{
{intLabels: labels.FromStrings("__name__", "my_fake_metric", "a", "1", "b", "1", "instance", "1")},
{intLabels: labels.FromStrings("__name__", "my_fake_metric", "a", "1", "b", "2", "instance", "1")},
{intLabels: labels.FromStrings("__name__", "my_fake_metric", "a", "2", "b", "1", "instance", "1")},
{intLabels: labels.FromStrings("__name__", "my_fake_metric", "a", "2", "b", "2", "instance", "1")},
}
testutil.Ok(t, remoteWriteSeriesWithLabels(ctx, prom, samples))

// query
queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
return `xrate(my_fake_metric{a="1", b="1"}[1m])`
}, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
"a": "1",
"b": "1",
"instance": "1",
"prometheus": "prom",
"replica": "0",
},
})
}

func TestQueryExternalPrefixWithoutReverseProxy(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 08a14f3

Please sign in to comment.