Skip to content

Commit

Permalink
Create a metric to track users with newlines in ingested label values (
Browse files Browse the repository at this point in the history
…#9400)

* Create a metric to track users with newlines in ingested label values

Signed-off-by: Oleg Zaytsev <[email protected]>

* Update CHANGELOG.md

Signed-off-by: Oleg Zaytsev <[email protected]>

* Only increase metric when >0

Signed-off-by: Oleg Zaytsev <[email protected]>

* s/amount/number/

Signed-off-by: Oleg Zaytsev <[email protected]>

* Fix test

Signed-off-by: Oleg Zaytsev <[email protected]>

---------

Signed-off-by: Oleg Zaytsev <[email protected]>
  • Loading branch information
colega authored Sep 25, 2024
1 parent 09719b5 commit 82c47c8
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
* [ENHANCEMENT] Query-scheduler: Experimental `-query-scheduler.prioritize-query-components` flag enables the querier-worker queue priority algorithm to take precedence over tenant rotation when dequeuing requests. #9220
* [ENHANCEMENT] Add application credential arguments for Openstack Swift storage backend. #9181
* [ENHANCEMENT] Ruler: Support `exclude_alerts` parameter in `<prometheus-http-prefix>/api/v1/rules` endpoint. #9300
* [ENHANCEMENT] Distributor: add the `cortex_distributor_label_values_with_newlines_total` metric to track tenants who are sending newlines in their label values. #9400
* [BUGFIX] Ruler: add support for draining any outstanding alert notifications before shutting down. This can be enabled with the `-ruler.drain-notification-queue-on-shutdown=true` CLI flag. #8346
* [BUGFIX] Query-frontend: fix `-querier.max-query-lookback` enforcement when `-compactor.blocks-retention-period` is not set, and vice versa. #8388
* [BUGFIX] Ingester: fix sporadic `not found` error causing an internal server error if label names are queried with matchers during head compaction. #8391
Expand Down
20 changes: 20 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type Distributor struct {
incomingSamplesPerRequest *prometheus.HistogramVec
incomingExemplarsPerRequest *prometheus.HistogramVec
latestSeenSampleTimestampPerUser *prometheus.GaugeVec
labelValuesWithNewlinesPerUser *prometheus.CounterVec
hashCollisionCount prometheus.Counter

// Metrics for data rejected for hitting per-tenant limits
Expand Down Expand Up @@ -412,6 +413,10 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Name: "cortex_distributor_latest_seen_sample_timestamp_seconds",
Help: "Unix timestamp of latest received sample per user.",
}, []string{"user"}),
labelValuesWithNewlinesPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_distributor_label_values_with_newlines_total",
Help: "Total number of label values with newlines seen at ingestion time.",
}, []string{"user"}),

discardedSamplesTooManyHaClusters: validation.DiscardedSamplesCounter(reg, reasonTooManyHAClusters),
discardedSamplesRateLimited: validation.DiscardedSamplesCounter(reg, reasonRateLimited),
Expand Down Expand Up @@ -651,6 +656,7 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
d.incomingExemplarsPerRequest.DeleteLabelValues(userID)
d.nonHASamples.DeleteLabelValues(userID)
d.latestSeenSampleTimestampPerUser.DeleteLabelValues(userID)
d.labelValuesWithNewlinesPerUser.DeleteLabelValues(userID)

d.PushMetrics.deleteUserMetrics(userID)

Expand Down Expand Up @@ -772,6 +778,15 @@ func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeser
}
return nil
}
// labelValuesWithNewlines returns how many of the given labels have a value
// containing at least one newline ('\n') byte. Each label is counted at most
// once, no matter how many newlines its value holds. The receiver is not read.
func (d *Distributor) labelValuesWithNewlines(labels []mimirpb.LabelAdapter) int {
	var found int
	for i := range labels {
		// A negative index means the value has no newline; nothing to count.
		if strings.IndexByte(labels[i].Value, '\n') < 0 {
			continue
		}
		found++
	}
	return found
}

// wrapPushWithMiddlewares returns push function wrapped in all Distributor's middlewares.
// push wrappers will be applied to incoming requests in the order in which they are in the slice in the config struct.
Expand Down Expand Up @@ -1026,6 +1041,7 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
var removeIndexes []int
totalSamples, totalExemplars := 0, 0

labelValuesWithNewlines := 0
for tsIdx, ts := range req.Timeseries {
totalSamples += len(ts.Samples)
totalExemplars += len(ts.Exemplars)
Expand Down Expand Up @@ -1053,10 +1069,14 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {

validatedSamples += len(ts.Samples) + len(ts.Histograms)
validatedExemplars += len(ts.Exemplars)
labelValuesWithNewlines += d.labelValuesWithNewlines(ts.Labels)
}

d.incomingSamplesPerRequest.WithLabelValues(userID).Observe(float64(totalSamples))
d.incomingExemplarsPerRequest.WithLabelValues(userID).Observe(float64(totalExemplars))
if labelValuesWithNewlines > 0 {
d.labelValuesWithNewlinesPerUser.WithLabelValues(userID).Add(float64(labelValuesWithNewlines))
}

if len(removeIndexes) > 0 {
for _, removeIndex := range removeIndexes {
Expand Down
9 changes: 9 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
"cortex_distributor_metadata_in_total",
"cortex_distributor_non_ha_samples_received_total",
"cortex_distributor_latest_seen_sample_timestamp_seconds",
"cortex_distributor_label_values_with_newlines_total",
}

d.receivedSamples.WithLabelValues("userA").Add(5)
Expand All @@ -374,6 +375,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
d.nonHASamples.WithLabelValues("userA").Add(5)
d.dedupedSamples.WithLabelValues("userA", "cluster1").Inc() // We cannot clean this metric
d.latestSeenSampleTimestampPerUser.WithLabelValues("userA").Set(1111)
d.labelValuesWithNewlinesPerUser.WithLabelValues("userA").Inc()

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_distributor_deduped_samples_total The total number of deduplicated samples.
Expand Down Expand Up @@ -414,6 +416,10 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
# HELP cortex_distributor_exemplars_in_total The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.
# TYPE cortex_distributor_exemplars_in_total counter
cortex_distributor_exemplars_in_total{user="userA"} 5
# HELP cortex_distributor_label_values_with_newlines_total Total number of label values with newlines seen at ingestion time.
# TYPE cortex_distributor_label_values_with_newlines_total counter
cortex_distributor_label_values_with_newlines_total{user="userA"} 1
`), metrics...))

d.cleanupInactiveUser("userA")
Expand Down Expand Up @@ -448,6 +454,9 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
# HELP cortex_distributor_exemplars_in_total The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.
# TYPE cortex_distributor_exemplars_in_total counter
# HELP cortex_distributor_label_values_with_newlines_total Total number of label values with newlines seen at ingestion time.
# TYPE cortex_distributor_label_values_with_newlines_total counter
`), metrics...))
}

Expand Down

0 comments on commit 82c47c8

Please sign in to comment.