Skip to content

Commit

Permalink
Merge pull request #7261 from pedro-stanaka/feat/plan-serialize-optimize
Browse files Browse the repository at this point in the history
query: forward query plan in the remote query request
  • Loading branch information
fpetkovski authored Apr 9, 2024
2 parents f7853dd + 9ef4b5a commit 953ce26
Show file tree
Hide file tree
Showing 8 changed files with 586 additions and 112 deletions.
126 changes: 76 additions & 50 deletions pkg/api/query/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/promql-engine/logicalplan"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -56,12 +58,6 @@ func RegisterQueryServer(queryServer querypb.QueryServer) func(*grpc.Server) {

func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_QueryServer) error {
ctx := server.Context()
var ts time.Time
if request.TimeSeconds == 0 {
ts = g.now()
} else {
ts = time.Unix(request.TimeSeconds, 0)
}

if request.TimeoutSeconds != 0 {
var cancel context.CancelFunc
Expand All @@ -75,11 +71,6 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer
maxResolution = g.defaultMaxResolutionSeconds.Milliseconds() / 1000
}

lookbackDelta := g.lookbackDeltaCreate(maxResolution * 1000)
if request.LookbackDeltaSeconds > 0 {
lookbackDelta = time.Duration(request.LookbackDeltaSeconds) * time.Second
}

storeMatchers, err := querypb.StoreMatchersToLabelMatchers(request.StoreMatchers)
if err != nil {
return err
Expand All @@ -101,21 +92,7 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer
query.NoopSeriesStatsReporter,
)

var engine promql.QueryEngine
engineParam := request.Engine
if engineParam == querypb.EngineType_default {
engineParam = g.defaultEngine
}

switch engineParam {
case querypb.EngineType_prometheus:
engine = g.engineFactory.GetPrometheusEngine()
case querypb.EngineType_thanos:
engine = g.engineFactory.GetThanosEngine()
default:
return status.Error(codes.InvalidArgument, "invalid engine parameter")
}
qry, err := engine.NewInstantQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, ts)
qry, err := g.getQueryForEngine(ctx, request, queryable, maxResolution)
if err != nil {
return err
}
Expand Down Expand Up @@ -158,6 +135,39 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer
return nil
}

func (g *GRPCAPI) getQueryForEngine(ctx context.Context, request *querypb.QueryRequest, queryable storage.Queryable, maxResolution int64) (promql.Query, error) {
lookbackDelta := g.lookbackDeltaCreate(maxResolution * 1000)
if request.LookbackDeltaSeconds > 0 {
lookbackDelta = time.Duration(request.LookbackDeltaSeconds) * time.Second
}
engineParam := request.Engine
if engineParam == querypb.EngineType_default {
engineParam = g.defaultEngine
}

var ts time.Time
if request.TimeSeconds == 0 {
ts = g.now()
} else {
ts = time.Unix(request.TimeSeconds, 0)
}
switch engineParam {
case querypb.EngineType_prometheus:
queryEngine := g.engineFactory.GetPrometheusEngine()
return queryEngine.NewInstantQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, ts)
case querypb.EngineType_thanos:
queryEngine := g.engineFactory.GetThanosEngine()
plan, err := logicalplan.Unmarshal(request.QueryPlan.GetJson())
if err != nil {
return queryEngine.NewInstantQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, ts)
}

return queryEngine.NewInstantQueryFromPlan(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), plan, ts)
default:
return nil, status.Error(codes.InvalidArgument, "invalid engine parameter")
}
}

func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Query_QueryRangeServer) error {
ctx := srv.Context()
if request.TimeoutSeconds != 0 {
Expand All @@ -172,11 +182,6 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que
maxResolution = g.defaultMaxResolutionSeconds.Milliseconds() / 1000
}

lookbackDelta := g.lookbackDeltaCreate(maxResolution * 1000)
if request.LookbackDeltaSeconds > 0 {
lookbackDelta = time.Duration(request.LookbackDeltaSeconds) * time.Second
}

storeMatchers, err := querypb.StoreMatchersToLabelMatchers(request.StoreMatchers)
if err != nil {
return err
Expand All @@ -198,25 +203,7 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que
query.NoopSeriesStatsReporter,
)

startTime := time.Unix(request.StartTimeSeconds, 0)
endTime := time.Unix(request.EndTimeSeconds, 0)
interval := time.Duration(request.IntervalSeconds) * time.Second

var engine promql.QueryEngine
engineParam := request.Engine
if engineParam == querypb.EngineType_default {
engineParam = g.defaultEngine
}

switch engineParam {
case querypb.EngineType_prometheus:
engine = g.engineFactory.GetPrometheusEngine()
case querypb.EngineType_thanos:
engine = g.engineFactory.GetThanosEngine()
default:
return status.Error(codes.InvalidArgument, "invalid engine parameter")
}
qry, err := engine.NewRangeQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, startTime, endTime, interval)
qry, err := g.getRangeQueryForEngine(ctx, request, queryable)
if err != nil {
return err
}
Expand Down Expand Up @@ -268,3 +255,42 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que

return nil
}

func (g *GRPCAPI) getRangeQueryForEngine(
ctx context.Context,
request *querypb.QueryRangeRequest,
queryable storage.Queryable,
) (promql.Query, error) {
startTime := time.Unix(request.StartTimeSeconds, 0)
endTime := time.Unix(request.EndTimeSeconds, 0)
interval := time.Duration(request.IntervalSeconds) * time.Second

engineParam := request.Engine
if engineParam == querypb.EngineType_default {
engineParam = g.defaultEngine
}

maxResolution := request.MaxResolutionSeconds
if request.MaxResolutionSeconds == 0 {
maxResolution = g.defaultMaxResolutionSeconds.Milliseconds() / 1000
}
lookbackDelta := g.lookbackDeltaCreate(maxResolution * 1000)
if request.LookbackDeltaSeconds > 0 {
lookbackDelta = time.Duration(request.LookbackDeltaSeconds) * time.Second
}

switch engineParam {
case querypb.EngineType_prometheus:
queryEngine := g.engineFactory.GetPrometheusEngine()
return queryEngine.NewRangeQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, startTime, endTime, interval)
case querypb.EngineType_thanos:
thanosEngine := g.engineFactory.GetThanosEngine()
plan, err := logicalplan.Unmarshal(request.QueryPlan.GetJson())
if err != nil {
return thanosEngine.NewRangeQuery(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), request.Query, startTime, endTime, interval)
}
return thanosEngine.NewRangeQueryFromPlan(ctx, queryable, promql.NewPrometheusQueryOpts(false, lookbackDelta), plan, startTime, endTime, interval)
default:
return nil, status.Error(codes.InvalidArgument, "invalid engine parameter")
}
}
57 changes: 57 additions & 0 deletions pkg/api/query/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,64 @@ import (
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
"github.com/thanos-io/promql-engine/logicalplan"
equery "github.com/thanos-io/promql-engine/query"

"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/store"
)

func TestGRPCQueryAPIWithQueryPlan(t *testing.T) {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
proxy := store.NewProxyStore(logger, reg, func() []store.Client { return nil }, component.Store, nil, 1*time.Minute, store.LazyRetrieval)
queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute)
lookbackDeltaFunc := func(i int64) time.Duration { return 5 * time.Minute }
engineFactory := &QueryEngineFactory{
thanosEngine: &engineStub{},
}
api := NewGRPCAPI(time.Now, nil, queryableCreator, engineFactory, querypb.EngineType_thanos, lookbackDeltaFunc, 0)

expr, err := parser.ParseExpr("metric")
testutil.Ok(t, err)
lplan := logicalplan.NewFromAST(expr, &equery.Options{}, logicalplan.PlanOptions{})
testutil.Ok(t, err)
// Create a mock query plan.
planBytes, err := logicalplan.Marshal(lplan.Root())
testutil.Ok(t, err)

rangeRequest := &querypb.QueryRangeRequest{
Query: "metric",
StartTimeSeconds: 0,
IntervalSeconds: 10,
EndTimeSeconds: 300,
QueryPlan: &querypb.QueryPlan{Encoding: &querypb.QueryPlan_Json{Json: planBytes}},
}

srv := newQueryRangeServer(context.Background())
err = api.QueryRange(rangeRequest, srv)
testutil.Ok(t, err)

// must also handle without query plan.
rangeRequest.QueryPlan = nil
err = api.QueryRange(rangeRequest, srv)
testutil.Ok(t, err)

instantRequest := &querypb.QueryRequest{
Query: "metric",
TimeoutSeconds: 60,
QueryPlan: &querypb.QueryPlan{Encoding: &querypb.QueryPlan_Json{Json: planBytes}},
}
instSrv := newQueryServer(context.Background())
err = api.Query(instantRequest, instSrv)
testutil.Ok(t, err)
}

func TestGRPCQueryAPIErrorHandling(t *testing.T) {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
Expand Down Expand Up @@ -108,6 +157,14 @@ func (e engineStub) NewRangeQuery(_ context.Context, q storage.Queryable, opts p
return &queryStub{err: e.err, warns: e.warns}, nil
}

func (e engineStub) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) {
return &queryStub{err: e.err, warns: e.warns}, nil
}

func (e engineStub) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) {
return &queryStub{err: e.err, warns: e.warns}, nil
}

type queryStub struct {
promql.Query
err error
Expand Down
25 changes: 25 additions & 0 deletions pkg/api/query/querypb/plan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package querypb

import (
"github.com/pkg/errors"
"github.com/thanos-io/promql-engine/api"
"github.com/thanos-io/promql-engine/logicalplan"
)

func NewJSONEncodedPlan(plan api.RemoteQuery) (*QueryPlan, error) {
node, ok := plan.(logicalplan.Node)
if !ok {
return nil, errors.New("plan is not a logicalplan.Node")
}
bytes, err := logicalplan.Marshal(node)
if err != nil {
return nil, err
}

return &QueryPlan{
Encoding: &QueryPlan_Json{Json: bytes},
}, nil
}
Loading

0 comments on commit 953ce26

Please sign in to comment.