Skip to content

Commit

Permalink
implement default partition for limits per labelset but without any l…
Browse files Browse the repository at this point in the history
…abels

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Dec 17, 2024
1 parent b72a536 commit f73e148
Show file tree
Hide file tree
Showing 7 changed files with 549 additions and 60 deletions.
76 changes: 72 additions & 4 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,22 +366,77 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Should remove metrics when the limits is removed
// Add default partition -> no label set configured working as a fallback when a series
// doesn't match any existing label set limit.
emptyLabels := labels.EmptyLabels()
defaultPartitionLimits := validation.LimitsPerLabelSet{LabelSet: emptyLabels,
Limits: validation.LimitsPerLabelSetEntry{
MaxSeries: 2,
},
}
limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits)
b, err = json.Marshal(limits)
require.NoError(t, err)
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)

lbls = []string{labels.MetricName, "test_default"}
for i := 0; i < 2; i++ {
_, err = ing.Push(ctx, cortexpb.ToWriteRequest(
[]labels.Labels{labels.FromStrings(append(lbls, "series", strconv.Itoa(i))...)}, samples, nil, nil, cortexpb.API))
require.NoError(t, err)
}

// Max series limit for default partition is 2 so 1 more series will be throttled.
_, err = ing.Push(ctx, cortexpb.ToWriteRequest(
[]labels.Labels{labels.FromStrings(append(lbls, "extraLabel", "extraValueUpdate2")...)}, samples, nil, nil, cortexpb.API))
httpResp, ok = httpgrpc.HTTPResponseFromError(err)
require.True(t, ok, "returned error is not an httpgrpc response")
assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
require.ErrorContains(t, err, emptyLabels.String())

ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 10
cortex_ingester_limits_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 10
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7
cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 2
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Should remove metrics when the limits is removed, keep default partition limit
limits.LimitsPerLabelSet = limits.LimitsPerLabelSet[:2]
limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits)
b, err = json.Marshal(limits)
require.NoError(t, err)
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)
ing.updateActiveSeries(ctx)
// Default partition usage increased from 2 to 10 as some existing partitions got removed.
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Should persist between restarts
Expand All @@ -396,10 +451,12 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

Expand All @@ -420,6 +477,13 @@ func TestPushRace(t *testing.T) {
MaxSeries: 10e10,
},
},
{
// Default partition.
LabelSet: labels.EmptyLabels(),
Limits: validation.LimitsPerLabelSetEntry{
MaxSeries: 10e10,
},
},
}

dir := t.TempDir()
Expand Down Expand Up @@ -451,6 +515,10 @@ func TestPushRace(t *testing.T) {
defer wg.Done()
_, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
require.NoError(t, err)

// Go to default partition.
_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "bar", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
require.NoError(t, err)
}()
}
}
Expand All @@ -472,13 +540,13 @@ func TestPushRace(t *testing.T) {
err = ir.Series(p.At(), &builder, nil)
require.NoError(t, err)
lbls := builder.Labels()
require.Equal(t, "foo", lbls.Get(labels.MetricName))
require.True(t, lbls.Get(labels.MetricName) == "foo" || lbls.Get(labels.MetricName) == "bar")
require.Equal(t, "1", lbls.Get("userId"))
require.NotEmpty(t, lbls.Get("k"))
builder.Reset()
}
require.Equal(t, numberOfSeries, total)
require.Equal(t, uint64(numberOfSeries), db.Head().NumSeries())
require.Equal(t, 2*numberOfSeries, total)
require.Equal(t, uint64(2*numberOfSeries), db.Head().NumSeries())
}

func TestIngesterUserLimitExceeded(t *testing.T) {
Expand Down
28 changes: 6 additions & 22 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,15 @@ func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int

// AssertMaxSeriesPerLabelSet limit has not been reached compared to the current
// number of metrics with metadata in input and returns an error if so.
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.LimitsPerLabelSet) (int, error)) error {
m := l.limitsPerLabelSets(userID, metric)
for _, limit := range m {
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error)) error {
limits := l.limits.LimitsPerLabelSet(userID)
matchedLimits := validation.LimitsPerLabelSetsForSeries(limits, metric)
for _, limit := range matchedLimits {
maxSeriesFunc := func(string) int {
return limit.Limits.MaxSeries
}
local := l.maxByLocalAndGlobal(userID, maxSeriesFunc, maxSeriesFunc)
if u, err := f(limit); err != nil {
if u, err := f(limits, limit); err != nil {
return err
} else if u >= local {
return errMaxSeriesPerLabelSetLimitExceeded{
Expand Down Expand Up @@ -191,24 +192,7 @@ func (l *Limiter) formatMaxSeriesPerLabelSetError(err errMaxSeriesPerLabelSetLim

func (l *Limiter) limitsPerLabelSets(userID string, metric labels.Labels) []validation.LimitsPerLabelSet {
m := l.limits.LimitsPerLabelSet(userID)

// returning early to not have any overhead
if len(m) == 0 {
return nil
}

r := make([]validation.LimitsPerLabelSet, 0, len(m))
outer:
for _, lbls := range m {
for _, lbl := range lbls.LabelSet {
// We did not find some of the labels on the set
if v := metric.Get(lbl.Name); v != lbl.Value {
continue outer
}
}
r = append(r, lbls)
}
return r
return validation.LimitsPerLabelSetsForSeries(m, metric)
}

func (l *Limiter) maxSeriesPerMetric(userID string) int {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func TestLimiter_AssertMaxSeriesPerLabelSet(t *testing.T) {
require.NoError(t, err)

limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(set validation.LimitsPerLabelSet) (int, error) {
actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(limits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
return testData.series, nil
})

Expand Down
138 changes: 105 additions & 33 deletions pkg/ingester/user_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/segmentio/fasthash/fnv1a"

Expand Down Expand Up @@ -114,59 +115,113 @@ func newLabelSetCounter(limiter *Limiter) *labelSetCounter {
}

func (m *labelSetCounter) canAddSeriesForLabelSet(ctx context.Context, u *userTSDB, metric labels.Labels) error {
return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(set validation.LimitsPerLabelSet) (int, error) {
s := m.shards[util.HashFP(model.Fingerprint(set.Hash))%numMetricCounterShards]
return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
s := m.shards[util.HashFP(model.Fingerprint(limit.Hash))%numMetricCounterShards]
s.RLock()
if r, ok := s.valuesCounter[set.Hash]; ok {
if r, ok := s.valuesCounter[limit.Hash]; ok {
defer s.RUnlock()
return r.count, nil
}
s.RUnlock()

// We still dont keep track of this label value so we need to backfill
return m.backFillLimit(ctx, u, set, s)
return m.backFillLimit(ctx, u, false, allLimits, limit, s)
})
}

func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit validation.LimitsPerLabelSet, s *labelSetCounterShard) (int, error) {
func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, forceBackfill bool, allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet, s *labelSetCounterShard) (int, error) {
s.Lock()
// If not force backfill, use existing counter value.
if !forceBackfill {
if r, ok := s.valuesCounter[limit.Hash]; ok {
s.Unlock()
return r.count, nil
}
}

ir, err := u.db.Head().Index()
if err != nil {
return 0, err
}

defer ir.Close()

s.Lock()
defer s.Unlock()
if r, ok := s.valuesCounter[limit.Hash]; !ok {
postings := make([]index.Postings, 0, len(limit.LabelSet))
for _, lbl := range limit.LabelSet {
p, err := ir.Postings(ctx, lbl.Name, lbl.Value)
if err != nil {
return 0, err
}
postings = append(postings, p)
}
totalCount, err := getCardinalityForLimitsPerLabelSet(ctx, ir, allLimits, limit)
if err != nil {
return 0, err
}

p := index.Intersect(postings...)
s.valuesCounter[limit.Hash] = &labelSetCounterEntry{
count: totalCount,
labels: limit.LabelSet,
}
s.Unlock()
return totalCount, nil
}

totalCount := 0
for p.Next() {
totalCount++
func getCardinalityForLimitsPerLabelSet(ctx context.Context, ir tsdb.IndexReader, allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
// Easy path with explicit labels.
if limit.LabelSet.Len() > 0 {
p, err := getPostingForLabels(ctx, ir, limit.LabelSet)
if err != nil {
return 0, err
}
return getPostingCardinality(p)
}

if p.Err() != nil {
return 0, p.Err()
// Default partition needs to get cardinality of all series that doesn't belong to any existing partitions.
postings := make([]index.Postings, 0, len(allLimits)-1)
for _, l := range allLimits {
if l.Hash == limit.Hash {
continue
}
p, err := getPostingForLabels(ctx, ir, l.LabelSet)
if err != nil {
return 0, err
}
postings = append(postings, p)
}
mergedCardinality, err := getPostingCardinality(index.Merge(ctx, postings...))
if err != nil {
return 0, err
}

name, value := index.AllPostingsKey()
// Don't expand all postings but get length directly instead.
allPostings, err := ir.Postings(ctx, name, value)
if err != nil {
return 0, err
}
allCardinality, err := getPostingCardinality(allPostings)
if err != nil {
return 0, err
}
return allCardinality - mergedCardinality, nil
}

s.valuesCounter[limit.Hash] = &labelSetCounterEntry{
count: totalCount,
labels: limit.LabelSet,
func getPostingForLabels(ctx context.Context, ir tsdb.IndexReader, lbls labels.Labels) (index.Postings, error) {
postings := make([]index.Postings, 0, len(lbls))
for _, lbl := range lbls {
p, err := ir.Postings(ctx, lbl.Name, lbl.Value)
if err != nil {
return nil, err
}
return totalCount, nil
} else {
return r.count, nil
postings = append(postings, p)
}

return index.Intersect(postings...), nil
}

func getPostingCardinality(p index.Postings) (int, error) {
totalCount := 0
for p.Next() {
totalCount++
}

if p.Err() != nil {
return 0, p.Err()
}
return totalCount, nil
}

func (m *labelSetCounter) increaseSeriesLabelSet(u *userTSDB, metric labels.Labels) {
Expand Down Expand Up @@ -200,10 +255,13 @@ func (m *labelSetCounter) decreaseSeriesLabelSet(u *userTSDB, metric labels.Labe

func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, metrics *ingesterMetrics) error {
currentLbsLimitHash := map[uint64]validation.LimitsPerLabelSet{}
for _, l := range m.limiter.limits.LimitsPerLabelSet(u.userID) {
limits := m.limiter.limits.LimitsPerLabelSet(u.userID)
for _, l := range limits {
currentLbsLimitHash[l.Hash] = l
}

nonDefaultPartitionRemoved := false
var defaultPartitionHash uint64
for i := 0; i < numMetricCounterShards; i++ {
s := m.shards[i]
s.RLock()
Expand All @@ -215,17 +273,31 @@ func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, metrics
metrics.limitsPerLabelSet.DeleteLabelValues(u.userID, "max_series", lbls)
continue
}
metrics.usagePerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(entry.count))
metrics.limitsPerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(currentLbsLimitHash[h].Limits.MaxSeries))
delete(currentLbsLimitHash, h)
// Delay deletion of default partition from current label limits as if
// another label set is removed then we need to backfill default partition again.
if entry.labels.Len() > 0 {
metrics.usagePerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(entry.count))
metrics.limitsPerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(currentLbsLimitHash[h].Limits.MaxSeries))
delete(currentLbsLimitHash, h)
nonDefaultPartitionRemoved = true
} else {
defaultPartitionHash = h
}
}
s.RUnlock()
}

// No partitions with label sets configured got removed, no need to backfill default partition.
if !nonDefaultPartitionRemoved {
delete(currentLbsLimitHash, defaultPartitionHash)
}

// Backfill all limits that are not being tracked yet
for _, l := range currentLbsLimitHash {
s := m.shards[util.HashFP(model.Fingerprint(l.Hash))%numMetricCounterShards]
count, err := m.backFillLimit(ctx, u, l, s)
// Force backfill is enabled to make sure we update the counter for the default partition
// when other limits got removed.
count, err := m.backFillLimit(ctx, u, true, limits, l, s)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit f73e148

Please sign in to comment.