[pull] master from cortexproject:master #659

Merged · 4 commits · Dec 18, 2024
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -47,11 +47,12 @@
* [ENHANCEMENT] Add new option `-server.grpc_server-num-stream-workers` to configure the number of worker goroutines that should be used to process incoming streams. #6386
* [ENHANCEMENT] Distributor: Return HTTP 5XX instead of HTTP 4XX when instance limits are hit. #6358
* [ENHANCEMENT] Ingester: Make sure unregistered ingesters join the ring after WAL replay. #6277
* [ENHANCEMENT] Distributor: Add a new `-distributor.num-push-workers` flag to use a goroutine worker pool when sending data from distributor to ingesters. #6406
* [ENHANCEMENT] Ingester: If a per-label-set limit entry doesn't have any labels, use it as the default partition to catch all series that don't match any other label set entries. #6435
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
* [BUGFIX] Ring: Update the ring with the new IP address when an instance is lost and rejoins, but heartbeat is disabled. #6271
* [BUGFIX] Ingester: Fix regression on usage of cortex_ingester_queried_chunks. #6398
* [ENHANCEMENT] Distributor: Add a new `-distributor.num-push-workers` flag to use a goroutine worker pool when sending data from distributor to ingesters. #6406 (a generic sketch of the worker-pool pattern follows this changelog excerpt)
* [BUGFIX] Ingester: Fix possible race condition when `active series per LabelSet` is configured. #6409
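The distributor changes behind `-distributor.num-push-workers` (#6406) are not part of this diff. For context, here is a generic, minimal sketch of the bounded goroutine worker-pool pattern the flag name suggests; all names are illustrative and not taken from the Cortex code.

```go
package main

import (
	"fmt"
	"sync"
)

// workerPool is an illustrative bounded pool: a fixed number of goroutines
// drain a job channel, so the number of concurrent pushes is capped.
type workerPool struct {
	jobs chan func()
	wg   sync.WaitGroup
}

func newWorkerPool(numWorkers int) *workerPool {
	p := &workerPool{jobs: make(chan func())}
	for i := 0; i < numWorkers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for job := range p.jobs {
				job() // e.g. send one batch of samples to one ingester
			}
		}()
	}
	return p
}

// submit blocks until a worker picks the job up.
func (p *workerPool) submit(job func()) { p.jobs <- job }

// stop lets queued jobs finish and waits for all workers to exit.
func (p *workerPool) stop() {
	close(p.jobs)
	p.wg.Wait()
}

func main() {
	pool := newWorkerPool(4)
	for i := 0; i < 10; i++ {
		i := i
		pool.submit(func() { fmt.Println("pushed batch", i) })
	}
	pool.stop()
}
```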

## 1.18.1 2024-10-14
4 changes: 3 additions & 1 deletion docs/configuration/config-file-reference.md
@@ -5770,7 +5770,9 @@ limits:
# would not enforce any limits.
[max_series: <int> | default = ]

# LabelSet which the limit should be applied.
# LabelSet to which the limit should be applied. If no labels are provided, it
# becomes the default partition, which matches any series that doesn't match
# any other explicitly defined label set.
[label_set: <map of string (labelName) to string (labelValue)> | default = []]
```
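From Go, the default partition described above is simply a `LimitsPerLabelSet` entry whose label set is empty, which is exactly how the ingester tests in this PR exercise it. A minimal sketch follows (struct and field names are taken from the test diff; import paths are assumed to be the usual Cortex and Prometheus modules):

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"

	"github.com/cortexproject/cortex/pkg/util/validation"
)

func main() {
	// Explicit partition: applies only to series carrying comp1="compValue1".
	explicit := validation.LimitsPerLabelSet{
		LabelSet: labels.FromStrings("comp1", "compValue1"),
		Limits:   validation.LimitsPerLabelSetEntry{MaxSeries: 10},
	}

	// Default partition: an empty label set catches every series that does
	// not match any other explicitly defined label set.
	defaultPartition := validation.LimitsPerLabelSet{
		LabelSet: labels.EmptyLabels(),
		Limits:   validation.LimitsPerLabelSetEntry{MaxSeries: 2},
	}

	fmt.Println(explicit, defaultPartition)
}
```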

2 changes: 1 addition & 1 deletion docs/guides/tracing.md
@@ -50,7 +50,7 @@ In order to send traces, you will need to set up an OpenTelemetry Collector. The
multiple destinations such as [AWS X-Ray](https://aws-otel.github.io/docs/getting-started/x-ray),
[Google Cloud](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/googlecloudexporter),
[DataDog](https://docs.datadoghq.com/tracing/trace_collection/open_standards/otel_collector_datadog_exporter/) and
[others(https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter). OpenTelemetry Collector
[others](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter). OpenTelemetry Collector
provides a [helm chart](https://github.com/open-telemetry/opentelemetry-helm-charts/tree/main/charts/opentelemetry-collector/examples/deployment-otlp-traces)
to set up the environment.

17 changes: 10 additions & 7 deletions pkg/ingester/ingester.go
@@ -233,8 +233,9 @@ type Ingester struct {
TSDBState TSDBState

// Rate of pushed samples. Only used by V2-ingester to limit global samples push rate.
ingestionRate *util_math.EwmaRate
inflightPushRequests atomic.Int64
ingestionRate *util_math.EwmaRate
inflightPushRequests atomic.Int64
maxInflightPushRequests util_math.MaxTracker

inflightQueryRequests atomic.Int64
maxInflightQueryRequests util_math.MaxTracker
@@ -710,7 +711,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
cfg.ActiveSeriesMetricsEnabled,
i.getInstanceLimits,
i.ingestionRate,
&i.inflightPushRequests,
&i.maxInflightPushRequests,
&i.maxInflightQueryRequests,
cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled || cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled)
i.validateMetrics = validation.NewValidateMetrics(registerer)
@@ -792,7 +793,7 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe
false,
i.getInstanceLimits,
nil,
&i.inflightPushRequests,
&i.maxInflightPushRequests,
&i.maxInflightQueryRequests,
cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled || cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled,
)
@@ -918,8 +919,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
metadataPurgeTicker := time.NewTicker(metadataPurgePeriod)
defer metadataPurgeTicker.Stop()

maxInflightRequestResetTicker := time.NewTicker(maxInflightRequestResetPeriod)
defer maxInflightRequestResetTicker.Stop()
maxTrackerResetTicker := time.NewTicker(maxInflightRequestResetPeriod)
defer maxTrackerResetTicker.Stop()

for {
select {
@@ -937,8 +938,9 @@ func (i *Ingester) updateLoop(ctx context.Context) error {

case <-activeSeriesTickerChan:
i.updateActiveSeries(ctx)
case <-maxInflightRequestResetTicker.C:
case <-maxTrackerResetTicker.C:
i.maxInflightQueryRequests.Tick()
i.maxInflightPushRequests.Tick()
case <-userTSDBConfigTicker.C:
i.updateUserTSDBConfigs()
case <-ctx.Done():
@@ -1068,6 +1070,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

// We will report *this* request in the error too.
inflight := i.inflightPushRequests.Inc()
i.maxInflightPushRequests.Track(inflight)
defer i.inflightPushRequests.Dec()

gl := i.getInstanceLimits()
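`util_math.MaxTracker` itself lives in `pkg/util/math` and is not shown in this diff. Judging from the Track/Tick/Load usage above, it keeps the peak over the current and previous reset windows, so a scrape shortly after a Tick still reports the recent maximum. The sketch below illustrates that pattern; it is not the actual Cortex implementation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// maxTracker keeps the maximum value seen in the current and previous
// windows. Tick rotates the windows; Load reports the max across both.
type maxTracker struct {
	current  atomic.Int64
	previous atomic.Int64
}

// Track records val if it exceeds the current window's maximum.
func (m *maxTracker) Track(val int64) {
	for {
		cur := m.current.Load()
		if val <= cur || m.current.CompareAndSwap(cur, val) {
			return
		}
	}
}

// Tick is called periodically (cf. maxInflightRequestResetPeriod above).
func (m *maxTracker) Tick() {
	m.previous.Store(m.current.Load())
	m.current.Store(0)
}

// Load is what a gauge reads at scrape time.
func (m *maxTracker) Load() int64 {
	c, p := m.current.Load(), m.previous.Load()
	if c > p {
		return c
	}
	return p
}

func main() {
	var t maxTracker
	t.Track(3)
	t.Track(7)
	t.Tick()
	t.Track(2)
	fmt.Println(t.Load()) // 7: the previous window still holds the peak
}
```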
120 changes: 114 additions & 6 deletions pkg/ingester/ingester_test.go
@@ -110,8 +110,8 @@ func seriesSetFromResponseStream(s *mockQueryStreamServer) (storage.SeriesSet, e

serieses = append(serieses, &storage.SeriesEntry{
Lset: ls,
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
return batch.NewChunkMergeIterator(chunks, math.MinInt64, math.MaxInt64)
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
return batch.NewChunkMergeIterator(it, chunks, math.MinInt64, math.MaxInt64)
},
})
}
@@ -366,22 +366,117 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Should remove metrics when the limits is removed
// Add the default partition -> no label set configured, acting as a fallback when a series
// doesn't match any existing label set limit.
emptyLabels := labels.EmptyLabels()
defaultPartitionLimits := validation.LimitsPerLabelSet{LabelSet: emptyLabels,
Limits: validation.LimitsPerLabelSetEntry{
MaxSeries: 2,
},
}
limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits)
b, err = json.Marshal(limits)
require.NoError(t, err)
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)

lbls = []string{labels.MetricName, "test_default"}
for i := 0; i < 2; i++ {
_, err = ing.Push(ctx, cortexpb.ToWriteRequest(
[]labels.Labels{labels.FromStrings(append(lbls, "series", strconv.Itoa(i))...)}, samples, nil, nil, cortexpb.API))
require.NoError(t, err)
}

// Max series limit for the default partition is 2, so 1 more series will be throttled.
_, err = ing.Push(ctx, cortexpb.ToWriteRequest(
[]labels.Labels{labels.FromStrings(append(lbls, "extraLabel", "extraValueUpdate2")...)}, samples, nil, nil, cortexpb.API))
httpResp, ok = httpgrpc.HTTPResponseFromError(err)
require.True(t, ok, "returned error is not an httpgrpc response")
assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
require.ErrorContains(t, err, emptyLabels.String())

ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 10
cortex_ingester_limits_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 10
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7
cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 2
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Add a new label set limit.
limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet,
validation.LimitsPerLabelSet{LabelSet: labels.FromMap(map[string]string{
"series": "0",
}),
Limits: validation.LimitsPerLabelSetEntry{
MaxSeries: 3,
},
},
)
b, err = json.Marshal(limits)
require.NoError(t, err)
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)
ing.updateActiveSeries(ctx)
// Default partition usage is reduced from 2 to 1 as one series in the default partition
// is now counted towards the new partition.
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 10
cortex_ingester_limits_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 10
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{series=\"0\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7
cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{series=\"0\"}",limit="max_series",user="1"} 1
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 1
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Should remove metrics when the limits are removed, but keep the default partition limit
limits.LimitsPerLabelSet = limits.LimitsPerLabelSet[:2]
limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits)
b, err = json.Marshal(limits)
require.NoError(t, err)
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)
ing.updateActiveSeries(ctx)
// Default partition usage increases to 10 as some existing partitions were removed and their series now fall back to the default partition.
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))

// Should persist between restarts
@@ -396,10 +491,12 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
# TYPE cortex_ingester_limits_per_labelset gauge
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
# TYPE cortex_ingester_usage_per_labelset gauge
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

@@ -420,6 +517,13 @@ func TestPushRace(t *testing.T) {
MaxSeries: 10e10,
},
},
{
// Default partition.
LabelSet: labels.EmptyLabels(),
Limits: validation.LimitsPerLabelSetEntry{
MaxSeries: 10e10,
},
},
}

dir := t.TempDir()
@@ -451,6 +555,10 @@ func TestPushRace(t *testing.T) {
defer wg.Done()
_, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
require.NoError(t, err)

// Go to default partition.
_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "bar", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
require.NoError(t, err)
}()
}
}
@@ -472,13 +580,13 @@ func TestPushRace(t *testing.T) {
err = ir.Series(p.At(), &builder, nil)
require.NoError(t, err)
lbls := builder.Labels()
require.Equal(t, "foo", lbls.Get(labels.MetricName))
require.True(t, lbls.Get(labels.MetricName) == "foo" || lbls.Get(labels.MetricName) == "bar")
require.Equal(t, "1", lbls.Get("userId"))
require.NotEmpty(t, lbls.Get("k"))
builder.Reset()
}
require.Equal(t, numberOfSeries, total)
require.Equal(t, uint64(numberOfSeries), db.Head().NumSeries())
require.Equal(t, 2*numberOfSeries, total)
require.Equal(t, uint64(2*numberOfSeries), db.Head().NumSeries())
}

func TestIngesterUserLimitExceeded(t *testing.T) {
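The assertions above lean heavily on `testutil.GatherAndCompare`, which gathers the registry, filters to the named metric families, and diffs the result against an expected exposition-format snippet. Below is a small, self-contained illustration of that pattern (the metric name is an example, unrelated to the ingester code):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewRegistry()

	usage := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "example_usage_per_labelset",
		Help: "Current usage per labelset.",
	}, []string{"labelset"})
	reg.MustRegister(usage)

	// "{}" plays the role of the default partition in the tests above.
	usage.WithLabelValues("{}").Set(2)

	expected := `# HELP example_usage_per_labelset Current usage per labelset.
# TYPE example_usage_per_labelset gauge
example_usage_per_labelset{labelset="{}"} 2
`
	// Returns nil when the gathered output matches the expected snippet.
	err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expected), "example_usage_per_labelset")
	fmt.Println(err)
}
```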
28 changes: 6 additions & 22 deletions pkg/ingester/limiter.go
@@ -107,14 +107,15 @@ func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int

// AssertMaxSeriesPerLabelSet checks that no matching per-label-set series limit has been
// reached for the given series, using f to obtain the current usage, and returns an error if so.
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.LimitsPerLabelSet) (int, error)) error {
m := l.limitsPerLabelSets(userID, metric)
for _, limit := range m {
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error)) error {
limits := l.limits.LimitsPerLabelSet(userID)
matchedLimits := validation.LimitsPerLabelSetsForSeries(limits, metric)
for _, limit := range matchedLimits {
maxSeriesFunc := func(string) int {
return limit.Limits.MaxSeries
}
local := l.maxByLocalAndGlobal(userID, maxSeriesFunc, maxSeriesFunc)
if u, err := f(limit); err != nil {
if u, err := f(limits, limit); err != nil {
return err
} else if u >= local {
return errMaxSeriesPerLabelSetLimitExceeded{
@@ -191,24 +192,7 @@ func (l *Limiter) formatMaxSeriesPerLabelSetError(err errMaxSeriesPerLabelSetLim

func (l *Limiter) limitsPerLabelSets(userID string, metric labels.Labels) []validation.LimitsPerLabelSet {
m := l.limits.LimitsPerLabelSet(userID)

// returning early to not have any overhead
if len(m) == 0 {
return nil
}

r := make([]validation.LimitsPerLabelSet, 0, len(m))
outer:
for _, lbls := range m {
for _, lbl := range lbls.LabelSet {
// We did not find some of the labels on the set
if v := metric.Get(lbl.Name); v != lbl.Value {
continue outer
}
}
r = append(r, lbls)
}
return r
return validation.LimitsPerLabelSetsForSeries(m, metric)
}

func (l *Limiter) maxSeriesPerMetric(userID string) int {
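`validation.LimitsPerLabelSetsForSeries`, which replaces the inlined loop removed above, is not itself part of this diff. Based on that removed loop and the default-partition behaviour added in this PR, its semantics are presumably along the lines of the following sketch (stand-in types, not the real helper):

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// limitEntry stands in for validation.LimitsPerLabelSet.
type limitEntry struct {
	LabelSet labels.Labels
}

// limitsForSeries returns every entry whose label set is fully contained in
// the series' labels. An entry with an empty label set is the default
// partition and matches only when no other entry matched.
func limitsForSeries(entries []limitEntry, metric labels.Labels) []limitEntry {
	var matched []limitEntry
	var defaultPartition *limitEntry

outer:
	for i := range entries {
		if entries[i].LabelSet.Len() == 0 {
			defaultPartition = &entries[i]
			continue
		}
		for _, lbl := range entries[i].LabelSet {
			// Some label of the set is missing or has a different value.
			if metric.Get(lbl.Name) != lbl.Value {
				continue outer
			}
		}
		matched = append(matched, entries[i])
	}
	if len(matched) == 0 && defaultPartition != nil {
		matched = append(matched, *defaultPartition)
	}
	return matched
}

func main() {
	entries := []limitEntry{
		{LabelSet: labels.FromStrings("comp1", "compValue1")},
		{LabelSet: labels.EmptyLabels()}, // default partition
	}
	// Matches only the default partition.
	fmt.Println(limitsForSeries(entries, labels.FromStrings(labels.MetricName, "foo")))
	// Matches the explicit comp1 partition.
	fmt.Println(limitsForSeries(entries, labels.FromStrings("comp1", "compValue1")))
}
```

The widened callback signature in `AssertMaxSeriesPerLabelSet` (receiving `allLimits` alongside the matched `limit`) presumably lets the caller compute the default partition's usage as the number of series that match none of the other label sets.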
2 changes: 1 addition & 1 deletion pkg/ingester/limiter_test.go
@@ -500,7 +500,7 @@ func TestLimiter_AssertMaxSeriesPerLabelSet(t *testing.T) {
require.NoError(t, err)

limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(set validation.LimitsPerLabelSet) (int, error) {
actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(limits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
return testData.series, nil
})

11 changes: 6 additions & 5 deletions pkg/ingester/metrics.go
@@ -3,7 +3,6 @@ package ingester
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
@@ -53,10 +52,12 @@ type ingesterMetrics struct {
maxUsersGauge prometheus.GaugeFunc
maxSeriesGauge prometheus.GaugeFunc
maxIngestionRate prometheus.GaugeFunc
ingestionRate prometheus.GaugeFunc
maxInflightPushRequests prometheus.GaugeFunc
inflightRequests prometheus.GaugeFunc
inflightQueryRequests prometheus.GaugeFunc

// Current Usage
ingestionRate prometheus.GaugeFunc
inflightRequests prometheus.GaugeFunc
inflightQueryRequests prometheus.GaugeFunc

// Posting Cache Metrics
expandedPostingsCacheMetrics *tsdb.ExpandedPostingsCacheMetrics
@@ -67,7 +68,7 @@ func newIngesterMetrics(r prometheus.Registerer,
activeSeriesEnabled bool,
instanceLimitsFn func() *InstanceLimits,
ingestionRate *util_math.EwmaRate,
inflightPushRequests *atomic.Int64,
inflightPushRequests *util_math.MaxTracker,
maxInflightQueryRequests *util_math.MaxTracker,
postingsCacheEnabled bool,
) *ingesterMetrics {
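With `metrics.go` now taking a `*util_math.MaxTracker` for inflight push requests, the corresponding gauge presumably reads the tracker's max at scrape time via a `GaugeFunc`. A condensed, self-contained sketch of that wiring (the tracker type is the stand-in from the earlier sketch; the metric name is illustrative):

```go
package main

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// maxTracker is the condensed stand-in from the earlier sketch.
type maxTracker struct{ current, previous atomic.Int64 }

func (m *maxTracker) Load() int64 {
	c, p := m.current.Load(), m.previous.Load()
	if c > p {
		return c
	}
	return p
}

func main() {
	reg := prometheus.NewRegistry()
	tracker := &maxTracker{}

	// The GaugeFunc reads the tracker lazily on every scrape.
	promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
		Name: "example_max_inflight_push_requests",
		Help: "Max number of inflight push requests observed recently.",
	}, func() float64 {
		return float64(tracker.Load())
	})

	tracker.current.Store(42)

	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), "=", mf.GetMetric()[0].GetGauge().GetValue())
	}
}
```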