Skip to content

Commit

Permalink
pass down the query analysis hints
Browse files Browse the repository at this point in the history
  • Loading branch information
yeya24 committed Oct 20, 2023
1 parent 49e748e commit 19e6fdb
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
4 changes: 4 additions & 0 deletions pkg/querier/tripperware/queryrange/embed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package queryrange

import (
"context"
"fmt"
"net/http"
"time"

Expand Down Expand Up @@ -96,8 +97,11 @@ func (s embedQuery) Do(ctx context.Context, r tripperware.Request) (tripperware.
Grouping: n.Grouping,
Without: n.Without,
}
ctx = context.WithValue(ctx, tripperware.AnalysisKey{}, analysis)
// Rewrite the inner expression to a vector selector with a special label and original inner query as value.
n.Expr = &parser.VectorSelector{LabelMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, tripperware.QueryLabel, newInnerExpr.String())}}
fmt.Printf("query at root: %s\n", n.String())
fmt.Printf("query at leaf: %s\n", newInnerExpr.String())
return s.evaluateWithQueryEngine(ctx, r.WithQuery(n.String()))
}
}
Expand Down
29 changes: 18 additions & 11 deletions pkg/querier/tripperware/shard_by.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type shardBy struct {
analyzer querysharding.Analyzer
}

type AnalysisKey struct{}

func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {
tenantIDs, err := tenant.TenantIDs(ctx)
stats := querier_stats.FromContext(ctx)
Expand All @@ -52,19 +54,24 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {
}

logger := util_log.WithContext(ctx, s.logger)
analysis, err := s.analyzer.Analyze(r.GetQuery())
if err != nil {
level.Warn(logger).Log("msg", "error analyzing query", "q", r.GetQuery(), "err", err)
}

stats.AddExtraFields(
"shard_by.is_shardable", analysis.IsShardable(),
"shard_by.num_shards", numShards,
"shard_by.sharding_labels", analysis.ShardingLabels(),
)
out := ctx.Value(AnalysisKey{})
analysis, ok := out.(querysharding.QueryAnalysis)
if !ok {
analysis, err = s.analyzer.Analyze(r.GetQuery())
if err != nil {
level.Warn(logger).Log("msg", "error analyzing query", "q", r.GetQuery(), "err", err)
}

stats.AddExtraFields(
"shard_by.is_shardable", analysis.IsShardable(),
"shard_by.num_shards", numShards,
"shard_by.sharding_labels", analysis.ShardingLabels(),
)

if err != nil || !analysis.IsShardable() {
return s.next.Do(ctx, r)
if err != nil || !analysis.IsShardable() {
return s.next.Do(ctx, r)
}
}

reqs := s.shardQuery(logger, numShards, r, analysis)
Expand Down

0 comments on commit 19e6fdb

Please sign in to comment.