Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default partition when no matching labelset limits #6435

Merged
merged 6 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@
* [ENHANCEMENT] Add new option `-server.grpc_server-num-stream-workers` to configure the number of worker goroutines that should be used to process incoming streams. #6386
* [ENHANCEMENT] Distributor: Return HTTP 5XX instead of HTTP 4XX when instance limits are hit. #6358
* [ENHANCEMENT] Ingester: Make sure unregistered ingester joining the ring after WAL replay. #6277
* [ENHANCEMENT] Distributor: Add a new `-distributor.num-push-workers` flag to use a goroutine worker pool when sending data from distributor to ingesters. #6406
* [ENHANCEMENT] Ingester: If a limit per label set entry doesn't have any label, use it as the default partition to catch all series that don't match any other label set entries. #6435
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
* [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled. #6271
* [BUGFIX] Ingester: Fix regression on usage of cortex_ingester_queried_chunks. #6398
* [ENHANCEMENT] Distributor: Add a new `-distributor.num-push-workers` flag to use a goroutine worker pool when sending data from distributor to ingesters. #6406
* [BUGFIX] Ingester: Fix possible race condition when `active series per LabelSet` is configured. #6409

## 1.18.1 2024-10-14
Expand Down
116 changes: 112 additions & 4 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,22 +366,117 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Should remove metrics when the limits is removed
// Add default partition -> no label set configured working as a fallback when a series
// doesn't match any existing label set limit.
emptyLabels := labels.EmptyLabels()
defaultPartitionLimits := validation.LimitsPerLabelSet{LabelSet: emptyLabels,
Limits: validation.LimitsPerLabelSetEntry{
MaxSeries: 2,
},
}
limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits)
b, err = json.Marshal(limits)
require.NoError(t, err)
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)

lbls = []string{labels.MetricName, "test_default"}
for i := 0; i < 2; i++ {
_, err = ing.Push(ctx, cortexpb.ToWriteRequest(
[]labels.Labels{labels.FromStrings(append(lbls, "series", strconv.Itoa(i))...)}, samples, nil, nil, cortexpb.API))
require.NoError(t, err)
}

// Max series limit for default partition is 2 so 1 more series will be throttled.
_, err = ing.Push(ctx, cortexpb.ToWriteRequest(
[]labels.Labels{labels.FromStrings(append(lbls, "extraLabel", "extraValueUpdate2")...)}, samples, nil, nil, cortexpb.API))
httpResp, ok = httpgrpc.HTTPResponseFromError(err)
require.True(t, ok, "returned error is not an httpgrpc response")
assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
require.ErrorContains(t, err, emptyLabels.String())

ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 10
cortex_ingester_limits_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 10
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7
cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 2
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Add a new label set limit.
limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet,
validation.LimitsPerLabelSet{LabelSet: labels.FromMap(map[string]string{
"series": "0",
}),
Limits: validation.LimitsPerLabelSetEntry{
MaxSeries: 3,
},
},
)
b, err = json.Marshal(limits)
require.NoError(t, err)
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)
ing.updateActiveSeries(ctx)
// Default partition usage is reduced from 2 to 1 as one series in the default partition
// is now counted into the new partition.
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 10
cortex_ingester_limits_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 10
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{series=\"0\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7
cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{series=\"0\"}",limit="max_series",user="1"} 1
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 1
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Should remove metrics when the limits are removed, keep default partition limit
limits.LimitsPerLabelSet = limits.LimitsPerLabelSet[:2]
limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits)
b, err = json.Marshal(limits)
require.NoError(t, err)
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)
ing.updateActiveSeries(ctx)
// Default partition usage increased from 1 to 10 as some existing partitions got removed.
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Should persist between restarts
Expand All @@ -396,10 +491,12 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

Expand All @@ -420,6 +517,13 @@ func TestPushRace(t *testing.T) {
MaxSeries: 10e10,
},
},
{
// Default partition.
LabelSet: labels.EmptyLabels(),
Limits: validation.LimitsPerLabelSetEntry{
MaxSeries: 10e10,
},
},
}

dir := t.TempDir()
Expand Down Expand Up @@ -451,6 +555,10 @@ func TestPushRace(t *testing.T) {
defer wg.Done()
_, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
require.NoError(t, err)

// Go to default partition.
_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "bar", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
require.NoError(t, err)
}()
}
}
Expand All @@ -472,13 +580,13 @@ func TestPushRace(t *testing.T) {
err = ir.Series(p.At(), &builder, nil)
require.NoError(t, err)
lbls := builder.Labels()
require.Equal(t, "foo", lbls.Get(labels.MetricName))
require.True(t, lbls.Get(labels.MetricName) == "foo" || lbls.Get(labels.MetricName) == "bar")
require.Equal(t, "1", lbls.Get("userId"))
require.NotEmpty(t, lbls.Get("k"))
builder.Reset()
}
require.Equal(t, numberOfSeries, total)
require.Equal(t, uint64(numberOfSeries), db.Head().NumSeries())
require.Equal(t, 2*numberOfSeries, total)
require.Equal(t, uint64(2*numberOfSeries), db.Head().NumSeries())
}

func TestIngesterUserLimitExceeded(t *testing.T) {
Expand Down
28 changes: 6 additions & 22 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,15 @@ func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int

// AssertMaxSeriesPerLabelSet checks that the max series limit of each label set
// matching the metric has not been reached, compared to the current series count
// returned by f, and returns an error if it has.
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.LimitsPerLabelSet) (int, error)) error {
m := l.limitsPerLabelSets(userID, metric)
for _, limit := range m {
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error)) error {
limits := l.limits.LimitsPerLabelSet(userID)
matchedLimits := validation.LimitsPerLabelSetsForSeries(limits, metric)
for _, limit := range matchedLimits {
maxSeriesFunc := func(string) int {
return limit.Limits.MaxSeries
}
local := l.maxByLocalAndGlobal(userID, maxSeriesFunc, maxSeriesFunc)
if u, err := f(limit); err != nil {
if u, err := f(limits, limit); err != nil {
return err
} else if u >= local {
return errMaxSeriesPerLabelSetLimitExceeded{
Expand Down Expand Up @@ -191,24 +192,7 @@ func (l *Limiter) formatMaxSeriesPerLabelSetError(err errMaxSeriesPerLabelSetLim

func (l *Limiter) limitsPerLabelSets(userID string, metric labels.Labels) []validation.LimitsPerLabelSet {
m := l.limits.LimitsPerLabelSet(userID)

// returning early to not have any overhead
if len(m) == 0 {
return nil
}

r := make([]validation.LimitsPerLabelSet, 0, len(m))
outer:
for _, lbls := range m {
for _, lbl := range lbls.LabelSet {
// We did not find some of the labels on the set
if v := metric.Get(lbl.Name); v != lbl.Value {
continue outer
}
}
r = append(r, lbls)
}
return r
return validation.LimitsPerLabelSetsForSeries(m, metric)
}

func (l *Limiter) maxSeriesPerMetric(userID string) int {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func TestLimiter_AssertMaxSeriesPerLabelSet(t *testing.T) {
require.NoError(t, err)

limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(set validation.LimitsPerLabelSet) (int, error) {
actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(limits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
return testData.series, nil
})

Expand Down
Loading
Loading