diff --git a/CHANGELOG.md b/CHANGELOG.md index 8559e53924..6181aba0cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,11 +47,12 @@ * [ENHANCEMENT] Add new option `-server.grpc_server-num-stream-workers` to configure the number of worker goroutines that should be used to process incoming streams. #6386 * [ENHANCEMENT] Distributor: Return HTTP 5XX instead of HTTP 4XX when instance limits are hit. #6358 * [ENHANCEMENT] Ingester: Make sure unregistered ingester joining the ring after WAL replay. #6277 +* [ENHANCEMENT] Distributor: Add a new `-distributor.num-push-workers` flag to use a goroutine worker pool when sending data from distributor to ingesters. #6406 +* [ENHANCEMENT] Ingester: If a limit per label set entry doesn't have any label, use it as the default partition to catch all series that doesn't match any other label sets entries. #6435 * [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224 * [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326 * [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled. #6271 * [BUGFIX] Ingester: Fix regression on usage of cortex_ingester_queried_chunks. #6398 -* [ENHANCEMENT] Distributor: Add a new `-distributor.num-push-workers` flag to use a goroutine worker pool when sending data from distributor to ingesters. #6406 * [BUGFIX] Ingester: Fix possible race condition when `active series per LabelSet` is configured. #6409 ## 1.18.1 2024-10-14 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index d5c06fc205..f48bfeddbc 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -5770,7 +5770,9 @@ limits: # would not enforce any limits. [max_series: | default = ] -# LabelSet which the limit should be applied. +# LabelSet which the limit should be applied. If no labels are provided, it +# becomes the default partition which matches any series that doesn't match any +# other explicitly defined label sets.' [label_set: | default = []] ``` diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index f64368877f..1c455540db 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -366,22 +366,117 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) { cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3 `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset")) - // Should remove metrics when the limits is removed + // Add default partition -> no label set configured working as a fallback when a series + // doesn't match any existing label set limit. + emptyLabels := labels.EmptyLabels() + defaultPartitionLimits := validation.LimitsPerLabelSet{LabelSet: emptyLabels, + Limits: validation.LimitsPerLabelSetEntry{ + MaxSeries: 2, + }, + } + limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits) + b, err = json.Marshal(limits) + require.NoError(t, err) + require.NoError(t, limits.UnmarshalJSON(b)) + tenantLimits.setLimits(userID, &limits) + + lbls = []string{labels.MetricName, "test_default"} + for i := 0; i < 2; i++ { + _, err = ing.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(append(lbls, "series", strconv.Itoa(i))...)}, samples, nil, nil, cortexpb.API)) + require.NoError(t, err) + } + + // Max series limit for default partition is 2 so 1 more series will be throttled. 
+ _, err = ing.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(append(lbls, "extraLabel", "extraValueUpdate2")...)}, samples, nil, nil, cortexpb.API)) + httpResp, ok = httpgrpc.HTTPResponseFromError(err) + require.True(t, ok, "returned error is not an httpgrpc response") + assert.Equal(t, http.StatusBadRequest, int(httpResp.Code)) + require.ErrorContains(t, err, emptyLabels.String()) + + ing.updateActiveSeries(ctx) + require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP cortex_ingester_limits_per_labelset Limits per user and labelset. + # TYPE cortex_ingester_limits_per_labelset gauge + cortex_ingester_limits_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3 + cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2 + cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 10 + cortex_ingester_limits_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 10 + cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 + cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 + cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2 + # HELP cortex_ingester_usage_per_labelset Current usage per user and labelset. + # TYPE cortex_ingester_usage_per_labelset gauge + cortex_ingester_usage_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3 + cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 + cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 + cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2 + cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7 + cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3 + cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 2 + `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset")) + + // Add a new label set limit. + limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, + validation.LimitsPerLabelSet{LabelSet: labels.FromMap(map[string]string{ + "series": "0", + }), + Limits: validation.LimitsPerLabelSetEntry{ + MaxSeries: 3, + }, + }, + ) + b, err = json.Marshal(limits) + require.NoError(t, err) + require.NoError(t, limits.UnmarshalJSON(b)) + tenantLimits.setLimits(userID, &limits) + ing.updateActiveSeries(ctx) + // Default partition usage reduced from 2 to 1 as one series in default partition + // now counted into the new partition. + require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` + # HELP cortex_ingester_limits_per_labelset Limits per user and labelset. 
+ # TYPE cortex_ingester_limits_per_labelset gauge + cortex_ingester_limits_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3 + cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2 + cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 10 + cortex_ingester_limits_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 10 + cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 + cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 + cortex_ingester_limits_per_labelset{labelset="{series=\"0\"}",limit="max_series",user="1"} 3 + cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2 + # HELP cortex_ingester_usage_per_labelset Current usage per user and labelset. + # TYPE cortex_ingester_usage_per_labelset gauge + cortex_ingester_usage_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3 + cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 + cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 + cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2 + cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7 + cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3 + cortex_ingester_usage_per_labelset{labelset="{series=\"0\"}",limit="max_series",user="1"} 1 + cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 1 + `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset")) + + // Should remove metrics when the limits is removed, keep default partition limit limits.LimitsPerLabelSet = limits.LimitsPerLabelSet[:2] + limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits) b, err = json.Marshal(limits) require.NoError(t, err) require.NoError(t, limits.UnmarshalJSON(b)) tenantLimits.setLimits(userID, &limits) ing.updateActiveSeries(ctx) + // Default partition usage increased from 2 to 10 as some existing partitions got removed. require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(` # HELP cortex_ingester_limits_per_labelset Limits per user and labelset. # TYPE cortex_ingester_limits_per_labelset gauge cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 + cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2 # HELP cortex_ingester_usage_per_labelset Current usage per user and labelset. 
# TYPE cortex_ingester_usage_per_labelset gauge cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 + cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10 `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset")) // Should persist between restarts @@ -396,10 +491,12 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) { # TYPE cortex_ingester_limits_per_labelset gauge cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 + cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2 # HELP cortex_ingester_usage_per_labelset Current usage per user and labelset. # TYPE cortex_ingester_usage_per_labelset gauge cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3 cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2 + cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10 `), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset")) services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck @@ -420,6 +517,13 @@ func TestPushRace(t *testing.T) { MaxSeries: 10e10, }, }, + { + // Default partition. + LabelSet: labels.EmptyLabels(), + Limits: validation.LimitsPerLabelSetEntry{ + MaxSeries: 10e10, + }, + }, } dir := t.TempDir() @@ -451,6 +555,10 @@ func TestPushRace(t *testing.T) { defer wg.Done() _, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API)) require.NoError(t, err) + + // Go to default partition. + _, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "bar", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API)) + require.NoError(t, err) }() } } @@ -472,13 +580,13 @@ func TestPushRace(t *testing.T) { err = ir.Series(p.At(), &builder, nil) require.NoError(t, err) lbls := builder.Labels() - require.Equal(t, "foo", lbls.Get(labels.MetricName)) + require.True(t, lbls.Get(labels.MetricName) == "foo" || lbls.Get(labels.MetricName) == "bar") require.Equal(t, "1", lbls.Get("userId")) require.NotEmpty(t, lbls.Get("k")) builder.Reset() } - require.Equal(t, numberOfSeries, total) - require.Equal(t, uint64(numberOfSeries), db.Head().NumSeries()) + require.Equal(t, 2*numberOfSeries, total) + require.Equal(t, uint64(2*numberOfSeries), db.Head().NumSeries()) } func TestIngesterUserLimitExceeded(t *testing.T) { diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index 9b572a8409..0aaf6b312a 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -107,14 +107,15 @@ func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int // AssertMaxSeriesPerLabelSet limit has not been reached compared to the current // number of metrics with metadata in input and returns an error if so. 
-func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.LimitsPerLabelSet) (int, error)) error { - m := l.limitsPerLabelSets(userID, metric) - for _, limit := range m { +func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error)) error { + limits := l.limits.LimitsPerLabelSet(userID) + matchedLimits := validation.LimitsPerLabelSetsForSeries(limits, metric) + for _, limit := range matchedLimits { maxSeriesFunc := func(string) int { return limit.Limits.MaxSeries } local := l.maxByLocalAndGlobal(userID, maxSeriesFunc, maxSeriesFunc) - if u, err := f(limit); err != nil { + if u, err := f(limits, limit); err != nil { return err } else if u >= local { return errMaxSeriesPerLabelSetLimitExceeded{ @@ -191,24 +192,7 @@ func (l *Limiter) formatMaxSeriesPerLabelSetError(err errMaxSeriesPerLabelSetLim func (l *Limiter) limitsPerLabelSets(userID string, metric labels.Labels) []validation.LimitsPerLabelSet { m := l.limits.LimitsPerLabelSet(userID) - - // returning early to not have any overhead - if len(m) == 0 { - return nil - } - - r := make([]validation.LimitsPerLabelSet, 0, len(m)) -outer: - for _, lbls := range m { - for _, lbl := range lbls.LabelSet { - // We did not find some of the labels on the set - if v := metric.Get(lbl.Name); v != lbl.Value { - continue outer - } - } - r = append(r, lbls) - } - return r + return validation.LimitsPerLabelSetsForSeries(m, metric) } func (l *Limiter) maxSeriesPerMetric(userID string) int { diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index e4759191c1..944af00293 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -500,7 +500,7 @@ func TestLimiter_AssertMaxSeriesPerLabelSet(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "") - actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(set validation.LimitsPerLabelSet) (int, error) { + actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(limits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) { return testData.series, nil }) diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 7b0dec6920..8518bbb5b8 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/index" "github.com/segmentio/fasthash/fnv1a" @@ -114,21 +115,31 @@ func newLabelSetCounter(limiter *Limiter) *labelSetCounter { } func (m *labelSetCounter) canAddSeriesForLabelSet(ctx context.Context, u *userTSDB, metric labels.Labels) error { - return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(set validation.LimitsPerLabelSet) (int, error) { - s := m.shards[util.HashFP(model.Fingerprint(set.Hash))%numMetricCounterShards] + return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) { + s := m.shards[util.HashFP(model.Fingerprint(limit.Hash))%numMetricCounterShards] s.RLock() - if r, ok := s.valuesCounter[set.Hash]; ok { + if r, ok := s.valuesCounter[limit.Hash]; ok { defer 
s.RUnlock() return r.count, nil } s.RUnlock() // We still dont keep track of this label value so we need to backfill - return m.backFillLimit(ctx, u, set, s) + return m.backFillLimit(ctx, u, false, allLimits, limit, s) }) } -func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit validation.LimitsPerLabelSet, s *labelSetCounterShard) (int, error) { +func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, forceBackfill bool, allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet, s *labelSetCounterShard) (int, error) { + s.Lock() + // If not force backfill, use existing counter value. + if !forceBackfill { + if r, ok := s.valuesCounter[limit.Hash]; ok { + s.Unlock() + return r.count, nil + } + } + + defer s.Unlock() ir, err := u.db.Head().Index() if err != nil { return 0, err @@ -136,37 +147,72 @@ func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit defer ir.Close() - s.Lock() - defer s.Unlock() - if r, ok := s.valuesCounter[limit.Hash]; !ok { - postings := make([]index.Postings, 0, len(limit.LabelSet)) - for _, lbl := range limit.LabelSet { - p, err := ir.Postings(ctx, lbl.Name, lbl.Value) - if err != nil { - return 0, err - } - postings = append(postings, p) - } + numSeries := u.db.Head().NumSeries() + totalCount, err := getCardinalityForLimitsPerLabelSet(ctx, numSeries, ir, allLimits, limit) + if err != nil { + return 0, err + } - p := index.Intersect(postings...) + s.valuesCounter[limit.Hash] = &labelSetCounterEntry{ + count: totalCount, + labels: limit.LabelSet, + } + return totalCount, nil +} - totalCount := 0 - for p.Next() { - totalCount++ +func getCardinalityForLimitsPerLabelSet(ctx context.Context, numSeries uint64, ir tsdb.IndexReader, allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) { + // Easy path with explicit labels. + if limit.LabelSet.Len() > 0 { + p, err := getPostingForLabels(ctx, ir, limit.LabelSet) + if err != nil { + return 0, err } + return getPostingCardinality(p) + } - if p.Err() != nil { - return 0, p.Err() + // Default partition needs to get cardinality of all series that doesn't belong to any existing partitions. 
+ postings := make([]index.Postings, 0, len(allLimits)-1) + for _, l := range allLimits { + if l.Hash == limit.Hash { + continue } + p, err := getPostingForLabels(ctx, ir, l.LabelSet) + if err != nil { + return 0, err + } + postings = append(postings, p) + } + mergedCardinality, err := getPostingCardinality(index.Merge(ctx, postings...)) + if err != nil { + return 0, err + } + + return int(numSeries) - mergedCardinality, nil +} - s.valuesCounter[limit.Hash] = &labelSetCounterEntry{ - count: totalCount, - labels: limit.LabelSet, +func getPostingForLabels(ctx context.Context, ir tsdb.IndexReader, lbls labels.Labels) (index.Postings, error) { + postings := make([]index.Postings, 0, len(lbls)) + for _, lbl := range lbls { + p, err := ir.Postings(ctx, lbl.Name, lbl.Value) + if err != nil { + return nil, err } - return totalCount, nil - } else { - return r.count, nil + postings = append(postings, p) } + + return index.Intersect(postings...), nil +} + +func getPostingCardinality(p index.Postings) (int, error) { + totalCount := 0 + for p.Next() { + totalCount++ + } + + if p.Err() != nil { + return 0, p.Err() + } + return totalCount, nil } func (m *labelSetCounter) increaseSeriesLabelSet(u *userTSDB, metric labels.Labels) { @@ -200,10 +246,12 @@ func (m *labelSetCounter) decreaseSeriesLabelSet(u *userTSDB, metric labels.Labe func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, metrics *ingesterMetrics) error { currentLbsLimitHash := map[uint64]validation.LimitsPerLabelSet{} - for _, l := range m.limiter.limits.LimitsPerLabelSet(u.userID) { + limits := m.limiter.limits.LimitsPerLabelSet(u.userID) + for _, l := range limits { currentLbsLimitHash[l.Hash] = l } + nonDefaultPartitionChanged := false for i := 0; i < numMetricCounterShards; i++ { s := m.shards[i] s.RLock() @@ -213,19 +261,45 @@ func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, metrics if _, ok := currentLbsLimitHash[h]; !ok { metrics.usagePerLabelSet.DeleteLabelValues(u.userID, "max_series", lbls) metrics.limitsPerLabelSet.DeleteLabelValues(u.userID, "max_series", lbls) + if entry.labels.Len() > 0 { + nonDefaultPartitionChanged = true + } continue } - metrics.usagePerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(entry.count)) - metrics.limitsPerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(currentLbsLimitHash[h].Limits.MaxSeries)) - delete(currentLbsLimitHash, h) + // Delay exposing default partition metrics from current label limits as if + // another label set is added or removed then we need to backfill default partition again. + if entry.labels.Len() > 0 { + metrics.usagePerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(entry.count)) + metrics.limitsPerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(currentLbsLimitHash[h].Limits.MaxSeries)) + delete(currentLbsLimitHash, h) + } } s.RUnlock() } + // Check if we need to backfill default partition. We don't need to backfill when any condition meet: + // 1. Default partition doesn't exist. + // 2. No new partition added and no old partition removed. 
+ if !nonDefaultPartitionChanged { + for _, l := range currentLbsLimitHash { + if l.LabelSet.Len() > 0 { + nonDefaultPartitionChanged = true + break + } + } + } + // Backfill all limits that are not being tracked yet for _, l := range currentLbsLimitHash { s := m.shards[util.HashFP(model.Fingerprint(l.Hash))%numMetricCounterShards] - count, err := m.backFillLimit(ctx, u, l, s) + force := false + // Force backfill to make sure we update the counter for the default partition + // when other limits got added or removed. If no partition is changed then we + // can use the value in the counter. + if l.LabelSet.Len() == 0 && nonDefaultPartitionChanged { + force = true + } + count, err := m.backFillLimit(ctx, u, force, limits, l, s) if err != nil { return err } diff --git a/pkg/ingester/user_state_test.go b/pkg/ingester/user_state_test.go index 071aa5733f..910f8f1a7e 100644 --- a/pkg/ingester/user_state_test.go +++ b/pkg/ingester/user_state_test.go @@ -1,8 +1,15 @@ package ingester import ( + "context" + "errors" "testing" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -92,3 +99,271 @@ func TestMetricCounter(t *testing.T) { }) } } + +func TestGetCardinalityForLimitsPerLabelSet(t *testing.T) { + ctx := context.Background() + testErr := errors.New("err") + for _, tc := range []struct { + name string + ir tsdb.IndexReader + limits []validation.LimitsPerLabelSet + idx int + cnt int + numSeries uint64 + err error + }{ + { + name: "single label", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {LabelSet: labels.FromStrings("foo", "bar")}, + }, + cnt: 5, + }, + { + name: "multiple limits", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "job": {"test": index.NewListPostings([]storage.SeriesRef{5, 6, 7, 8})}, + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {LabelSet: labels.FromStrings("job", "test")}, + {LabelSet: labels.FromStrings("foo", "bar")}, + }, + idx: 1, + cnt: 5, + }, + { + name: "2 labels intersect", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + "job": {"prometheus": index.NewListPostings([]storage.SeriesRef{1, 3, 5})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {LabelSet: labels.FromStrings("foo", "bar", "job", "prometheus")}, + }, + cnt: 3, + }, + { + name: "error", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "foo": {"bar": index.ErrPostings(testErr)}, + "job": {"prometheus": index.NewListPostings([]storage.SeriesRef{1, 3, 5})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {LabelSet: labels.FromStrings("foo", "bar", "job", "prometheus")}, + }, + err: testErr, + }, + { + name: "no match and no default partition", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "job": {"test": index.NewListPostings([]storage.SeriesRef{5, 6, 7, 8})}, + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {LabelSet: labels.FromStrings("foo", "baz")}, + }, + idx: 0, 
+ cnt: 0, + }, + { + name: "no match", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "job": {"test": index.NewListPostings([]storage.SeriesRef{5, 6, 7, 8})}, + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {LabelSet: labels.FromStrings("foo", "baz")}, + }, + cnt: 0, + }, + { + // Total cardinality 12. Cardinality of existing label set limits 8. + // Default partition cardinality 4. + name: "default partition", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "job": {"test": index.NewListPostings([]storage.SeriesRef{3, 4, 5, 6, 7, 8})}, + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {Hash: 1, LabelSet: labels.FromStrings("foo", "bar")}, + {Hash: 2, LabelSet: labels.FromStrings("job", "test")}, + {}, // Default partition. + }, + idx: 2, + cnt: 4, + numSeries: 12, + }, + { + name: "default partition with error getting postings", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "job": {"test": index.NewListPostings([]storage.SeriesRef{5, 6, 7, 8})}, + "foo": {"bar": index.ErrPostings(testErr)}, + }}, + limits: []validation.LimitsPerLabelSet{ + {Hash: 1, LabelSet: labels.FromStrings("foo", "bar")}, + {Hash: 2, LabelSet: labels.FromStrings("job", "test")}, + {}, // Default partition. + }, + idx: 2, + err: testErr, + }, + } { + t.Run(tc.name, func(t *testing.T) { + cnt, err := getCardinalityForLimitsPerLabelSet(ctx, tc.numSeries, tc.ir, tc.limits, tc.limits[tc.idx]) + if err != nil { + require.EqualError(t, err, tc.err.Error()) + } else { + require.NoError(t, err) + require.Equal(t, tc.cnt, cnt) + } + }) + } +} + +func TestGetPostingForLabels(t *testing.T) { + ctx := context.Background() + testErr := errors.New("err") + for _, tc := range []struct { + name string + ir tsdb.IndexReader + lbls labels.Labels + postings []storage.SeriesRef + err error + }{ + { + name: "single label", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + }}, + lbls: labels.FromStrings("foo", "bar"), + postings: []storage.SeriesRef{1, 2, 3, 4, 5}, + }, + { + name: "intersect", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + "job": {"prometheus": index.NewListPostings([]storage.SeriesRef{1, 3, 5})}, + }}, + lbls: labels.FromStrings("foo", "bar", "job", "prometheus"), + postings: []storage.SeriesRef{1, 3, 5}, + }, + { + name: "error", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "foo": {"bar": index.ErrPostings(testErr)}, + "job": {"prometheus": index.NewListPostings([]storage.SeriesRef{1, 3, 5})}, + }}, + lbls: labels.FromStrings("foo", "bar", "job", "prometheus"), + postings: []storage.SeriesRef{1, 3, 5}, + err: testErr, + }, + } { + t.Run(tc.name, func(t *testing.T) { + p, err := getPostingForLabels(ctx, tc.ir, tc.lbls) + if err != nil { + require.EqualError(t, err, tc.err.Error()) + } else { + require.NoError(t, err) + series := make([]storage.SeriesRef, 0) + for p.Next() { + series = append(series, p.At()) + } + if tc.err != nil { + require.EqualError(t, p.Err(), tc.err.Error()) + } else { + require.Equal(t, tc.postings, series) + } + } + }) + } +} + +func TestGetPostingCardinality(t *testing.T) { + testErr := errors.New("err") + for _, tc := range []struct { + 
name string + p index.Postings + cardinality int + err error + }{ + { + name: "empty", + p: index.EmptyPostings(), + }, + { + name: "err", + p: index.ErrPostings(testErr), + err: testErr, + }, + { + name: "cardinality", + p: index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), + cardinality: 10, + }, + } { + t.Run(tc.name, func(t *testing.T) { + cnt, err := getPostingCardinality(tc.p) + if tc.err != nil { + require.EqualError(t, err, tc.err.Error()) + } else { + require.NoError(t, err) + require.Equal(t, tc.cardinality, cnt) + } + }) + } +} + +type mockIndexReader struct { + postings map[string]map[string]index.Postings +} + +func (ir *mockIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) { + if entries, ok := ir.postings[name]; ok { + if postings, ok2 := entries[values[0]]; ok2 { + return postings, nil + } + } + return index.EmptyPostings(), nil +} + +func (ir *mockIndexReader) Symbols() index.StringIter { return nil } + +func (ir *mockIndexReader) SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { + return nil, nil +} + +func (ir *mockIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { + return nil, nil +} + +func (ir *mockIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings { + return nil +} + +func (ir *mockIndexReader) SortedPostings(index.Postings) index.Postings { return nil } + +func (ir *mockIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { + return nil +} + +func (ir *mockIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { + return nil +} + +func (ir *mockIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error) { + return nil, nil +} + +func (ir *mockIndexReader) LabelValueFor(ctx context.Context, id storage.SeriesRef, label string) (string, error) { + return "", nil +} + +func (ir *mockIndexReader) LabelNamesFor(ctx context.Context, postings index.Postings) ([]string, error) { + return nil, nil +} + +func (ir *mockIndexReader) Close() error { return nil } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 3658b3fc79..38f31e8da0 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -109,7 +109,7 @@ type LimitsPerLabelSetEntry struct { type LimitsPerLabelSet struct { Limits LimitsPerLabelSetEntry `yaml:"limits" json:"limits" doc:"nocli"` - LabelSet labels.Labels `yaml:"label_set" json:"label_set" doc:"nocli|description=LabelSet which the limit should be applied."` + LabelSet labels.Labels `yaml:"label_set" json:"label_set" doc:"nocli|description=LabelSet which the limit should be applied. If no labels are provided, it becomes the default partition which matches any series that doesn't match any other explicitly defined label sets.'"` Id string `yaml:"-" json:"-" doc:"nocli"` Hash uint64 `yaml:"-" json:"-" doc:"nocli"` } @@ -1043,3 +1043,33 @@ func MaxDurationPerTenant(tenantIDs []string, f func(string) time.Duration) time } return result } + +// LimitsPerLabelSetsForSeries checks matching labelset limits for the given series. 
+func LimitsPerLabelSetsForSeries(limitsPerLabelSets []LimitsPerLabelSet, metric labels.Labels) []LimitsPerLabelSet { + // returning early to not have any overhead + if len(limitsPerLabelSets) == 0 { + return nil + } + r := make([]LimitsPerLabelSet, 0, len(limitsPerLabelSets)) + defaultPartitionIndex := -1 +outer: + for i, lbls := range limitsPerLabelSets { + // Default partition exists. + if lbls.LabelSet.Len() == 0 { + defaultPartitionIndex = i + continue + } + for _, lbl := range lbls.LabelSet { + // We did not find some of the labels on the set + if v := metric.Get(lbl.Name); v != lbl.Value { + continue outer + } + } + r = append(r, lbls) + } + // Use default partition limiter if it is configured and no other matching partitions. + if defaultPartitionIndex != -1 && len(r) == 0 { + r = append(r, limitsPerLabelSets[defaultPartitionIndex]) + } + return r +} diff --git a/pkg/util/validation/limits_test.go b/pkg/util/validation/limits_test.go index 50d7cb7e3f..3ac0230d67 100644 --- a/pkg/util/validation/limits_test.go +++ b/pkg/util/validation/limits_test.go @@ -791,3 +791,63 @@ func TestEvaluationDelayHigherThanRulerQueryOffset(t *testing.T) { rulerQueryOffset := ov.RulerQueryOffset(tenant) assert.Equal(t, evaluationDelay, rulerQueryOffset) } + +func TestLimitsPerLabelSetsForSeries(t *testing.T) { + for _, tc := range []struct { + name string + limits []LimitsPerLabelSet + metric labels.Labels + expectedLimits []LimitsPerLabelSet + }{ + { + name: "no limits", + metric: labels.FromMap(map[string]string{"foo": "bar"}), + }, + { + name: "no limits matched", + metric: labels.FromMap(map[string]string{"foo": "bar"}), + limits: []LimitsPerLabelSet{ + {LabelSet: labels.FromMap(map[string]string{"foo": "baz"})}, + }, + expectedLimits: []LimitsPerLabelSet{}, + }, + { + name: "one limit matched", + metric: labels.FromMap(map[string]string{"foo": "bar"}), + limits: []LimitsPerLabelSet{ + {LabelSet: labels.FromMap(map[string]string{"foo": "baz"})}, + {LabelSet: labels.FromMap(map[string]string{"foo": "bar"})}, + }, + expectedLimits: []LimitsPerLabelSet{ + {LabelSet: labels.FromMap(map[string]string{"foo": "bar"})}, + }, + }, + { + name: "default limit matched", + metric: labels.FromMap(map[string]string{"foo": "bar"}), + limits: []LimitsPerLabelSet{ + {LabelSet: labels.FromMap(map[string]string{"foo": "baz"})}, + {LabelSet: labels.FromMap(map[string]string{})}, + }, + expectedLimits: []LimitsPerLabelSet{ + {LabelSet: labels.FromMap(map[string]string{})}, + }, + }, + { + name: "one limit matched so not picking default limit", + metric: labels.FromMap(map[string]string{"foo": "bar", "cluster": "us-west-2"}), + limits: []LimitsPerLabelSet{ + {LabelSet: labels.FromMap(map[string]string{"foo": "bar", "cluster": "us-west-2"})}, + {LabelSet: labels.FromMap(map[string]string{})}, + }, + expectedLimits: []LimitsPerLabelSet{ + {LabelSet: labels.FromMap(map[string]string{"foo": "bar", "cluster": "us-west-2"})}, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + matched := LimitsPerLabelSetsForSeries(tc.limits, tc.metric) + require.Equal(t, tc.expectedLimits, matched) + }) + } +}
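
The matching rule introduced by `LimitsPerLabelSetsForSeries` is easiest to see with a small, self-contained sketch (illustrative only, not part of the patch): an entry with an empty `label_set` acts as the default partition and is returned only when no explicitly labeled entry matches the series. The snippet assumes Cortex's `pkg/util/validation` package from this diff and the Prometheus `labels` package.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"

	"github.com/cortexproject/cortex/pkg/util/validation"
)

func main() {
	// Two explicit partitions plus one default partition (empty label set).
	limits := []validation.LimitsPerLabelSet{
		{LabelSet: labels.FromStrings("team", "a"), Limits: validation.LimitsPerLabelSetEntry{MaxSeries: 100}},
		{LabelSet: labels.FromStrings("team", "b"), Limits: validation.LimitsPerLabelSetEntry{MaxSeries: 100}},
		{LabelSet: labels.EmptyLabels(), Limits: validation.LimitsPerLabelSetEntry{MaxSeries: 10}}, // default partition
	}

	// Matches the explicit {team="a"} entry, so the default partition is not applied.
	fmt.Println(validation.LimitsPerLabelSetsForSeries(limits, labels.FromStrings("__name__", "up", "team", "a")))

	// Matches no explicit entry, so it falls into the default partition.
	fmt.Println(validation.LimitsPerLabelSetsForSeries(limits, labels.FromStrings("__name__", "up", "team", "c")))
}
```

In the YAML limits configuration this corresponds to a `limits_per_label_set` entry whose `label_set` is empty, as described in the updated `config-file-reference.md` above.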
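The usage counter for the default partition is not tracked series-by-series; `getCardinalityForLimitsPerLabelSet` derives it as `Head().NumSeries()` minus the cardinality of the union of postings of every explicitly labeled entry. A hedged sketch of that arithmetic, mirroring the "default partition" case in `TestGetCardinalityForLimitsPerLabelSet` (12 total series, the two explicit sets cover 8 distinct refs, leaving 4 for the default partition):

```go
package main

import (
	"context"
	"fmt"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	ctx := context.Background()
	numSeries := 12 // total head series (u.db.Head().NumSeries() in the patch)

	// Postings of the two explicitly labeled partitions, as in the test:
	// {job="test"} -> refs 3..8, {foo="bar"} -> refs 1..5.
	jobTest := index.NewListPostings([]storage.SeriesRef{3, 4, 5, 6, 7, 8})
	fooBar := index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})

	// Union of all explicit partitions.
	merged := index.Merge(ctx, jobTest, fooBar)
	union := 0
	for merged.Next() {
		union++
	}
	if err := merged.Err(); err != nil {
		panic(err)
	}

	// Everything not covered by an explicit partition belongs to the default partition.
	fmt.Println(numSeries - union) // union is 8, so this prints 4
}
```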