diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index f64368877f..a707c73711 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -366,22 +366,77 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
 		cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
 	`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
 
-	// Should remove metrics when the limits is removed
+	// Add a default partition (no label set configured) that works as a fallback when
+	// a series doesn't match any existing label set limit.
+	emptyLabels := labels.EmptyLabels()
+	defaultPartitionLimits := validation.LimitsPerLabelSet{LabelSet: emptyLabels,
+		Limits: validation.LimitsPerLabelSetEntry{
+			MaxSeries: 2,
+		},
+	}
+	limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits)
+	b, err = json.Marshal(limits)
+	require.NoError(t, err)
+	require.NoError(t, limits.UnmarshalJSON(b))
+	tenantLimits.setLimits(userID, &limits)
+
+	lbls = []string{labels.MetricName, "test_default"}
+	for i := 0; i < 2; i++ {
+		_, err = ing.Push(ctx, cortexpb.ToWriteRequest(
+			[]labels.Labels{labels.FromStrings(append(lbls, "series", strconv.Itoa(i))...)}, samples, nil, nil, cortexpb.API))
+		require.NoError(t, err)
+	}
+
+	// The max series limit for the default partition is 2, so one more series will be throttled.
+	_, err = ing.Push(ctx, cortexpb.ToWriteRequest(
+		[]labels.Labels{labels.FromStrings(append(lbls, "extraLabel", "extraValueUpdate2")...)}, samples, nil, nil, cortexpb.API))
+	httpResp, ok = httpgrpc.HTTPResponseFromError(err)
+	require.True(t, ok, "returned error is not an httpgrpc response")
+	assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
+	require.ErrorContains(t, err, emptyLabels.String())
+
+	ing.updateActiveSeries(ctx)
+	require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
+		# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
+		# TYPE cortex_ingester_limits_per_labelset gauge
+		cortex_ingester_limits_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
+		cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
+		cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 10
+		cortex_ingester_limits_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 10
+		cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
+		cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
+		cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
+		# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
+		# TYPE cortex_ingester_usage_per_labelset gauge
+		cortex_ingester_usage_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
+		cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
+		cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
+		cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
+		cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7
+		cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
+		cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 2
+	`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
+
+	// Should remove metrics when the limits are removed, but keep the default partition limit.
 	limits.LimitsPerLabelSet = limits.LimitsPerLabelSet[:2]
+	limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits)
 	b, err = json.Marshal(limits)
 	require.NoError(t, err)
 	require.NoError(t, limits.UnmarshalJSON(b))
 	tenantLimits.setLimits(userID, &limits)
 	ing.updateActiveSeries(ctx)
+	// Default partition usage increased from 2 to 10 as some existing partitions were removed.
 	require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
 		# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
 		# TYPE cortex_ingester_limits_per_labelset gauge
 		cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
 		cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
+		cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
 		# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
 		# TYPE cortex_ingester_usage_per_labelset gauge
 		cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
 		cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
+		cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10
 	`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
 
 	// Should persist between restarts
@@ -396,10 +451,12 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
 		# TYPE cortex_ingester_limits_per_labelset gauge
 		cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
 		cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
+		cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
 		# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
 		# TYPE cortex_ingester_usage_per_labelset gauge
 		cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
 		cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
+		cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10
 	`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
 
 	services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
@@ -420,6 +477,13 @@ func TestPushRace(t *testing.T) {
 				MaxSeries: 10e10,
 			},
 		},
+		{
+			// Default partition.
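+			// The empty label set acts as a fallback partition: it matches any series
+			// that does not match one of the label set limits configured above.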
+			LabelSet: labels.EmptyLabels(),
+			Limits: validation.LimitsPerLabelSetEntry{
+				MaxSeries: 10e10,
+			},
+		},
 	}
 
 	dir := t.TempDir()
@@ -451,6 +515,10 @@ func TestPushRace(t *testing.T) {
 			defer wg.Done()
 			_, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
 			require.NoError(t, err)
+
+			// This series goes to the default partition.
+			_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "bar", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
+			require.NoError(t, err)
 		}()
 	}
 
@@ -472,13 +540,13 @@ func TestPushRace(t *testing.T) {
 		err = ir.Series(p.At(), &builder, nil)
 		require.NoError(t, err)
 		lbls := builder.Labels()
-		require.Equal(t, "foo", lbls.Get(labels.MetricName))
+		require.True(t, lbls.Get(labels.MetricName) == "foo" || lbls.Get(labels.MetricName) == "bar")
 		require.Equal(t, "1", lbls.Get("userId"))
 		require.NotEmpty(t, lbls.Get("k"))
 		builder.Reset()
 	}
-	require.Equal(t, numberOfSeries, total)
-	require.Equal(t, uint64(numberOfSeries), db.Head().NumSeries())
+	require.Equal(t, 2*numberOfSeries, total)
+	require.Equal(t, uint64(2*numberOfSeries), db.Head().NumSeries())
 }
 
 func TestIngesterUserLimitExceeded(t *testing.T) {
diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go
index 9b572a8409..0aaf6b312a 100644
--- a/pkg/ingester/limiter.go
+++ b/pkg/ingester/limiter.go
@@ -107,14 +107,15 @@ func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int
 
 // AssertMaxSeriesPerLabelSet limit has not been reached compared to the current
 // number of metrics with metadata in input and returns an error if so.
-func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.LimitsPerLabelSet) (int, error)) error {
-	m := l.limitsPerLabelSets(userID, metric)
-	for _, limit := range m {
+func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error)) error {
+	limits := l.limits.LimitsPerLabelSet(userID)
+	matchedLimits := validation.LimitsPerLabelSetsForSeries(limits, metric)
+	for _, limit := range matchedLimits {
 		maxSeriesFunc := func(string) int {
 			return limit.Limits.MaxSeries
 		}
 		local := l.maxByLocalAndGlobal(userID, maxSeriesFunc, maxSeriesFunc)
-		if u, err := f(limit); err != nil {
+		if u, err := f(limits, limit); err != nil {
 			return err
 		} else if u >= local {
 			return errMaxSeriesPerLabelSetLimitExceeded{
@@ -191,24 +192,7 @@ func (l *Limiter) formatMaxSeriesPerLabelSetError(err errMaxSeriesPerLabelSetLim
 
 func (l *Limiter) limitsPerLabelSets(userID string, metric labels.Labels) []validation.LimitsPerLabelSet {
 	m := l.limits.LimitsPerLabelSet(userID)
-
-	// returning early to not have any overhead
-	if len(m) == 0 {
-		return nil
-	}
-
-	r := make([]validation.LimitsPerLabelSet, 0, len(m))
-outer:
-	for _, lbls := range m {
-		for _, lbl := range lbls.LabelSet {
-			// We did not find some of the labels on the set
-			if v := metric.Get(lbl.Name); v != lbl.Value {
-				continue outer
-			}
-		}
-		r = append(r, lbls)
-	}
-	return r
+	return validation.LimitsPerLabelSetsForSeries(m, metric)
 }
 
 func (l *Limiter) maxSeriesPerMetric(userID string) int {
diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go
index e4759191c1..944af00293 100644
--- a/pkg/ingester/limiter_test.go
+++ b/pkg/ingester/limiter_test.go
@@ -500,7 +500,7 @@ func TestLimiter_AssertMaxSeriesPerLabelSet(t *testing.T) {
 			require.NoError(t, err)
 
 			limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
-			actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(set validation.LimitsPerLabelSet) (int, error) {
+			actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(limits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
 				return testData.series, nil
 			})
 
diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go
index 7b0dec6920..da6046e1a3 100644
--- a/pkg/ingester/user_state.go
+++ b/pkg/ingester/user_state.go
@@ -6,6 +6,7 @@ import (
 
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/segmentio/fasthash/fnv1a"
 
@@ -114,21 +115,30 @@ func newLabelSetCounter(limiter *Limiter) *labelSetCounter {
 }
 
 func (m *labelSetCounter) canAddSeriesForLabelSet(ctx context.Context, u *userTSDB, metric labels.Labels) error {
-	return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(set validation.LimitsPerLabelSet) (int, error) {
-		s := m.shards[util.HashFP(model.Fingerprint(set.Hash))%numMetricCounterShards]
+	return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
+		s := m.shards[util.HashFP(model.Fingerprint(limit.Hash))%numMetricCounterShards]
 		s.RLock()
-		if r, ok := s.valuesCounter[set.Hash]; ok {
+		if r, ok := s.valuesCounter[limit.Hash]; ok {
 			defer s.RUnlock()
 			return r.count, nil
 		}
 		s.RUnlock()
 
 		// We still dont keep track of this label value so we need to backfill
-		return m.backFillLimit(ctx, u, set, s)
+		return m.backFillLimit(ctx, u, false, allLimits, limit, s)
 	})
 }
 
-func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit validation.LimitsPerLabelSet, s *labelSetCounterShard) (int, error) {
+func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, forceBackfill bool, allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet, s *labelSetCounterShard) (int, error) {
+	s.Lock()
+	// Unlock via defer so the error returns below cannot leak the shard lock.
+	defer s.Unlock()
+	// If backfill is not forced, reuse the existing counter value.
+	if !forceBackfill {
+		if r, ok := s.valuesCounter[limit.Hash]; ok {
+			return r.count, nil
+		}
+	}
+
 	ir, err := u.db.Head().Index()
 	if err != nil {
 		return 0, err
@@ -136,37 +146,79 @@ func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit
 	defer ir.Close()
 
-	s.Lock()
-	defer s.Unlock()
-	if r, ok := s.valuesCounter[limit.Hash]; !ok {
-		postings := make([]index.Postings, 0, len(limit.LabelSet))
-		for _, lbl := range limit.LabelSet {
-			p, err := ir.Postings(ctx, lbl.Name, lbl.Value)
-			if err != nil {
-				return 0, err
-			}
-			postings = append(postings, p)
-		}
+	totalCount, err := getCardinalityForLimitsPerLabelSet(ctx, ir, allLimits, limit)
+	if err != nil {
+		return 0, err
+	}
 
-		p := index.Intersect(postings...)
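+	// Cache the computed count so subsequent lookups for this label set hit the
+	// fast path above instead of rescanning the head index.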
+	s.valuesCounter[limit.Hash] = &labelSetCounterEntry{
+		count:  totalCount,
+		labels: limit.LabelSet,
+	}
+	return totalCount, nil
+}
 
-		totalCount := 0
-		for p.Next() {
-			totalCount++
+func getCardinalityForLimitsPerLabelSet(ctx context.Context, ir tsdb.IndexReader, allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
+	// Easy path with explicit labels.
+	if limit.LabelSet.Len() > 0 {
+		p, err := getPostingForLabels(ctx, ir, limit.LabelSet)
+		if err != nil {
+			return 0, err
 		}
+		return getPostingCardinality(p)
+	}
 
-		if p.Err() != nil {
-			return 0, p.Err()
+	// The default partition needs the cardinality of all series that don't belong to any existing partition.
+	postings := make([]index.Postings, 0, len(allLimits)-1)
+	for _, l := range allLimits {
+		if l.Hash == limit.Hash {
+			continue
 		}
+		p, err := getPostingForLabels(ctx, ir, l.LabelSet)
+		if err != nil {
+			return 0, err
+		}
+		postings = append(postings, p)
+	}
+	mergedCardinality, err := getPostingCardinality(index.Merge(ctx, postings...))
+	if err != nil {
+		return 0, err
+	}
 
-		s.valuesCounter[limit.Hash] = &labelSetCounterEntry{
-			count:  totalCount,
-			labels: limit.LabelSet,
+	name, value := index.AllPostingsKey()
+	// Count all postings directly instead of expanding them.
+	allPostings, err := ir.Postings(ctx, name, value)
+	if err != nil {
+		return 0, err
+	}
+	allCardinality, err := getPostingCardinality(allPostings)
+	if err != nil {
+		return 0, err
+	}
+	return allCardinality - mergedCardinality, nil
+}
+
+func getPostingForLabels(ctx context.Context, ir tsdb.IndexReader, lbls labels.Labels) (index.Postings, error) {
+	postings := make([]index.Postings, 0, len(lbls))
+	for _, lbl := range lbls {
+		p, err := ir.Postings(ctx, lbl.Name, lbl.Value)
+		if err != nil {
+			return nil, err
 		}
-	} else {
-		return r.count, nil
+		postings = append(postings, p)
+	}
+
+	return index.Intersect(postings...), nil
+}
+
+func getPostingCardinality(p index.Postings) (int, error) {
+	totalCount := 0
+	for p.Next() {
+		totalCount++
+	}
+
+	if p.Err() != nil {
+		return 0, p.Err()
 	}
+	return totalCount, nil
 }
 
 func (m *labelSetCounter) increaseSeriesLabelSet(u *userTSDB, metric labels.Labels) {
@@ -200,10 +252,13 @@ func (m *labelSetCounter) decreaseSeriesLabelSet(u *userTSDB, metric labels.Labe
 
 func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, metrics *ingesterMetrics) error {
 	currentLbsLimitHash := map[uint64]validation.LimitsPerLabelSet{}
-	for _, l := range m.limiter.limits.LimitsPerLabelSet(u.userID) {
+	limits := m.limiter.limits.LimitsPerLabelSet(u.userID)
+	for _, l := range limits {
 		currentLbsLimitHash[l.Hash] = l
 	}
 
+	nonDefaultPartitionRemoved := false
+	var defaultPartitionHash uint64
 	for i := 0; i < numMetricCounterShards; i++ {
 		s := m.shards[i]
 		s.RLock()
@@ -215,17 +270,31 @@
 			metrics.limitsPerLabelSet.DeleteLabelValues(u.userID, "max_series", lbls)
 			continue
 		}
-		metrics.usagePerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(entry.count))
-		metrics.limitsPerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(currentLbsLimitHash[h].Limits.MaxSeries))
-		delete(currentLbsLimitHash, h)
+			// Delay deleting the default partition from the current label limits: if
+			// another label set was removed, we need to backfill the default partition again.
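+			// The default partition entry is therefore only recorded here and handled after the loop.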
+			if entry.labels.Len() > 0 {
+				metrics.usagePerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(entry.count))
+				metrics.limitsPerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(currentLbsLimitHash[h].Limits.MaxSeries))
+				delete(currentLbsLimitHash, h)
+				nonDefaultPartitionRemoved = true
+			} else {
+				defaultPartitionHash = h
+			}
 		}
 		s.RUnlock()
 	}
 
+	// No partitions with label sets configured were removed, so there is no need to backfill the default partition.
+	if !nonDefaultPartitionRemoved {
+		delete(currentLbsLimitHash, defaultPartitionHash)
+	}
+
 	// Backfill all limits that are not being tracked yet
 	for _, l := range currentLbsLimitHash {
 		s := m.shards[util.HashFP(model.Fingerprint(l.Hash))%numMetricCounterShards]
-		count, err := m.backFillLimit(ctx, u, l, s)
+		// Backfill is forced to make sure the counter for the default partition is
+		// updated when other limits were removed.
+		count, err := m.backFillLimit(ctx, u, true, limits, l, s)
 		if err != nil {
 			return err
 		}
diff --git a/pkg/ingester/user_state_test.go b/pkg/ingester/user_state_test.go
index 071aa5733f..3f7aeba29c 100644
--- a/pkg/ingester/user_state_test.go
+++ b/pkg/ingester/user_state_test.go
@@ -1,6 +1,13 @@
 package ingester
 
 import (
+	"context"
+	"errors"
 	"testing"
 
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/prometheus/prometheus/tsdb/chunks"
+	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/stretchr/testify/assert"
@@ -92,3 +99,271 @@ func TestMetricCounter(t *testing.T) {
 		})
 	}
 }
+
+func TestGetCardinalityForLimitsPerLabelSet(t *testing.T) {
+	ctx := context.Background()
+	testErr := errors.New("err")
+	for _, tc := range []struct {
+		name   string
+		ir     tsdb.IndexReader
+		limits []validation.LimitsPerLabelSet
+		idx    int
+		cnt    int
+		err    error
+	}{
+		{
+			name: "single label",
+			ir: &mockIndexReader{postings: map[string]map[string]index.Postings{
+				"foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})},
+			}},
+			limits: []validation.LimitsPerLabelSet{
+				{LabelSet: labels.FromStrings("foo", "bar")},
+			},
+			cnt: 5,
+		},
+		{
+			name: "multiple limits",
+			ir: &mockIndexReader{postings: map[string]map[string]index.Postings{
+				"job": {"test": index.NewListPostings([]storage.SeriesRef{5, 6, 7, 8})},
+				"foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})},
+			}},
+			limits: []validation.LimitsPerLabelSet{
+				{LabelSet: labels.FromStrings("job", "test")},
+				{LabelSet: labels.FromStrings("foo", "bar")},
+			},
+			idx: 1,
+			cnt: 5,
+		},
+		{
+			name: "2 labels intersect",
+			ir: &mockIndexReader{postings: map[string]map[string]index.Postings{
+				"foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})},
+				"job": {"prometheus": index.NewListPostings([]storage.SeriesRef{1, 3, 5})},
+			}},
+			limits: []validation.LimitsPerLabelSet{
+				{LabelSet: labels.FromStrings("foo", "bar", "job", "prometheus")},
+			},
+			cnt: 3,
+		},
+		{
+			name: "error",
+			ir: &mockIndexReader{postings: map[string]map[string]index.Postings{
+				"foo": {"bar": index.ErrPostings(testErr)},
+				"job": {"prometheus": index.NewListPostings([]storage.SeriesRef{1, 3, 5})},
+			}},
+			limits: []validation.LimitsPerLabelSet{
+				{LabelSet: labels.FromStrings("foo", "bar", "job", "prometheus")},
+			},
+			err: testErr,
+		},
+		{
+			name: "no match and no default partition",
+			ir: &mockIndexReader{postings: map[string]map[string]index.Postings{
+				"job": {"test": index.NewListPostings([]storage.SeriesRef{5, 6, 7, 8})},
+ "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {LabelSet: labels.FromStrings("foo", "baz")}, + }, + idx: 0, + cnt: 0, + }, + { + name: "no match", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "job": {"test": index.NewListPostings([]storage.SeriesRef{5, 6, 7, 8})}, + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {LabelSet: labels.FromStrings("foo", "baz")}, + }, + cnt: 0, + }, + { + // Total cardinality 12. Cardinality of existing label set limits 8. + // Default partition cardinality 4. + name: "default partition", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "job": {"test": index.NewListPostings([]storage.SeriesRef{3, 4, 5, 6, 7, 8})}, + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + "": {"": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12})}, + }}, + limits: []validation.LimitsPerLabelSet{ + {Hash: 1, LabelSet: labels.FromStrings("foo", "bar")}, + {Hash: 2, LabelSet: labels.FromStrings("job", "test")}, + {}, // Default partition. + }, + idx: 2, + cnt: 4, + }, + { + name: "default partition with error getting postings", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "job": {"test": index.NewListPostings([]storage.SeriesRef{5, 6, 7, 8})}, + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + "": {"": index.ErrPostings(testErr)}, + }}, + limits: []validation.LimitsPerLabelSet{ + {Hash: 1, LabelSet: labels.FromStrings("foo", "bar")}, + {Hash: 2, LabelSet: labels.FromStrings("job", "test")}, + {}, // Default partition. + }, + idx: 2, + err: testErr, + }, + } { + t.Run(tc.name, func(t *testing.T) { + cnt, err := getCardinalityForLimitsPerLabelSet(ctx, tc.ir, tc.limits, tc.limits[tc.idx]) + if err != nil { + require.EqualError(t, err, tc.err.Error()) + } else { + require.NoError(t, err) + require.Equal(t, tc.cnt, cnt) + } + }) + } +} + +func TestGetPostingForLabels(t *testing.T) { + ctx := context.Background() + testErr := errors.New("err") + for _, tc := range []struct { + name string + ir tsdb.IndexReader + lbls labels.Labels + postings []storage.SeriesRef + err error + }{ + { + name: "single label", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + }}, + lbls: labels.FromStrings("foo", "bar"), + postings: []storage.SeriesRef{1, 2, 3, 4, 5}, + }, + { + name: "intersect", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "foo": {"bar": index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + "job": {"prometheus": index.NewListPostings([]storage.SeriesRef{1, 3, 5})}, + }}, + lbls: labels.FromStrings("foo", "bar", "job", "prometheus"), + postings: []storage.SeriesRef{1, 3, 5}, + }, + { + name: "error", + ir: &mockIndexReader{postings: map[string]map[string]index.Postings{ + "foo": {"bar": index.ErrPostings(testErr)}, + "job": {"prometheus": index.NewListPostings([]storage.SeriesRef{1, 3, 5})}, + }}, + lbls: labels.FromStrings("foo", "bar", "job", "prometheus"), + postings: []storage.SeriesRef{1, 3, 5}, + err: testErr, + }, + } { + t.Run(tc.name, func(t *testing.T) { + p, err := getPostingForLabels(ctx, tc.ir, tc.lbls) + if err != nil { + require.EqualError(t, err, tc.err.Error()) + } else { + require.NoError(t, err) + series := 
make([]storage.SeriesRef, 0) + for p.Next() { + series = append(series, p.At()) + } + if tc.err != nil { + require.EqualError(t, p.Err(), tc.err.Error()) + } else { + require.Equal(t, tc.postings, series) + } + } + }) + } +} + +func TestGetPostingCardinality(t *testing.T) { + testErr := errors.New("err") + for _, tc := range []struct { + name string + p index.Postings + cardinality int + err error + }{ + { + name: "empty", + p: index.EmptyPostings(), + }, + { + name: "err", + p: index.ErrPostings(testErr), + err: testErr, + }, + { + name: "cardinality", + p: index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), + cardinality: 10, + }, + } { + t.Run(tc.name, func(t *testing.T) { + cnt, err := getPostingCardinality(tc.p) + if tc.err != nil { + require.EqualError(t, err, tc.err.Error()) + } else { + require.NoError(t, err) + require.Equal(t, tc.cardinality, cnt) + } + }) + } +} + +type mockIndexReader struct { + postings map[string]map[string]index.Postings +} + +func (ir *mockIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) { + if entries, ok := ir.postings[name]; ok { + if postings, ok2 := entries[values[0]]; ok2 { + return postings, nil + } + } + return index.EmptyPostings(), nil +} + +func (ir *mockIndexReader) Symbols() index.StringIter { return nil } + +func (ir *mockIndexReader) SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { + return nil, nil +} + +func (ir *mockIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { + return nil, nil +} + +func (ir *mockIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings { + return nil +} + +func (ir *mockIndexReader) SortedPostings(index.Postings) index.Postings { return nil } + +func (ir *mockIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings { + return nil +} + +func (ir *mockIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { + return nil +} + +func (ir *mockIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error) { + return nil, nil +} + +func (ir *mockIndexReader) LabelValueFor(ctx context.Context, id storage.SeriesRef, label string) (string, error) { + return "", nil +} + +func (ir *mockIndexReader) LabelNamesFor(ctx context.Context, postings index.Postings) ([]string, error) { + return nil, nil +} + +func (ir *mockIndexReader) Close() error { return nil } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 3658b3fc79..33137cd490 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -1043,3 +1043,33 @@ func MaxDurationPerTenant(tenantIDs []string, f func(string) time.Duration) time } return result } + +// LimitsPerLabelSetsForSeries checks matching labelset limits for the given series. +func LimitsPerLabelSetsForSeries(limitsPerLabelSets []LimitsPerLabelSet, metric labels.Labels) []LimitsPerLabelSet { + // returning early to not have any overhead + if len(limitsPerLabelSets) == 0 { + return nil + } + r := make([]LimitsPerLabelSet, 0, len(limitsPerLabelSets)) + defaultPartitionIndex := -1 +outer: + for i, lbls := range limitsPerLabelSets { + // Default partition exists. 
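+		// It only applies as a fallback when no other label set matches.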
+		if lbls.LabelSet.Len() == 0 {
+			defaultPartitionIndex = i
+			continue
+		}
+		for _, lbl := range lbls.LabelSet {
+			// The series is missing some of the labels in the set, so this limit doesn't match.
+			if v := metric.Get(lbl.Name); v != lbl.Value {
+				continue outer
+			}
+		}
+		r = append(r, lbls)
+	}
+	// Use the default partition limit if it is configured and no other partition matched.
+	if defaultPartitionIndex != -1 && len(r) == 0 {
+		r = append(r, limitsPerLabelSets[defaultPartitionIndex])
+	}
+	return r
+}
diff --git a/pkg/util/validation/limits_test.go b/pkg/util/validation/limits_test.go
index 50d7cb7e3f..3ac0230d67 100644
--- a/pkg/util/validation/limits_test.go
+++ b/pkg/util/validation/limits_test.go
@@ -791,3 +791,63 @@ func TestEvaluationDelayHigherThanRulerQueryOffset(t *testing.T) {
 	rulerQueryOffset := ov.RulerQueryOffset(tenant)
 	assert.Equal(t, evaluationDelay, rulerQueryOffset)
 }
+
+func TestLimitsPerLabelSetsForSeries(t *testing.T) {
+	for _, tc := range []struct {
+		name           string
+		limits         []LimitsPerLabelSet
+		metric         labels.Labels
+		expectedLimits []LimitsPerLabelSet
+	}{
+		{
+			name:   "no limits",
+			metric: labels.FromMap(map[string]string{"foo": "bar"}),
+		},
+		{
+			name:   "no limits matched",
+			metric: labels.FromMap(map[string]string{"foo": "bar"}),
+			limits: []LimitsPerLabelSet{
+				{LabelSet: labels.FromMap(map[string]string{"foo": "baz"})},
+			},
+			expectedLimits: []LimitsPerLabelSet{},
+		},
+		{
+			name:   "one limit matched",
+			metric: labels.FromMap(map[string]string{"foo": "bar"}),
+			limits: []LimitsPerLabelSet{
+				{LabelSet: labels.FromMap(map[string]string{"foo": "baz"})},
+				{LabelSet: labels.FromMap(map[string]string{"foo": "bar"})},
+			},
+			expectedLimits: []LimitsPerLabelSet{
+				{LabelSet: labels.FromMap(map[string]string{"foo": "bar"})},
+			},
+		},
+		{
+			name:   "default limit matched",
+			metric: labels.FromMap(map[string]string{"foo": "bar"}),
+			limits: []LimitsPerLabelSet{
+				{LabelSet: labels.FromMap(map[string]string{"foo": "baz"})},
+				{LabelSet: labels.FromMap(map[string]string{})},
+			},
+			expectedLimits: []LimitsPerLabelSet{
+				{LabelSet: labels.FromMap(map[string]string{})},
+			},
+		},
+		{
+			name:   "one limit matched so the default limit is not picked",
+			metric: labels.FromMap(map[string]string{"foo": "bar", "cluster": "us-west-2"}),
+			limits: []LimitsPerLabelSet{
+				{LabelSet: labels.FromMap(map[string]string{"foo": "bar", "cluster": "us-west-2"})},
+				{LabelSet: labels.FromMap(map[string]string{})},
+			},
+			expectedLimits: []LimitsPerLabelSet{
+				{LabelSet: labels.FromMap(map[string]string{"foo": "bar", "cluster": "us-west-2"})},
+			},
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			matched := LimitsPerLabelSetsForSeries(tc.limits, tc.metric)
+			require.Equal(t, tc.expectedLimits, matched)
+		})
+	}
+}