fix astmapper seriesset merge issue
Signed-off-by: Ben Ye <[email protected]>
yeya24 committed Oct 21, 2023
1 parent 19e6fdb commit c2c7dec
Showing 8 changed files with 31 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/querier/tripperware/instantquery/instant_query.go
@@ -250,7 +250,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res
return &resp, nil
}

-func (instantQueryCodec) MergeResponse(ctx context.Context, req tripperware.Request, responses ...tripperware.Response) (tripperware.Response, error) {
+func (instantQueryCodec) MergeResponse(ctx context.Context, deduplicate bool, req tripperware.Request, responses ...tripperware.Response) (tripperware.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "PrometheusInstantQueryResponse.MergeResponse")
sp.SetTag("response_count", len(responses))
defer sp.Finish()
2 changes: 1 addition & 1 deletion pkg/querier/tripperware/instantquery/instant_query_test.go
@@ -407,7 +407,7 @@ func TestMergeResponse(t *testing.T) {
if tc.cancelBeforeMerge {
cancelCtx()
}
-resp, err := InstantQueryCodec.MergeResponse(ctx, tc.req, resps...)
+resp, err := InstantQueryCodec.MergeResponse(ctx, true, tc.req, resps...)
assert.Equal(t, tc.expectedErr, err)
if err != nil {
cancelCtx()
2 changes: 1 addition & 1 deletion pkg/querier/tripperware/query.go
@@ -50,7 +50,7 @@ type Codec interface {
// Merger is used by middlewares making multiple requests to merge back all responses into a single one.
type Merger interface {
// MergeResponse merges responses from multiple requests into a single Response
-MergeResponse(context.Context, Request, ...Response) (Response, error)
+MergeResponse(context.Context, bool, Request, ...Response) (Response, error)
}

// Response represents a query range response.
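For anyone maintaining a Merger implementation outside this repository, the sketch below shows where the new bool lands in the parameter list. It is not part of the commit: firstResponseMerger, its behaviour, and the assumed context/errors/tripperware imports exist only for this illustration.

// firstResponseMerger is a hypothetical Merger used only to illustrate the
// updated method signature; it is not part of this change.
type firstResponseMerger struct{}

func (firstResponseMerger) MergeResponse(_ context.Context, _ bool, _ tripperware.Request, responses ...tripperware.Response) (tripperware.Response, error) {
	if len(responses) == 0 {
		return nil, errors.New("no responses to merge")
	}
	// The deduplicate flag is deliberately ignored here: a single winning
	// response has nothing to collapse.
	return responses[0], nil
}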
19 changes: 15 additions & 4 deletions pkg/querier/tripperware/queryrange/query_range.go
@@ -126,7 +126,7 @@ func NewEmptyPrometheusResponse() *PrometheusResponse {
}
}

-func (c prometheusCodec) MergeResponse(ctx context.Context, _ tripperware.Request, responses ...tripperware.Response) (tripperware.Response, error) {
+func (c prometheusCodec) MergeResponse(ctx context.Context, deduplicate bool, _ tripperware.Request, responses ...tripperware.Response) (tripperware.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "QueryRangeResponse.MergeResponse")
sp.SetTag("response_count", len(responses))
defer sp.Finish()
@@ -145,9 +145,20 @@ func (c prometheusCodec) MergeResponse(ctx context.Context, _ tripperware.Reques

// Merge the responses.
sort.Sort(byFirstTime(promResponses))
-sampleStreams, err := matrixMerge(ctx, promResponses)
-if err != nil {
-return nil, err
+var (
+sampleStreams []tripperware.SampleStream
+err error
+)
+if deduplicate {
+sampleStreams, err = matrixMerge(ctx, promResponses)
+if err != nil {
+return nil, err
+}
+} else {
+sampleStreams = make([]tripperware.SampleStream, 0)
+for _, resp := range promResponses {
+sampleStreams = append(sampleStreams, resp.Data.Result...)
+}
+}

response := PrometheusResponse{
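The hunk above is the heart of the fix: label-based merging via matrixMerge becomes opt-in, and when it is switched off the partial results are simply concatenated. The standalone sketch below is not part of the commit; the toy sampleStream type and the two merge functions are simplified stand-ins for tripperware.SampleStream and matrixMerge, shown only to contrast the two strategies when shards return series with identical label sets.

package main

import (
	"fmt"
	"sort"
)

// sampleStream is a toy stand-in for tripperware.SampleStream.
type sampleStream struct {
	labels  string    // flattened label set, e.g. `{job="api"}`
	samples []float64 // stand-in for timestamped samples
}

// mergeDeduplicate collapses streams that share a label set, roughly what a
// label-based merge such as matrixMerge does.
func mergeDeduplicate(responses ...[]sampleStream) []sampleStream {
	byLabels := map[string][]float64{}
	for _, resp := range responses {
		for _, s := range resp {
			byLabels[s.labels] = append(byLabels[s.labels], s.samples...)
		}
	}
	out := make([]sampleStream, 0, len(byLabels))
	for lbls, samples := range byLabels {
		out = append(out, sampleStream{labels: lbls, samples: samples})
	}
	sort.Slice(out, func(i, j int) bool { return out[i].labels < out[j].labels })
	return out
}

// mergeConcat keeps every stream, matching the new deduplicate == false branch
// that appends resp.Data.Result from each partial response unchanged.
func mergeConcat(responses ...[]sampleStream) []sampleStream {
	out := make([]sampleStream, 0)
	for _, resp := range responses {
		out = append(out, resp...)
	}
	return out
}

func main() {
	a := []sampleStream{{labels: `{job="api"}`, samples: []float64{1}}}
	b := []sampleStream{{labels: `{job="api"}`, samples: []float64{2}}}
	fmt.Println(len(mergeDeduplicate(a, b))) // 1: identical label sets collapse into one stream
	fmt.Println(len(mergeConcat(a, b)))      // 2: every partial stream is kept
}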
@@ -37,6 +37,7 @@ const day = 24 * time.Hour
type Config struct {
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
+Embed bool `yaml:"embed"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
@@ -52,6 +53,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", 0, "Split queries by an interval and execute in parallel, 0 disables it. You should use an a multiple of 24 hours (same as the storage bucketing scheme), to avoid queriers downloading and processing the same chunks. This also determines how cache keys are chosen when result caching is enabled")
f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.")
+f.BoolVar(&cfg.Embed, "querier.embed", false, "embed querier")
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")
cfg.ResultsCacheConfig.RegisterFlags(f)
@@ -111,7 +113,9 @@ func Middlewares(
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("results_cache", metrics), queryCacheMiddleware)
}

-queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("embedding", metrics), EmbedQueryMiddleware(log, limits, queryAnalyzer, engine))
+if cfg.Embed {
+queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("embedding", metrics), EmbedQueryMiddleware(log, limits, queryAnalyzer, engine))
+}
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, shardedPrometheusCodec, queryAnalyzer))

return queryRangeMiddleware, c, nil
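With this change the embedding middleware is opt-in. The fragment below is a hypothetical example (not from the commit) of enabling it programmatically in the same package as Config, assuming the package's existing time import; the equivalent command-line switch, per the flag registered above, would be -querier.embed=true.

// Illustrative only: enable the embedding middleware alongside query splitting.
func exampleConfig() Config {
	return Config{
		SplitQueriesByInterval: 24 * time.Hour, // a multiple of 24h, matching the storage bucketing scheme
		AlignQueriesWithStep:   true,
		Embed:                  true, // gates the EmbedQueryMiddleware appended in Middlewares
	}
}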
6 changes: 3 additions & 3 deletions pkg/querier/tripperware/queryrange/results_cache.go
@@ -407,7 +407,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte
return nil, nil, err
}
if len(requests) == 0 {
-response, err := s.merger.MergeResponse(ctx, r, responses...)
+response, err := s.merger.MergeResponse(ctx, true, r, responses...)
// No downstream requests so no need to write back to the cache.
return response, nil, err
}
@@ -469,7 +469,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte
if err != nil {
return nil, nil, err
}
-merged, err := s.merger.MergeResponse(ctx, r, accumulator.Response, currentRes)
+merged, err := s.merger.MergeResponse(ctx, true, r, accumulator.Response, currentRes)
if err != nil {
return nil, nil, err
}
@@ -481,7 +481,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte
return nil, nil, err
}

-response, err := s.merger.MergeResponse(ctx, r, responses...)
+response, err := s.merger.MergeResponse(ctx, true, r, responses...)
return response, mergedExtents, err
}

2 changes: 1 addition & 1 deletion pkg/querier/tripperware/queryrange/split_by_interval.go
@@ -66,7 +66,7 @@ func (s splitByInterval) Do(ctx context.Context, r tripperware.Request) (tripper
resps = append(resps, reqResp.Response)
}

-response, err := s.merger.MergeResponse(ctx, nil, resps...)
+response, err := s.merger.MergeResponse(ctx, true, nil, resps...)
if err != nil {
return nil, err
}
5 changes: 4 additions & 1 deletion pkg/querier/tripperware/shard_by.go
@@ -55,6 +55,7 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {

logger := util_log.WithContext(ctx, s.logger)

+deduplicate := true
out := ctx.Value(AnalysisKey{})
analysis, ok := out.(querysharding.QueryAnalysis)
if !ok {
@@ -72,6 +73,8 @@ if err != nil || !analysis.IsShardable() {
if err != nil || !analysis.IsShardable() {
return s.next.Do(ctx, r)
}
+} else {
+deduplicate = false
}

reqs := s.shardQuery(logger, numShards, r, analysis)
Expand All @@ -86,7 +89,7 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {
resps = append(resps, reqResp.Response)
}

-return s.merger.MergeResponse(ctx, r, resps...)
+return s.merger.MergeResponse(ctx, deduplicate, r, resps...)
}

func (s shardBy) shardQuery(l log.Logger, numShards int, r Request, analysis querysharding.QueryAnalysis) []Request {
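Read together, the shard_by.go hunks boil down to one decision: deduplicate stays true unless a query analysis is already stored in the request context, in which case re-analysis is skipped and the merge keeps every stream. The helper below is hypothetical and only condenses that logic; shardBy.Do inlines it rather than calling a function like this, and it assumes the AnalysisKey and querysharding identifiers already used in the hunks above.

// shouldDeduplicate mirrors the flag handling added to shardBy.Do: label-based
// deduplication is kept unless an analysis was already attached to the context.
func shouldDeduplicate(ctx context.Context) bool {
	_, ok := ctx.Value(AnalysisKey{}).(querysharding.QueryAnalysis)
	return !ok
}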
