From f56011523275b9723ae43fb7eb9094ce17d3ac55 Mon Sep 17 00:00:00 2001 From: Harry John Date: Thu, 14 Sep 2023 10:29:55 -0700 Subject: [PATCH] Batch adding series to query limiter to optimize locks (#5505) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- CHANGELOG.md | 1 + pkg/distributor/distributor.go | 20 +++--- pkg/distributor/query.go | 19 +++--- pkg/util/limiter/query_limiter.go | 13 ++-- pkg/util/limiter/query_limiter_test.go | 88 ++++++++++++++++++++++---- 5 files changed, 106 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2168368806..f01e33d39a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ * [ENHANCEMENT] Querier: Retry store gateway on different zones when zone awareness is enabled. #5476 * [ENHANCEMENT] DDBKV: Change metric name from dynamodb_kv_read_capacity_total to dynamodb_kv_consumed_capacity_total and include Delete, Put, Batch dimension. #5481 * [ENHANCEMENT] Compactor: allow unregisteronshutdown to be configurable. #5503 +* [ENHANCEMENT] Querier: Batch adding series to query limiter to optimize locking. #5505 * [ENHANCEMENT] Store Gateway: add metric `cortex_bucket_store_chunk_refetches_total` for number of chunk refetches. #5532 * [ENHANCEMENT] BasicLifeCycler: allow final-sleep during shutdown #5517 * [ENHANCEMENT] All: Handling CMK Access Denied errors. 
#5420 #5542 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 7ce4179e54..16eee1f444 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1093,17 +1093,18 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through if err := queryLimiter.AddDataBytes(resp.Size()); err != nil { return nil, validation.LimitError(err.Error()) } + s := make([][]cortexpb.LabelAdapter, 0, len(resp.Metric)) for _, m := range resp.Metric { - if err := queryLimiter.AddSeries(m.Labels); err != nil { - return nil, validation.LimitError(err.Error()) - } + s = append(s, m.Labels) m := cortexpb.FromLabelAdaptersToMetric(m.Labels) fingerprint := m.Fingerprint() mutex.Lock() (*metrics)[fingerprint] = m mutex.Unlock() } - + if err := queryLimiter.AddSeries(s...); err != nil { + return nil, validation.LimitError(err.Error()) + } return nil, nil }) @@ -1130,19 +1131,18 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t } else if err != nil { return nil, err } - + s := make([][]cortexpb.LabelAdapter, 0, len(resp.Metric)) for _, metric := range resp.Metric { m := cortexpb.FromLabelAdaptersToMetricWithCopy(metric.Labels) - - if err := queryLimiter.AddSeries(metric.Labels); err != nil { - return nil, validation.LimitError(err.Error()) - } - + s = append(s, metric.Labels) fingerprint := m.Fingerprint() mutex.Lock() (*metrics)[fingerprint] = m mutex.Unlock() } + if err := queryLimiter.AddSeries(s...); err != nil { + return nil, validation.LimitError(err.Error()) + } } return nil, nil diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index f54bb737fa..21aa3419ac 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -326,10 +326,17 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri return nil, validation.LimitError(chunkLimitErr.Error()) } + s := make([][]cortexpb.LabelAdapter, 0, len(resp.Chunkseries)+len(resp.Timeseries)) 
for _, series := range resp.Chunkseries { - if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil { - return nil, validation.LimitError(limitErr.Error()) - } + s = append(s, series.Labels) + } + + for _, series := range resp.Timeseries { + s = append(s, series.Labels) + } + + if limitErr := queryLimiter.AddSeries(s...); limitErr != nil { + return nil, validation.LimitError(limitErr.Error()) } if chunkBytesLimitErr := queryLimiter.AddChunkBytes(resp.ChunksSize()); chunkBytesLimitErr != nil { @@ -340,12 +347,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri return nil, validation.LimitError(dataBytesLimitErr.Error()) } - for _, series := range resp.Timeseries { - if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil { - return nil, validation.LimitError(limitErr.Error()) - } - } - result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...) result.Timeseries = append(result.Timeseries, resp.Timeseries...) } diff --git a/pkg/util/limiter/query_limiter.go b/pkg/util/limiter/query_limiter.go index 84031711e1..aa2261f7e6 100644 --- a/pkg/util/limiter/query_limiter.go +++ b/pkg/util/limiter/query_limiter.go @@ -65,18 +65,23 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter { return ql } -// AddSeries adds the input series and returns an error if the limit is reached. +// AddSeries adds the batch of input series and returns an error if the limit is reached. 
+func (ql *QueryLimiter) AddSeries(series ...[]cortexpb.LabelAdapter) error { // If the max series is unlimited just return without managing map if ql.maxSeriesPerQuery == 0 { return nil } - fingerprint := client.FastFingerprint(seriesLabels) + fps := make([]model.Fingerprint, 0, len(series)) + for _, s := range series { + fps = append(fps, client.FastFingerprint(s)) + } ql.uniqueSeriesMx.Lock() defer ql.uniqueSeriesMx.Unlock() + for _, fp := range fps { + ql.uniqueSeries[fp] = struct{}{} + } - ql.uniqueSeries[fingerprint] = struct{}{} if len(ql.uniqueSeries) > ql.maxSeriesPerQuery { // Format error with max limit return fmt.Errorf(ErrMaxSeriesHit, ql.maxSeriesPerQuery) diff --git a/pkg/util/limiter/query_limiter_test.go b/pkg/util/limiter/query_limiter_test.go index 02b1fc9f73..699adccd32 100644 --- a/pkg/util/limiter/query_limiter_test.go +++ b/pkg/util/limiter/query_limiter_test.go @@ -2,6 +2,7 @@ package limiter import ( "fmt" + "sync" "testing" "github.com/prometheus/prometheus/model/labels" @@ -87,6 +88,37 @@ func TestQueryLimiter_AddSeriers_ShouldReturnErrorOnLimitExceeded(t *testing.T) require.Error(t, err) } +func TestQueryLimiter_AddSeriesBatch_ShouldReturnErrorOnLimitExceeded(t *testing.T) { + const ( + metricName = "test_metric" + ) + + limiter := NewQueryLimiter(10, 0, 0, 0) + series := make([][]cortexpb.LabelAdapter, 0, 10) + + for i := 0; i < 10; i++ { + s := []cortexpb.LabelAdapter{ + { + Name: labels.MetricName, + Value: fmt.Sprintf("%v_%v", metricName, i), + }, + } + series = append(series, s) + } + err := limiter.AddSeries(series...) 
+ require.NoError(t, err) + + series1 := []cortexpb.LabelAdapter{ + { + Name: labels.MetricName, + Value: metricName + "_11", + }, + } + + err = limiter.AddSeries(series1) + require.Error(t, err) +} + func TestQueryLimiter_AddChunkBytes(t *testing.T) { var limiter = NewQueryLimiter(0, 100, 0, 0) @@ -106,23 +138,55 @@ func TestQueryLimiter_AddDataBytes(t *testing.T) { } func BenchmarkQueryLimiter_AddSeries(b *testing.B) { + AddSeriesConcurrentBench(b, 1) +} + +func BenchmarkQueryLimiter_AddSeriesBatch(b *testing.B) { + AddSeriesConcurrentBench(b, 128) +} + +func AddSeriesConcurrentBench(b *testing.B, batchSize int) { + b.ResetTimer() const ( metricName = "test_metric" ) - var series []labels.Labels - for i := 0; i < b.N; i++ { - series = append(series, - labels.FromMap(map[string]string{ - labels.MetricName: metricName + "_1", - "series1": fmt.Sprint(i), - })) - } - b.ResetTimer() limiter := NewQueryLimiter(b.N+1, 0, 0, 0) - for _, s := range series { - err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s)) - assert.NoError(b, err) + + // Concurrent goroutines trying to add duplicated series + const numWorkers = 100 + var wg sync.WaitGroup + + worker := func(w int) { + defer wg.Done() + var series []labels.Labels + for i := 0; i < b.N; i++ { + series = append(series, + labels.FromMap(map[string]string{ + labels.MetricName: metricName + "_1", + "series1": fmt.Sprint(i), + })) + } + + for i := 0; i < len(series); i += batchSize { + s := make([][]cortexpb.LabelAdapter, 0, batchSize) + j := i + batchSize + if j > len(series) { + j = len(series) + } + for k := i; k < j; k++ { + s = append(s, cortexpb.FromLabelsToLabelAdapters(series[k])) + } + + err := limiter.AddSeries(s...) + assert.NoError(b, err) + } + } + + for w := 1; w <= numWorkers; w++ { + wg.Add(1) + go worker(w) } + wg.Wait() }