Skip to content

Commit

Permalink
Batch adding series to query limiter to optimize locks (#5505)
Browse files Browse the repository at this point in the history
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
  • Loading branch information
harry671003 authored Sep 14, 2023
1 parent 97effe9 commit f560115
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
* [ENHANCEMENT] Querier: Retry store gateway on different zones when zone awareness is enabled. #5476
* [ENHANCEMENT] DDBKV: Change metric name from dynamodb_kv_read_capacity_total to dynamodb_kv_consumed_capacity_total and include Delete, Put, Batch dimension. #5481
* [ENHANCEMENT] Compactor: allow unregisteronshutdown to be configurable. #5503
* [ENHANCEMENT] Querier: Batch adding series to query limiter to optimize locking. #5505
* [ENHANCEMENT] Store Gateway: add metric `cortex_bucket_store_chunk_refetches_total` for number of chunk refetches. #5532
* [ENHANCEMENT] BasicLifeCycler: allow final-sleep during shutdown #5517
* [ENHANCEMENT] All: Handling CMK Access Denied errors. #5420 #5542
Expand Down
20 changes: 10 additions & 10 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,17 +1093,18 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
return nil, validation.LimitError(err.Error())
}
s := make([][]cortexpb.LabelAdapter, 0, len(resp.Metric))
for _, m := range resp.Metric {
if err := queryLimiter.AddSeries(m.Labels); err != nil {
return nil, validation.LimitError(err.Error())
}
s = append(s, m.Labels)
m := cortexpb.FromLabelAdaptersToMetric(m.Labels)
fingerprint := m.Fingerprint()
mutex.Lock()
(*metrics)[fingerprint] = m
mutex.Unlock()
}

if err := queryLimiter.AddSeries(s...); err != nil {
return nil, validation.LimitError(err.Error())
}
return nil, nil
})

Expand All @@ -1130,19 +1131,18 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t
} else if err != nil {
return nil, err
}

s := make([][]cortexpb.LabelAdapter, 0, len(resp.Metric))
for _, metric := range resp.Metric {
m := cortexpb.FromLabelAdaptersToMetricWithCopy(metric.Labels)

if err := queryLimiter.AddSeries(metric.Labels); err != nil {
return nil, validation.LimitError(err.Error())
}

s = append(s, metric.Labels)
fingerprint := m.Fingerprint()
mutex.Lock()
(*metrics)[fingerprint] = m
mutex.Unlock()
}
if err := queryLimiter.AddSeries(s...); err != nil {
return nil, validation.LimitError(err.Error())
}
}

return nil, nil
Expand Down
19 changes: 10 additions & 9 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,17 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
return nil, validation.LimitError(chunkLimitErr.Error())
}

s := make([][]cortexpb.LabelAdapter, 0, len(resp.Chunkseries)+len(resp.Timeseries))
for _, series := range resp.Chunkseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
return nil, validation.LimitError(limitErr.Error())
}
s = append(s, series.Labels)
}

for _, series := range resp.Timeseries {
s = append(s, series.Labels)
}

if limitErr := queryLimiter.AddSeries(s...); limitErr != nil {
return nil, validation.LimitError(limitErr.Error())
}

if chunkBytesLimitErr := queryLimiter.AddChunkBytes(resp.ChunksSize()); chunkBytesLimitErr != nil {
Expand All @@ -340,12 +347,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
return nil, validation.LimitError(dataBytesLimitErr.Error())
}

for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
return nil, validation.LimitError(limitErr.Error())
}
}

result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...)
result.Timeseries = append(result.Timeseries, resp.Timeseries...)
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/util/limiter/query_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,23 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter {
return ql
}

// AddSeries adds the input series and returns an error if the limit is reached.
func (ql *QueryLimiter) AddSeries(seriesLabels []cortexpb.LabelAdapter) error {
// AddSeries adds the batch of input series and returns an error if the limit is reached.
func (ql *QueryLimiter) AddSeries(series ...[]cortexpb.LabelAdapter) error {
// If the max series is unlimited just return without managing map
if ql.maxSeriesPerQuery == 0 {
return nil
}
fingerprint := client.FastFingerprint(seriesLabels)
fps := make([]model.Fingerprint, 0, len(series))
for _, s := range series {
fps = append(fps, client.FastFingerprint(s))
}

ql.uniqueSeriesMx.Lock()
defer ql.uniqueSeriesMx.Unlock()
for _, fp := range fps {
ql.uniqueSeries[fp] = struct{}{}
}

ql.uniqueSeries[fingerprint] = struct{}{}
if len(ql.uniqueSeries) > ql.maxSeriesPerQuery {
// Format error with max limit
return fmt.Errorf(ErrMaxSeriesHit, ql.maxSeriesPerQuery)
Expand Down
88 changes: 76 additions & 12 deletions pkg/util/limiter/query_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package limiter

import (
"fmt"
"sync"
"testing"

"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -87,6 +88,37 @@ func TestQueryLimiter_AddSeriers_ShouldReturnErrorOnLimitExceeded(t *testing.T)
require.Error(t, err)
}

// TestQueryLimiter_AddSeriesBatch_ShouldReturnErrorOnLimitExceeded verifies
// that a batch filling the limit exactly is accepted and that one additional
// unique series afterwards is rejected.
func TestQueryLimiter_AddSeriesBatch_ShouldReturnErrorOnLimitExceeded(t *testing.T) {
	const metricName = "test_metric"

	// The limiter allows at most 10 unique series per query.
	limiter := NewQueryLimiter(10, 0, 0, 0)

	// Build a batch of exactly 10 distinct series and add them in one call.
	batch := make([][]cortexpb.LabelAdapter, 0, 10)
	for i := 0; i < 10; i++ {
		batch = append(batch, []cortexpb.LabelAdapter{
			{
				Name:  labels.MetricName,
				Value: fmt.Sprintf("%v_%v", metricName, i),
			},
		})
	}
	require.NoError(t, limiter.AddSeries(batch...))

	// An 11th unique series pushes the count past the limit.
	extra := []cortexpb.LabelAdapter{
		{
			Name:  labels.MetricName,
			Value: metricName + "_11",
		},
	}
	require.Error(t, limiter.AddSeries(extra))
}

func TestQueryLimiter_AddChunkBytes(t *testing.T) {
var limiter = NewQueryLimiter(0, 100, 0, 0)

Expand All @@ -106,23 +138,55 @@ func TestQueryLimiter_AddDataBytes(t *testing.T) {
}

// BenchmarkQueryLimiter_AddSeries measures concurrent series addition with
// the smallest possible batch (one series per AddSeries call).
func BenchmarkQueryLimiter_AddSeries(b *testing.B) {
	const unbatched = 1
	AddSeriesConcurrentBench(b, unbatched)
}

// BenchmarkQueryLimiter_AddSeriesBatch measures concurrent series addition
// in batches of 128 series per AddSeries call.
func BenchmarkQueryLimiter_AddSeriesBatch(b *testing.B) {
	const batched = 128
	AddSeriesConcurrentBench(b, batched)
}

func AddSeriesConcurrentBench(b *testing.B, batchSize int) {
b.ResetTimer()
const (
metricName = "test_metric"
)
var series []labels.Labels
for i := 0; i < b.N; i++ {
series = append(series,
labels.FromMap(map[string]string{
labels.MetricName: metricName + "_1",
"series1": fmt.Sprint(i),
}))
}
b.ResetTimer()

limiter := NewQueryLimiter(b.N+1, 0, 0, 0)
for _, s := range series {
err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s))
assert.NoError(b, err)

// Concurrent goroutines trying to add duplicated series
const numWorkers = 100
var wg sync.WaitGroup

worker := func(w int) {
defer wg.Done()
var series []labels.Labels
for i := 0; i < b.N; i++ {
series = append(series,
labels.FromMap(map[string]string{
labels.MetricName: metricName + "_1",
"series1": fmt.Sprint(i),
}))
}

for i := 0; i < len(series); i += batchSize {
s := make([][]cortexpb.LabelAdapter, 0, batchSize)
j := i + batchSize
if j > len(series) {
j = len(series)
}
for k := i; k < j; k++ {
s = append(s, cortexpb.FromLabelsToLabelAdapters(series[k]))
}

err := limiter.AddSeries(s...)
assert.NoError(b, err)
}
}

for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w)
}

wg.Wait()
}

0 comments on commit f560115

Please sign in to comment.