diff --git a/CHANGELOG.md b/CHANGELOG.md index 8559e53924..6181aba0cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,11 +47,12 @@ * [ENHANCEMENT] Add new option `-server.grpc_server-num-stream-workers` to configure the number of worker goroutines that should be used to process incoming streams. #6386 * [ENHANCEMENT] Distributor: Return HTTP 5XX instead of HTTP 4XX when instance limits are hit. #6358 * [ENHANCEMENT] Ingester: Make sure unregistered ingester joining the ring after WAL replay. #6277 +* [ENHANCEMENT] Distributor: Add a new `-distributor.num-push-workers` flag to use a goroutine worker pool when sending data from distributor to ingesters. #6406 +* [ENHANCEMENT] Ingester: If a limit per label set entry doesn't have any label, use it as the default partition to catch all series that doesn't match any other label sets entries. #6435 * [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224 * [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326 * [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled. #6271 * [BUGFIX] Ingester: Fix regression on usage of cortex_ingester_queried_chunks. #6398 -* [ENHANCEMENT] Distributor: Add a new `-distributor.num-push-workers` flag to use a goroutine worker pool when sending data from distributor to ingesters. #6406 * [BUGFIX] Ingester: Fix possible race condition when `active series per LabelSet` is configured. #6409 ## 1.18.1 2024-10-14 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index d5c06fc205..f48bfeddbc 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -5770,7 +5770,9 @@ limits: # would not enforce any limits. [max_series: | default = ] -# LabelSet which the limit should be applied. +# LabelSet which the limit should be applied. If no labels are provided, it +# becomes the default partition which matches any series that doesn't match any +# other explicitly defined label sets.' [label_set: | default = []] ``` diff --git a/docs/guides/tracing.md b/docs/guides/tracing.md index 9aa5b27a72..485c22b428 100644 --- a/docs/guides/tracing.md +++ b/docs/guides/tracing.md @@ -50,7 +50,7 @@ In order to send traces, you will need to set up an OpenTelemetry Collector. The multiple destinations such as [AWS X-Ray](https://aws-otel.github.io/docs/getting-started/x-ray), [Google Cloud](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/googlecloudexporter), [DataDog](https://docs.datadoghq.com/tracing/trace_collection/open_standards/otel_collector_datadog_exporter/) and -[others(https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter). OpenTelemetry Collector +[others](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter). OpenTelemetry Collector provides a [helm chart](https://github.com/open-telemetry/opentelemetry-helm-charts/tree/main/charts/opentelemetry-collector/examples/deployment-otlp-traces) to set up the environment. diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 27d094c441..1d3fc2c5fc 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -233,8 +233,9 @@ type Ingester struct { TSDBState TSDBState // Rate of pushed samples. Only used by V2-ingester to limit global samples push rate. - ingestionRate *util_math.EwmaRate - inflightPushRequests atomic.Int64 + ingestionRate *util_math.EwmaRate + inflightPushRequests atomic.Int64 + maxInflightPushRequests util_math.MaxTracker inflightQueryRequests atomic.Int64 maxInflightQueryRequests util_math.MaxTracker @@ -710,7 +711,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe cfg.ActiveSeriesMetricsEnabled, i.getInstanceLimits, i.ingestionRate, - &i.inflightPushRequests, + &i.maxInflightPushRequests, &i.maxInflightQueryRequests, cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled || cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled) i.validateMetrics = validation.NewValidateMetrics(registerer) @@ -792,7 +793,7 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe false, i.getInstanceLimits, nil, - &i.inflightPushRequests, + &i.maxInflightPushRequests, &i.maxInflightQueryRequests, cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled || cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled, ) @@ -918,8 +919,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error { metadataPurgeTicker := time.NewTicker(metadataPurgePeriod) defer metadataPurgeTicker.Stop() - maxInflightRequestResetTicker := time.NewTicker(maxInflightRequestResetPeriod) - defer maxInflightRequestResetTicker.Stop() + maxTrackerResetTicker := time.NewTicker(maxInflightRequestResetPeriod) + defer maxTrackerResetTicker.Stop() for { select { @@ -937,8 +938,9 @@ func (i *Ingester) updateLoop(ctx context.Context) error { case <-activeSeriesTickerChan: i.updateActiveSeries(ctx) - case <-maxInflightRequestResetTicker.C: + case <-maxTrackerResetTicker.C: i.maxInflightQueryRequests.Tick() + i.maxInflightPushRequests.Tick() case <-userTSDBConfigTicker.C: i.updateUserTSDBConfigs() case <-ctx.Done(): @@ -1068,6 +1070,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // We will report *this* request in the error too. inflight := i.inflightPushRequests.Inc() + i.maxInflightPushRequests.Track(inflight) defer i.inflightPushRequests.Dec() gl := i.getInstanceLimits() diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index f64368877f..ad84d486f1 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -110,8 +110,8 @@ func seriesSetFromResponseStream(s *mockQueryStreamServer) (storage.SeriesSet, e serieses = append(serieses, &storage.SeriesEntry{ Lset: ls, - SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator { - return batch.NewChunkMergeIterator(chunks, math.MinInt64, math.MaxInt64) + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { + return batch.NewChunkMergeIterator(it, chunks, math.MinInt64, math.MaxInt64) }, }) } @@ -366,22 +366,117 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) { cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3 `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset")) - // Should remove metrics when the limits is removed + // Add default partition -> no label set configured working as a fallback when a series + // doesn't match any existing label set limit. + emptyLabels := labels.EmptyLabels() + defaultPartitionLimits := validation.LimitsPerLabelSet{LabelSet: emptyLabels, + Limits: validation.LimitsPerLabelSetEntry{ + MaxSeries: 2, + }, + } + limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits) + b, err = json.Marshal(limits) + require.NoError(t, err) + require.NoError(t, limits.UnmarshalJSON(b)) + tenantLimits.setLimits(userID, &limits) + + lbls = []string{labels.MetricName, "test_default"} + for i := 0; i < 2; i++ { + _, err = ing.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(append(lbls, "series", strconv.Itoa(i))...)}, samples, nil, nil, cortexpb.API)) + require.NoError(t, err) + } + + // Max series limit for default partition is 2 so 1 more series will be throttled. + _, err = ing.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(append(lbls, "extraLabel", "extraValueUpdate2")...)}, samples, nil, nil, cortexpb.API)) + httpResp, ok = httpgrpc.HTTPResponseFromError(err) + require.True(t, ok, "returned error is not an httpgrpc response") + assert.Equal(t, http.StatusBadRequest, int(httpResp.Code)) + require.ErrorContains(t, err, emptyLabels.String()) + + ing.updateActiveSeries(ctx) + require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP cortex_ingester_limits_per_labelset Limits per user and labelset. + # TYPE cortex_ingester_limits_per_labelset gauge + cortex_ingester_limits_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3 + cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2 + cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 10 + cortex_ingester_limits_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 10 + cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 + cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 + cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2 + # HELP cortex_ingester_usage_per_labelset Current usage per user and labelset. + # TYPE cortex_ingester_usage_per_labelset gauge + cortex_ingester_usage_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3 + cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 + cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 + cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2 + cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7 + cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3 + cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 2 + `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset")) + + // Add a new label set limit. + limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, + validation.LimitsPerLabelSet{LabelSet: labels.FromMap(map[string]string{ + "series": "0", + }), + Limits: validation.LimitsPerLabelSetEntry{ + MaxSeries: 3, + }, + }, + ) + b, err = json.Marshal(limits) + require.NoError(t, err) + require.NoError(t, limits.UnmarshalJSON(b)) + tenantLimits.setLimits(userID, &limits) + ing.updateActiveSeries(ctx) + // Default partition usage reduced from 2 to 1 as one series in default partition + // now counted into the new partition. + require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP cortex_ingester_limits_per_labelset Limits per user and labelset. + # TYPE cortex_ingester_limits_per_labelset gauge + cortex_ingester_limits_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3 + cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2 + cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 10 + cortex_ingester_limits_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 10 + cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 + cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 + cortex_ingester_limits_per_labelset{labelset="{series=\"0\"}",limit="max_series",user="1"} 3 + cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2 + # HELP cortex_ingester_usage_per_labelset Current usage per user and labelset. + # TYPE cortex_ingester_usage_per_labelset gauge + cortex_ingester_usage_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3 + cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 + cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 + cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2 + cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7 + cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3 + cortex_ingester_usage_per_labelset{labelset="{series=\"0\"}",limit="max_series",user="1"} 1 + cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 1 + `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset")) + + // Should remove metrics when the limits is removed, keep default partition limit limits.LimitsPerLabelSet = limits.LimitsPerLabelSet[:2] + limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits) b, err = json.Marshal(limits) require.NoError(t, err) require.NoError(t, limits.UnmarshalJSON(b)) tenantLimits.setLimits(userID, &limits) ing.updateActiveSeries(ctx) + // Default partition usage increased from 2 to 10 as some existing partitions got removed. require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` # HELP cortex_ingester_limits_per_labelset Limits per user and labelset. # TYPE cortex_ingester_limits_per_labelset gauge cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 + cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2 # HELP cortex_ingester_usage_per_labelset Current usage per user and labelset. # TYPE cortex_ingester_usage_per_labelset gauge cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 + cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10 `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset")) // Should persist between restarts @@ -396,10 +491,12 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) { # TYPE cortex_ingester_limits_per_labelset gauge cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 + cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2 # HELP cortex_ingester_usage_per_labelset Current usage per user and labelset. # TYPE cortex_ingester_usage_per_labelset gauge cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 + cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10 `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset")) services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck @@ -420,6 +517,13 @@ func TestPushRace(t *testing.T) { MaxSeries: 10e10, }, }, + { + // Default partition. + LabelSet: labels.EmptyLabels(), + Limits: validation.LimitsPerLabelSetEntry{ + MaxSeries: 10e10, + }, + }, } dir := t.TempDir() @@ -451,6 +555,10 @@ func TestPushRace(t *testing.T) { defer wg.Done() _, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API)) require.NoError(t, err) + + // Go to default partition. + _, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "bar", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API)) + require.NoError(t, err) }() } } @@ -472,13 +580,13 @@ func TestPushRace(t *testing.T) { err = ir.Series(p.At(), &builder, nil) require.NoError(t, err) lbls := builder.Labels() - require.Equal(t, "foo", lbls.Get(labels.MetricName)) + require.True(t, lbls.Get(labels.MetricName) == "foo" || lbls.Get(labels.MetricName) == "bar") require.Equal(t, "1", lbls.Get("userId")) require.NotEmpty(t, lbls.Get("k")) builder.Reset() } - require.Equal(t, numberOfSeries, total) - require.Equal(t, uint64(numberOfSeries), db.Head().NumSeries()) + require.Equal(t, 2*numberOfSeries, total) + require.Equal(t, uint64(2*numberOfSeries), db.Head().NumSeries()) } func TestIngesterUserLimitExceeded(t *testing.T) { diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index 9b572a8409..0aaf6b312a 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -107,14 +107,15 @@ func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int // AssertMaxSeriesPerLabelSet limit has not been reached compared to the current // number of metrics with metadata in input and returns an error if so. -func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.LimitsPerLabelSet) (int, error)) error { - m := l.limitsPerLabelSets(userID, metric) - for _, limit := range m { +func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error)) error { + limits := l.limits.LimitsPerLabelSet(userID) + matchedLimits := validation.LimitsPerLabelSetsForSeries(limits, metric) + for _, limit := range matchedLimits { maxSeriesFunc := func(string) int { return limit.Limits.MaxSeries } local := l.maxByLocalAndGlobal(userID, maxSeriesFunc, maxSeriesFunc) - if u, err := f(limit); err != nil { + if u, err := f(limits, limit); err != nil { return err } else if u >= local { return errMaxSeriesPerLabelSetLimitExceeded{ @@ -191,24 +192,7 @@ func (l *Limiter) formatMaxSeriesPerLabelSetError(err errMaxSeriesPerLabelSetLim func (l *Limiter) limitsPerLabelSets(userID string, metric labels.Labels) []validation.LimitsPerLabelSet { m := l.limits.LimitsPerLabelSet(userID) - - // returning early to not have any overhead - if len(m) == 0 { - return nil - } - - r := make([]validation.LimitsPerLabelSet, 0, len(m)) -outer: - for _, lbls := range m { - for _, lbl := range lbls.LabelSet { - // We did not find some of the labels on the set - if v := metric.Get(lbl.Name); v != lbl.Value { - continue outer - } - } - r = append(r, lbls) - } - return r + return validation.LimitsPerLabelSetsForSeries(m, metric) } func (l *Limiter) maxSeriesPerMetric(userID string) int { diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index e4759191c1..944af00293 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -500,7 +500,7 @@ func TestLimiter_AssertMaxSeriesPerLabelSet(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "") - actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(set validation.LimitsPerLabelSet) (int, error) { + actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(limits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) { return testData.series, nil }) diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index b1c7edc50d..645f995868 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -3,7 +3,6 @@ package ingester import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" @@ -53,10 +52,12 @@ type ingesterMetrics struct { maxUsersGauge prometheus.GaugeFunc maxSeriesGauge prometheus.GaugeFunc maxIngestionRate prometheus.GaugeFunc - ingestionRate prometheus.GaugeFunc maxInflightPushRequests prometheus.GaugeFunc - inflightRequests prometheus.GaugeFunc - inflightQueryRequests prometheus.GaugeFunc + + // Current Usage + ingestionRate prometheus.GaugeFunc + inflightRequests prometheus.GaugeFunc + inflightQueryRequests prometheus.GaugeFunc // Posting Cache Metrics expandedPostingsCacheMetrics *tsdb.ExpandedPostingsCacheMetrics @@ -67,7 +68,7 @@ func newIngesterMetrics(r prometheus.Registerer, activeSeriesEnabled bool, instanceLimitsFn func() *InstanceLimits, ingestionRate *util_math.EwmaRate, - inflightPushRequests *atomic.Int64, + inflightPushRequests *util_math.MaxTracker, maxInflightQueryRequests *util_math.MaxTracker, postingsCacheEnabled bool, ) *ingesterMetrics { diff --git a/pkg/ingester/metrics_test.go b/pkg/ingester/metrics_test.go index 56214c9a50..031dec9482 100644 --- a/pkg/ingester/metrics_test.go +++ b/pkg/ingester/metrics_test.go @@ -8,7 +8,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" - "go.uber.org/atomic" util_math "github.com/cortexproject/cortex/pkg/util/math" ) @@ -16,10 +15,10 @@ import ( func TestIngesterMetrics(t *testing.T) { mainReg := prometheus.NewPedanticRegistry() ingestionRate := util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval) - inflightPushRequests := &atomic.Int64{} + inflightPushRequests := util_math.MaxTracker{} maxInflightQueryRequests := util_math.MaxTracker{} maxInflightQueryRequests.Track(98) - inflightPushRequests.Store(14) + inflightPushRequests.Track(14) m := newIngesterMetrics(mainReg, false, @@ -33,7 +32,7 @@ func TestIngesterMetrics(t *testing.T) { } }, ingestionRate, - inflightPushRequests, + &inflightPushRequests, &maxInflightQueryRequests, false) diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 7b0dec6920..8518bbb5b8 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/index" "github.com/segmentio/fasthash/fnv1a" @@ -114,21 +115,31 @@ func newLabelSetCounter(limiter *Limiter) *labelSetCounter { } func (m *labelSetCounter) canAddSeriesForLabelSet(ctx context.Context, u *userTSDB, metric labels.Labels) error { - return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(set validation.LimitsPerLabelSet) (int, error) { - s := m.shards[util.HashFP(model.Fingerprint(set.Hash))%numMetricCounterShards] + return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) { + s := m.shards[util.HashFP(model.Fingerprint(limit.Hash))%numMetricCounterShards] s.RLock() - if r, ok := s.valuesCounter[set.Hash]; ok { + if r, ok := s.valuesCounter[limit.Hash]; ok { defer s.RUnlock() return r.count, nil } s.RUnlock() // We still dont keep track of this label value so we need to backfill - return m.backFillLimit(ctx, u, set, s) + return m.backFillLimit(ctx, u, false, allLimits, limit, s) }) } -func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit validation.LimitsPerLabelSet, s *labelSetCounterShard) (int, error) { +func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, forceBackfill bool, allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet, s *labelSetCounterShard) (int, error) { + s.Lock() + // If not force backfill, use existing counter value. + if !forceBackfill { + if r, ok := s.valuesCounter[limit.Hash]; ok { + s.Unlock() + return r.count, nil + } + } + + defer s.Unlock() ir, err := u.db.Head().Index() if err != nil { return 0, err @@ -136,37 +147,72 @@ func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit defer ir.Close() - s.Lock() - defer s.Unlock() - if r, ok := s.valuesCounter[limit.Hash]; !ok { - postings := make([]index.Postings, 0, len(limit.LabelSet)) - for _, lbl := range limit.LabelSet { - p, err := ir.Postings(ctx, lbl.Name, lbl.Value) - if err != nil { - return 0, err - } - postings = append(postings, p) - } + numSeries := u.db.Head().NumSeries() + totalCount, err := getCardinalityForLimitsPerLabelSet(ctx, numSeries, ir, allLimits, limit) + if err != nil { + return 0, err + } - p := index.Intersect(postings...) + s.valuesCounter[limit.Hash] = &labelSetCounterEntry{ + count: totalCount, + labels: limit.LabelSet, + } + return totalCount, nil +} - totalCount := 0 - for p.Next() { - totalCount++ +func getCardinalityForLimitsPerLabelSet(ctx context.Context, numSeries uint64, ir tsdb.IndexReader, allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) { + // Easy path with explicit labels. + if limit.LabelSet.Len() > 0 { + p, err := getPostingForLabels(ctx, ir, limit.LabelSet) + if err != nil { + return 0, err } + return getPostingCardinality(p) + } - if p.Err() != nil { - return 0, p.Err() + // Default partition needs to get cardinality of all series that doesn't belong to any existing partitions. + postings := make([]index.Postings, 0, len(allLimits)-1) + for _, l := range allLimits { + if l.Hash == limit.Hash { + continue } + p, err := getPostingForLabels(ctx, ir, l.LabelSet) + if err != nil { + return 0, err + } + postings = append(postings, p) + } + mergedCardinality, err := getPostingCardinality(index.Merge(ctx, postings...)) + if err != nil { + return 0, err + } + + return int(numSeries) - mergedCardinality, nil +} - s.valuesCounter[limit.Hash] = &labelSetCounterEntry{ - count: totalCount, - labels: limit.LabelSet, +func getPostingForLabels(ctx context.Context, ir tsdb.IndexReader, lbls labels.Labels) (index.Postings, error) { + postings := make([]index.Postings, 0, len(lbls)) + for _, lbl := range lbls { + p, err := ir.Postings(ctx, lbl.Name, lbl.Value) + if err != nil { + return nil, err } - return totalCount, nil - } else { - return r.count, nil + postings = append(postings, p) } + + return index.Intersect(postings...), nil +} + +func getPostingCardinality(p index.Postings) (int, error) { + totalCount := 0 + for p.Next() { + totalCount++ + } + + if p.Err() != nil { + return 0, p.Err() + } + return totalCount, nil } func (m *labelSetCounter) increaseSeriesLabelSet(u *userTSDB, metric labels.Labels) { @@ -200,10 +246,12 @@ func (m *labelSetCounter) decreaseSeriesLabelSet(u *userTSDB, metric labels.Labe func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, metrics *ingesterMetrics) error { currentLbsLimitHash := map[uint64]validation.LimitsPerLabelSet{} - for _, l := range m.limiter.limits.LimitsPerLabelSet(u.userID) { + limits := m.limiter.limits.LimitsPerLabelSet(u.userID) + for _, l := range limits { currentLbsLimitHash[l.Hash] = l } + nonDefaultPartitionChanged := false for i := 0; i < numMetricCounterShards; i++ { s := m.shards[i] s.RLock() @@ -213,19 +261,45 @@ func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, metrics if _, ok := currentLbsLimitHash[h]; !ok { metrics.usagePerLabelSet.DeleteLabelValues(u.userID, "max_series", lbls) metrics.limitsPerLabelSet.DeleteLabelValues(u.userID, "max_series", lbls) + if entry.labels.Len() > 0 { + nonDefaultPartitionChanged = true + } continue } - metrics.usagePerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(entry.count)) - metrics.limitsPerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(currentLbsLimitHash[h].Limits.MaxSeries)) - delete(currentLbsLimitHash, h) + // Delay exposing default partition metrics from current label limits as if + // another label set is added or removed then we need to backfill default partition again. + if entry.labels.Len() > 0 { + metrics.usagePerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(entry.count)) + metrics.limitsPerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(currentLbsLimitHash[h].Limits.MaxSeries)) + delete(currentLbsLimitHash, h) + } } s.RUnlock() } + // Check if we need to backfill default partition. We don't need to backfill when any condition meet: + // 1. Default partition doesn't exist. + // 2. No new partition added and no old partition removed. + if !nonDefaultPartitionChanged { + for _, l := range currentLbsLimitHash { + if l.LabelSet.Len() > 0 { + nonDefaultPartitionChanged = true + break + } + } + } + // Backfill all limits that are not being tracked yet for _, l := range currentLbsLimitHash { s := m.shards[util.HashFP(model.Fingerprint(l.Hash))%numMetricCounterShards] - count, err := m.backFillLimit(ctx, u, l, s) + force := false + // Force backfill to make sure we update the counter for the default partition + // when other limits got added or removed. If no partition is changed then we + // can use the value in the counter. + if l.LabelSet.Len() == 0 && nonDefaultPartitionChanged { + force = true + } + count, err := m.backFillLimit(ctx, u, force, limits, l, s) if err != nil { return err } diff --git a/pkg/ingester/user_state_test.go b/pkg/ingester/user_state_test.go index 071aa5733f..910f8f1a7e 100644 --- a/pkg/ingester/user_state_test.go +++ b/pkg/ingester/user_state_test.go @@ -1,8 +1,15 @@ package ingester import ( + "context" + "errors" "testing" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -92,3 +99,271 @@ func TestMetricCounter(t *testing.T) { }) } } + +func TestGetCardinalityForLimitsPerLabelSet(t *testing.T) { + ctx := context.Background() + testErr := errors.New("err") + for _, tc := range []struct { + name string + ir tsdb.IndexReader + limits []validation.LimitsPerLabelSet + idx int + cnt int + numSeries uint64 + err error + }{ + { + name: "single label", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {LabelSet: labels.FromStrings("foo", "bar")}, + }, + cnt: 5, + }, + { + name: "multiple limits", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "job": {"test": index.NewListPostings([]storage.SeriesRef{5, 6, 7, 8})}, + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {LabelSet: labels.FromStrings("job", "test")}, + {LabelSet: labels.FromStrings("foo", "bar")}, + }, + idx: 1, + cnt: 5, + }, + { + name: "2 labels intersect", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + "job": {"prometheus": index.NewListPostings([]storage.SeriesRef{1, 3, 5})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {LabelSet: labels.FromStrings("foo", "bar", "job", "prometheus")}, + }, + cnt: 3, + }, + { + name: "error", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "foo": {"bar": index.ErrPostings(testErr)}, + "job": {"prometheus": index.NewListPostings([]storage.SeriesRef{1, 3, 5})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {LabelSet: labels.FromStrings("foo", "bar", "job", "prometheus")}, + }, + err: testErr, + }, + { + name: "no match and no default partition", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "job": {"test": index.NewListPostings([]storage.SeriesRef{5, 6, 7, 8})}, + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {LabelSet: labels.FromStrings("foo", "baz")}, + }, + idx: 0, + cnt: 0, + }, + { + name: "no match", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "job": {"test": index.NewListPostings([]storage.SeriesRef{5, 6, 7, 8})}, + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {LabelSet: labels.FromStrings("foo", "baz")}, + }, + cnt: 0, + }, + { + // Total cardinality 12. Cardinality of existing label set limits 8. + // Default partition cardinality 4. + name: "default partition", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "job": {"test": index.NewListPostings([]storage.SeriesRef{3, 4, 5, 6, 7, 8})}, + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {Hash: 1, LabelSet: labels.FromStrings("foo", "bar")}, + {Hash: 2, LabelSet: labels.FromStrings("job", "test")}, + {}, // Default partition. + }, + idx: 2, + cnt: 4, + numSeries: 12, + }, + { + name: "default partition with error getting postings", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "job": {"test": index.NewListPostings([]storage.SeriesRef{5, 6, 7, 8})}, + "foo": {"bar": index.ErrPostings(testErr)}, + }}, + limits: []validation.LimitsPerLabelSet{ + {Hash: 1, LabelSet: labels.FromStrings("foo", "bar")}, + {Hash: 2, LabelSet: labels.FromStrings("job", "test")}, + {}, // Default partition. + }, + idx: 2, + err: testErr, + }, + } { + t.Run(tc.name, func(t *testing.T) { + cnt, err := getCardinalityForLimitsPerLabelSet(ctx, tc.numSeries, tc.ir, tc.limits, tc.limits[tc.idx]) + if err != nil { + require.EqualError(t, err, tc.err.Error()) + } else { + require.NoError(t, err) + require.Equal(t, tc.cnt, cnt) + } + }) + } +} + +func TestGetPostingForLabels(t *testing.T) { + ctx := context.Background() + testErr := errors.New("err") + for _, tc := range []struct { + name string + ir tsdb.IndexReader + lbls labels.Labels + postings []storage.SeriesRef + err error + }{ + { + name: "single label", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + }}, + lbls: labels.FromStrings("foo", "bar"), + postings: []storage.SeriesRef{1, 2, 3, 4, 5}, + }, + { + name: "intersect", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + "job": {"prometheus": index.NewListPostings([]storage.SeriesRef{1, 3, 5})}, + }}, + lbls: labels.FromStrings("foo", "bar", "job", "prometheus"), + postings: []storage.SeriesRef{1, 3, 5}, + }, + { + name: "error", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "foo": {"bar": index.ErrPostings(testErr)}, + "job": {"prometheus": index.NewListPostings([]storage.SeriesRef{1, 3, 5})}, + }}, + lbls: labels.FromStrings("foo", "bar", "job", "prometheus"), + postings: []storage.SeriesRef{1, 3, 5}, + err: testErr, + }, + } { + t.Run(tc.name, func(t *testing.T) { + p, err := getPostingForLabels(ctx, tc.ir, tc.lbls) + if err != nil { + require.EqualError(t, err, tc.err.Error()) + } else { + require.NoError(t, err) + series := make([]storage.SeriesRef, 0) + for p.Next() { + series = append(series, p.At()) + } + if tc.err != nil { + require.EqualError(t, p.Err(), tc.err.Error()) + } else { + require.Equal(t, tc.postings, series) + } + } + }) + } +} + +func TestGetPostingCardinality(t *testing.T) { + testErr := errors.New("err") + for _, tc := range []struct { + name string + p index.Postings + cardinality int + err error + }{ + { + name: "empty", + p: index.EmptyPostings(), + }, + { + name: "err", + p: index.ErrPostings(testErr), + err: testErr, + }, + { + name: "cardinality", + p: index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), + cardinality: 10, + }, + } { + t.Run(tc.name, func(t *testing.T) { + cnt, err := getPostingCardinality(tc.p) + if tc.err != nil { + require.EqualError(t, err, tc.err.Error()) + } else { + require.NoError(t, err) + require.Equal(t, tc.cardinality, cnt) + } + }) + } +} + +type mockIndexReader struct { + postings map[string]map[string]index.Postings +} + +func (ir *mockIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) { + if entries, ok := ir.postings[name]; ok { + if postings, ok2 := entries[values[0]]; ok2 { + return postings, nil + } + } + return index.EmptyPostings(), nil +} + +func (ir *mockIndexReader) Symbols() index.StringIter { return nil } + +func (ir *mockIndexReader) SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { + return nil, nil +} + +func (ir *mockIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { + return nil, nil +} + +func (ir *mockIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings { + return nil +} + +func (ir *mockIndexReader) SortedPostings(index.Postings) index.Postings { return nil } + +func (ir *mockIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { + return nil +} + +func (ir *mockIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { + return nil +} + +func (ir *mockIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error) { + return nil, nil +} + +func (ir *mockIndexReader) LabelValueFor(ctx context.Context, id storage.SeriesRef, label string) (string, error) { + return "", nil +} + +func (ir *mockIndexReader) LabelNamesFor(ctx context.Context, postings index.Postings) ([]string, error) { + return nil, nil +} + +func (ir *mockIndexReader) Close() error { return nil } diff --git a/pkg/querier/batch/batch.go b/pkg/querier/batch/batch.go index ca7e1f79ee..79dfe8081e 100644 --- a/pkg/querier/batch/batch.go +++ b/pkg/querier/batch/batch.go @@ -52,19 +52,26 @@ type iterator interface { } // NewChunkMergeIterator returns a chunkenc.Iterator that merges Cortex chunks together. -func NewChunkMergeIterator(chunks []chunk.Chunk, _, _ model.Time) chunkenc.Iterator { +func NewChunkMergeIterator(it chunkenc.Iterator, chunks []chunk.Chunk, _, _ model.Time) chunkenc.Iterator { converted := make([]GenericChunk, len(chunks)) for i, c := range chunks { c := c converted[i] = NewGenericChunk(int64(c.From), int64(c.Through), c.NewIterator) } - return NewGenericChunkMergeIterator(converted) + return NewGenericChunkMergeIterator(it, converted) } // NewGenericChunkMergeIterator returns a chunkenc.Iterator that merges generic chunks together. -func NewGenericChunkMergeIterator(chunks []GenericChunk) chunkenc.Iterator { - iter := newMergeIterator(chunks) +func NewGenericChunkMergeIterator(it chunkenc.Iterator, chunks []GenericChunk) chunkenc.Iterator { + + var underlying iterator + + if ia, ok := it.(*iteratorAdapter); ok { + underlying = ia.underlying + } + + iter := newMergeIterator(underlying, chunks) return newIteratorAdapter(iter) } diff --git a/pkg/querier/batch/batch_test.go b/pkg/querier/batch/batch_test.go index 30c0a0e38c..4f4b57bfe4 100644 --- a/pkg/querier/batch/batch_test.go +++ b/pkg/querier/batch/batch_test.go @@ -55,8 +55,9 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) { b.Run(name, func(b *testing.B) { b.ReportAllocs() + var it chunkenc.Iterator for n := 0; n < b.N; n++ { - it := NewChunkMergeIterator(chunks, 0, 0) + it = NewChunkMergeIterator(it, chunks, 0, 0) for it.Next() != chunkenc.ValNone { it.At() } @@ -108,9 +109,9 @@ func BenchmarkNewChunkMergeIterator_Seek(b *testing.B) { b.ResetTimer() b.Run(name, func(b *testing.B) { b.ReportAllocs() - + var it chunkenc.Iterator for n := 0; n < b.N; n++ { - it := NewChunkMergeIterator(chunks, 0, 0) + it = NewChunkMergeIterator(it, chunks, 0, 0) i := int64(0) for it.Seek(i*scenario.seekStep.Milliseconds()) != chunkenc.ValNone { i++ @@ -132,7 +133,7 @@ func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) { chunkTwo := util.GenerateChunk(t, step, model.Time(10*step/time.Millisecond), 1, enc) chunks := []chunk.Chunk{chunkOne, chunkTwo} - sut := NewChunkMergeIterator(chunks, 0, 0) + sut := NewChunkMergeIterator(nil, chunks, 0, 0) // Following calls mimics Prometheus's query engine behaviour for VectorSelector. require.Equal(t, valType, sut.Next()) diff --git a/pkg/querier/batch/merge.go b/pkg/querier/batch/merge.go index 8c7ebdf062..27030149d2 100644 --- a/pkg/querier/batch/merge.go +++ b/pkg/querier/batch/merge.go @@ -23,18 +23,27 @@ type mergeIterator struct { currErr error } -func newMergeIterator(cs []GenericChunk) *mergeIterator { +func newMergeIterator(it iterator, cs []GenericChunk) *mergeIterator { css := partitionChunks(cs) - its := make([]*nonOverlappingIterator, 0, len(css)) - for _, cs := range css { - its = append(its, newNonOverlappingIterator(cs)) + + var c *mergeIterator + + if mIterator, ok := it.(*mergeIterator); ok && cap(mIterator.its) >= len(css) { + c = mIterator.Reset(len(css)) + } else { + c = &mergeIterator{ + h: make(iteratorHeap, 0, len(css)), + batches: make(batchStream, 0, len(css)), + batchesBuf: make(batchStream, len(css)), + } } - c := &mergeIterator{ - its: its, - h: make(iteratorHeap, 0, len(its)), - batches: make(batchStream, 0, len(its)), - batchesBuf: make(batchStream, len(its)), + if cap(c.its) < len(css) { + c.its = make([]*nonOverlappingIterator, 0, len(css)) + } + + for _, cs := range css { + c.its = append(c.its, newNonOverlappingIterator(cs)) } for _, iter := range c.its { @@ -52,6 +61,29 @@ func newMergeIterator(cs []GenericChunk) *mergeIterator { return c } +func (c *mergeIterator) Reset(size int) *mergeIterator { + c.its = c.its[:0] + c.h = c.h[:0] + c.batches = c.batches[:0] + + if size > cap(c.batchesBuf) { + c.batchesBuf = make(batchStream, len(c.its)) + } else { + c.batchesBuf = c.batchesBuf[:size] + for i := 0; i < size; i++ { + c.batchesBuf[i] = promchunk.Batch{} + } + } + + for i := 0; i < len(c.nextBatchBuf); i++ { + c.nextBatchBuf[i] = promchunk.Batch{} + } + + c.currErr = nil + + return c +} + func (c *mergeIterator) Seek(t int64, size int) chunkenc.ValueType { // Optimisation to see if the seek is within our current caches batches. diff --git a/pkg/querier/batch/merge_test.go b/pkg/querier/batch/merge_test.go index 8ad0d16df4..d835640d70 100644 --- a/pkg/querier/batch/merge_test.go +++ b/pkg/querier/batch/merge_test.go @@ -1,10 +1,12 @@ package batch import ( + "fmt" "testing" "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/cortexproject/cortex/pkg/chunk/encoding" ) @@ -18,14 +20,40 @@ func TestMergeIter(t *testing.T) { chunk4 := mkGenericChunk(t, model.TimeFromUnix(75), 100, enc) chunk5 := mkGenericChunk(t, model.TimeFromUnix(100), 100, enc) - iter := newMergeIterator([]GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) + iter := newMergeIterator(nil, []GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) testIter(t, 200, newIteratorAdapter(iter), enc) - iter = newMergeIterator([]GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) + iter = newMergeIterator(iter, []GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5}) testSeek(t, 200, newIteratorAdapter(iter), enc) }) } +func BenchmarkMergeIterator(b *testing.B) { + chunks := make([]GenericChunk, 0, 10) + for i := 0; i < 10; i++ { + chunks = append(chunks, mkGenericChunk(b, model.Time(i*25), 120, encoding.PrometheusXorChunk)) + } + iter := newMergeIterator(nil, chunks) + + for _, r := range []bool{true, false} { + b.Run(fmt.Sprintf("reuse-%t", r), func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + if r { + iter = newMergeIterator(iter, chunks) + } else { + iter = newMergeIterator(nil, chunks) + } + a := newIteratorAdapter(iter) + for a.Next() != chunkenc.ValNone { + + } + } + }) + } +} + func TestMergeHarder(t *testing.T) { t.Parallel() forEncodings(t, func(t *testing.T, enc encoding.Encoding) { @@ -40,10 +68,10 @@ func TestMergeHarder(t *testing.T) { chunks = append(chunks, mkGenericChunk(t, from, samples, enc)) from = from.Add(time.Duration(offset) * time.Second) } - iter := newMergeIterator(chunks) + iter := newMergeIterator(nil, chunks) testIter(t, offset*numChunks+samples-offset, newIteratorAdapter(iter), enc) - iter = newMergeIterator(chunks) + iter = newMergeIterator(iter, chunks) testSeek(t, offset*numChunks+samples-offset, newIteratorAdapter(iter), enc) }) } diff --git a/pkg/querier/chunk_store_queryable.go b/pkg/querier/chunk_store_queryable.go index e072228ac7..0b3fabf011 100644 --- a/pkg/querier/chunk_store_queryable.go +++ b/pkg/querier/chunk_store_queryable.go @@ -7,4 +7,4 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" ) -type chunkIteratorFunc func(chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator +type chunkIteratorFunc func(it chunkenc.Iterator, chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 8ae7c49106..709294e6fa 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -155,8 +155,8 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries boo serieses = append(serieses, &storage.SeriesEntry{ Lset: ls, - SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator { - return q.chunkIterFn(chunks, model.Time(minT), model.Time(maxT)) + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { + return q.chunkIterFn(it, chunks, model.Time(minT), model.Time(maxT)) }, }) } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 784aef3bee..e9a80374cd 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -18,18 +18,14 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/annotations" "github.com/thanos-io/promql-engine/engine" "github.com/thanos-io/promql-engine/logicalplan" "github.com/thanos-io/thanos/pkg/strutil" "golang.org/x/sync/errgroup" - "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/querier/batch" "github.com/cortexproject/cortex/pkg/querier/lazyquery" - seriesset "github.com/cortexproject/cortex/pkg/querier/series" querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" @@ -188,7 +184,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor QueryStoreAfter: cfg.QueryStoreAfter, } } - queryable := NewQueryable(distributorQueryable, ns, iteratorFunc, cfg, limits) + queryable := NewQueryable(distributorQueryable, ns, cfg, limits) exemplarQueryable := newDistributorExemplarQueryable(distributor) lazyQueryable := storage.QueryableFunc(func(mint int64, maxt int64) (storage.Querier, error) { @@ -275,13 +271,12 @@ type limiterHolder struct { } // NewQueryable creates a new Queryable for cortex. -func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, chunkIterFn chunkIteratorFunc, cfg Config, limits *validation.Overrides) storage.Queryable { +func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, cfg Config, limits *validation.Overrides) storage.Queryable { return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) { q := querier{ now: time.Now(), mint: mint, maxt: maxt, - chunkIterFn: chunkIterFn, limits: limits, maxQueryIntoFuture: cfg.MaxQueryIntoFuture, ignoreMaxQueryLength: cfg.IgnoreMaxQueryLength, @@ -295,10 +290,8 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, } type querier struct { - chunkIterFn chunkIteratorFunc - now time.Time - mint, maxt int64 - + now time.Time + mint, maxt int64 limits *validation.Overrides maxQueryIntoFuture time.Duration distributor QueryableWithFilter @@ -670,24 +663,3 @@ func validateQueryTimeRange(ctx context.Context, userID string, startMs, endMs i return int64(startTime), int64(endTime), nil } - -// Series in the returned set are sorted alphabetically by labels. -func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkIteratorFunc) storage.SeriesSet { - chunksBySeries := map[string][]chunk.Chunk{} - for _, c := range chunks { - key := client.LabelsToKeyString(c.Metric) - chunksBySeries[key] = append(chunksBySeries[key], c) - } - - series := make([]storage.Series, 0, len(chunksBySeries)) - for i := range chunksBySeries { - series = append(series, &storage.SeriesEntry{ - Lset: chunksBySeries[i][0].Metric, - SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator { - return iteratorFunc(chunksBySeries[i], model.Time(mint), model.Time(maxt)) - }, - }) - } - - return seriesset.NewConcreteSeriesSet(true, series) -} diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 044bf0e193..69b542e2d9 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/util/annotations" "github.com/stretchr/testify/assert" @@ -32,6 +33,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/querier/batch" + "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -333,7 +335,7 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) { for _, queryable := range tc.storeQueriables { wQueriables = append(wQueriables, &wrappedSampleAndChunkQueryable{QueryableWithFilter: queryable}) } - queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides) + queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides) opts := promql.EngineOpts{ Logger: log.NewNopLogger(), MaxSamples: 1e6, @@ -521,7 +523,7 @@ func TestLimits(t *testing.T) { overrides, err := validation.NewOverrides(DefaultLimitsConfig(), tc.tenantLimit) require.NoError(t, err) - queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides) + queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides) opts := promql.EngineOpts{ Logger: log.NewNopLogger(), MaxSamples: 1e6, @@ -1476,7 +1478,7 @@ type mockStoreQuerier struct { // Select implements storage.Querier interface. // The bool passed is ignored because the series is always sorted. -func (q *mockStoreQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { +func (q *mockStoreQuerier) Select(_ context.Context, _ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { // If we don't skip here, it'll make /series lookups extremely slow as all the chunks will be loaded. // That flag is only to be set with blocks storage engine, and this is a protective measure. if sp != nil && sp.Func == "series" { @@ -1488,7 +1490,24 @@ func (q *mockStoreQuerier) Select(ctx context.Context, _ bool, sp *storage.Selec return storage.ErrSeriesSet(err) } - return partitionChunks(chunks, q.mint, q.maxt, q.chunkIteratorFunc) + cs := make([]storage.Series, 0, len(chunks)) + chunksBySeries := map[string][]chunk.Chunk{} + + for _, c := range chunks { + key := client.LabelsToKeyString(c.Metric) + chunksBySeries[key] = append(chunksBySeries[key], c) + } + + for i, c := range chunksBySeries { + cs = append(cs, &storage.SeriesEntry{ + Lset: chunksBySeries[i][0].Metric, + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { + return q.chunkIteratorFunc(it, c, model.Time(mint), model.Time(maxt)) + }, + }) + } + + return series.NewConcreteSeriesSet(true, cs) } func (q *mockStoreQuerier) LabelValues(ctx context.Context, name string, _ *storage.LabelHints, labels ...*labels.Matcher) ([]string, annotations.Annotations, error) { diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 3658b3fc79..38f31e8da0 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -109,7 +109,7 @@ type LimitsPerLabelSetEntry struct { type LimitsPerLabelSet struct { Limits LimitsPerLabelSetEntry `yaml:"limits" json:"limits" doc:"nocli"` - LabelSet labels.Labels `yaml:"label_set" json:"label_set" doc:"nocli|description=LabelSet which the limit should be applied."` + LabelSet labels.Labels `yaml:"label_set" json:"label_set" doc:"nocli|description=LabelSet which the limit should be applied. If no labels are provided, it becomes the default partition which matches any series that doesn't match any other explicitly defined label sets.'"` Id string `yaml:"-" json:"-" doc:"nocli"` Hash uint64 `yaml:"-" json:"-" doc:"nocli"` } @@ -1043,3 +1043,33 @@ func MaxDurationPerTenant(tenantIDs []string, f func(string) time.Duration) time } return result } + +// LimitsPerLabelSetsForSeries checks matching labelset limits for the given series. +func LimitsPerLabelSetsForSeries(limitsPerLabelSets []LimitsPerLabelSet, metric labels.Labels) []LimitsPerLabelSet { + // returning early to not have any overhead + if len(limitsPerLabelSets) == 0 { + return nil + } + r := make([]LimitsPerLabelSet, 0, len(limitsPerLabelSets)) + defaultPartitionIndex := -1 +outer: + for i, lbls := range limitsPerLabelSets { + // Default partition exists. + if lbls.LabelSet.Len() == 0 { + defaultPartitionIndex = i + continue + } + for _, lbl := range lbls.LabelSet { + // We did not find some of the labels on the set + if v := metric.Get(lbl.Name); v != lbl.Value { + continue outer + } + } + r = append(r, lbls) + } + // Use default partition limiter if it is configured and no other matching partitions. + if defaultPartitionIndex != -1 && len(r) == 0 { + r = append(r, limitsPerLabelSets[defaultPartitionIndex]) + } + return r +} diff --git a/pkg/util/validation/limits_test.go b/pkg/util/validation/limits_test.go index 50d7cb7e3f..3ac0230d67 100644 --- a/pkg/util/validation/limits_test.go +++ b/pkg/util/validation/limits_test.go @@ -791,3 +791,63 @@ func TestEvaluationDelayHigherThanRulerQueryOffset(t *testing.T) { rulerQueryOffset := ov.RulerQueryOffset(tenant) assert.Equal(t, evaluationDelay, rulerQueryOffset) } + +func TestLimitsPerLabelSetsForSeries(t *testing.T) { + for _, tc := range []struct { + name string + limits []LimitsPerLabelSet + metric labels.Labels + expectedLimits []LimitsPerLabelSet + }{ + { + name: "no limits", + metric: labels.FromMap(map[string]string{"foo": "bar"}), + }, + { + name: "no limits matched", + metric: labels.FromMap(map[string]string{"foo": "bar"}), + limits: []LimitsPerLabelSet{ + {LabelSet: labels.FromMap(map[string]string{"foo": "baz"})}, + }, + expectedLimits: []LimitsPerLabelSet{}, + }, + { + name: "one limit matched", + metric: labels.FromMap(map[string]string{"foo": "bar"}), + limits: []LimitsPerLabelSet{ + {LabelSet: labels.FromMap(map[string]string{"foo": "baz"})}, + {LabelSet: labels.FromMap(map[string]string{"foo": "bar"})}, + }, + expectedLimits: []LimitsPerLabelSet{ + {LabelSet: labels.FromMap(map[string]string{"foo": "bar"})}, + }, + }, + { + name: "default limit matched", + metric: labels.FromMap(map[string]string{"foo": "bar"}), + limits: []LimitsPerLabelSet{ + {LabelSet: labels.FromMap(map[string]string{"foo": "baz"})}, + {LabelSet: labels.FromMap(map[string]string{})}, + }, + expectedLimits: []LimitsPerLabelSet{ + {LabelSet: labels.FromMap(map[string]string{})}, + }, + }, + { + name: "one limit matched so not picking default limit", + metric: labels.FromMap(map[string]string{"foo": "bar", "cluster": "us-west-2"}), + limits: []LimitsPerLabelSet{ + {LabelSet: labels.FromMap(map[string]string{"foo": "bar", "cluster": "us-west-2"})}, + {LabelSet: labels.FromMap(map[string]string{})}, + }, + expectedLimits: []LimitsPerLabelSet{ + {LabelSet: labels.FromMap(map[string]string{"foo": "bar", "cluster": "us-west-2"})}, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + matched := LimitsPerLabelSetsForSeries(tc.limits, tc.metric) + require.Equal(t, tc.expectedLimits, matched) + }) + } +}