diff --git a/pkg/costattribution/manager_test.go b/pkg/costattribution/manager_test.go index f4f085b0c4..bf111790e9 100644 --- a/pkg/costattribution/manager_test.go +++ b/pkg/costattribution/manager_test.go @@ -10,9 +10,9 @@ import ( "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" + "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/util/validation" ) @@ -76,9 +76,9 @@ func TestManager_CreateDeleteTracker(t *testing.T) { }) t.Run("Metrics tracking", func(t *testing.T) { - manager.Tracker("user1").IncrementDiscardedSamples(labels.FromStrings("team", "bar"), 1, "invalid-metrics-name", time.Unix(6, 0)) - manager.Tracker("user1").IncrementDiscardedSamples(labels.FromStrings("team", "foo"), 1, "invalid-metrics-name", time.Unix(12, 0)) - manager.Tracker("user3").IncrementReceivedSamples(labels.FromStrings("department", "foo", "service", "dodo"), 1, time.Unix(20, 0)) + manager.Tracker("user1").IncrementDiscardedSamples([]mimirpb.LabelAdapter{{Name: "team", Value: "bar"}}, 1, "invalid-metrics-name", time.Unix(6, 0)) + manager.Tracker("user1").IncrementDiscardedSamples([]mimirpb.LabelAdapter{{Name: "team", Value: "foo"}}, 1, "invalid-metrics-name", time.Unix(12, 0)) + manager.Tracker("user3").IncrementReceivedSamples([]mimirpb.LabelAdapter{{Name: "department", Value: "foo"}, {Name: "service", Value: "dodo"}}, 1, time.Unix(20, 0)) expectedMetrics := ` # HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution. @@ -126,7 +126,7 @@ func TestManager_CreateDeleteTracker(t *testing.T) { assert.Equal(t, 1, len(manager.trackersByUserID)) assert.True(t, manager.Tracker("user3").hasSameLabels([]string{"feature", "team"})) - manager.Tracker("user3").IncrementDiscardedSamples(labels.FromStrings("team", "foo"), 1, "invalid-metrics-name", time.Unix(13, 0)) + manager.Tracker("user3").IncrementDiscardedSamples([]mimirpb.LabelAdapter{{Name: "team", Value: "foo"}}, 1, "invalid-metrics-name", time.Unix(13, 0)) expectedMetrics := ` # HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution. # TYPE cortex_discarded_attributed_samples_total counter @@ -136,9 +136,10 @@ func TestManager_CreateDeleteTracker(t *testing.T) { }) t.Run("Overflow metrics on cardinality limit", func(t *testing.T) { - manager.Tracker("user3").IncrementReceivedSamples(labels.FromStrings("team", "bar", "feature", "bar"), 1, time.Unix(15, 0)) - manager.Tracker("user3").IncrementReceivedSamples(labels.FromStrings("team", "baz", "feature", "baz"), 1, time.Unix(16, 0)) - manager.Tracker("user3").IncrementReceivedSamples(labels.FromStrings("team", "foo", "feature", "foo"), 1, time.Unix(17, 0)) + + manager.Tracker("user3").IncrementReceivedSamples([]mimirpb.LabelAdapter{{Name: "team", Value: "bar"}, {Name: "feature", Value: "bar"}}, 1, time.Unix(15, 0)) + manager.Tracker("user3").IncrementReceivedSamples([]mimirpb.LabelAdapter{{Name: "team", Value: "baz"}, {Name: "feature", Value: "baz"}}, 1, time.Unix(16, 0)) + manager.Tracker("user3").IncrementReceivedSamples([]mimirpb.LabelAdapter{{Name: "team", Value: "foo"}, {Name: "feature", Value: "foo"}}, 1, time.Unix(17, 0)) expectedMetrics := ` # HELP cortex_received_attributed_samples_total The total number of samples that were received per attribution. # TYPE cortex_received_attributed_samples_total counter @@ -151,9 +152,9 @@ func TestManager_CreateDeleteTracker(t *testing.T) { func TestManager_PurgeInactiveAttributionsUntil(t *testing.T) { manager := newTestManager() - manager.Tracker("user1").IncrementReceivedSamples(labels.FromStrings("team", "foo"), 1, time.Unix(1, 0)) - manager.Tracker("user1").IncrementDiscardedSamples(labels.FromStrings("team", "foo"), 1, "invalid-metrics-name", time.Unix(1, 0)) - manager.Tracker("user3").IncrementDiscardedSamples(labels.FromStrings("department", "foo", "service", "bar"), 1, "out-of-window", time.Unix(10, 0)) + manager.Tracker("user1").IncrementReceivedSamples([]mimirpb.LabelAdapter{{Name: "team", Value: "foo"}}, 1, time.Unix(1, 0)) + manager.Tracker("user1").IncrementDiscardedSamples([]mimirpb.LabelAdapter{{Name: "team", Value: "foo"}}, 1, "invalid-metrics-name", time.Unix(1, 0)) + manager.Tracker("user3").IncrementDiscardedSamples([]mimirpb.LabelAdapter{{Name: "department", Value: "foo"}, {Name: "service", Value: "bar"}}, 1, "out-of-window", time.Unix(10, 0)) t.Run("Purge before inactive timeout", func(t *testing.T) { assert.NoError(t, manager.purgeInactiveAttributionsUntil(time.Unix(0, 0).Unix())) diff --git a/pkg/costattribution/tracker.go b/pkg/costattribution/tracker.go index 84cd17ad5b..13ccf4a3d5 100644 --- a/pkg/costattribution/tracker.go +++ b/pkg/costattribution/tracker.go @@ -11,6 +11,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/mimir/pkg/mimirpb" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "go.uber.org/atomic" @@ -164,18 +165,18 @@ func (t *Tracker) Collect(out chan<- prometheus.Metric) { } } -func (t *Tracker) IncrementDiscardedSamples(lbs labels.Labels, value float64, reason string, now time.Time) { +func (t *Tracker) IncrementDiscardedSamples(lbs []mimirpb.LabelAdapter, value float64, reason string, now time.Time) { if t == nil { return } - t.updateCounters(lbs, now.Unix(), 0, 0, value, &reason, true) + t.updateCountersWithLabelAdapter(lbs, now.Unix(), 0, 0, value, &reason, true) } -func (t *Tracker) IncrementReceivedSamples(lbs labels.Labels, value float64, now time.Time) { +func (t *Tracker) IncrementReceivedSamples(lbs []mimirpb.LabelAdapter, value float64, now time.Time) { if t == nil { return } - t.updateCounters(lbs, now.Unix(), 0, value, 0, nil, true) + t.updateCountersWithLabelAdapter(lbs, now.Unix(), 0, value, 0, nil, true) } func (t *Tracker) IncrementActiveSeriesFailure(value float64) { @@ -185,23 +186,55 @@ func (t *Tracker) IncrementActiveSeriesFailure(value float64) { t.totalFailedActiveSeries.Add(value) } -func (t *Tracker) updateCounters(lbls labels.Labels, ts int64, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement float64, reason *string, createIfDoesNotExist bool) { - labelValues := make([]string, len(t.labels)) - lbls.Range(func(l labels.Label) { - if idx, ok := t.index[l.Name]; ok { - labelValues[idx] = l.Value +func (t *Tracker) updateCountersWithLabelAdapter(lbls []mimirpb.LabelAdapter, ts int64, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement float64, reason *string, createIfDoesNotExist bool) { + extractValues := func() []string { + labelValues := make([]string, len(t.labels)) + for _, l := range lbls { + if idx, ok := t.index[l.Name]; ok { + labelValues[idx] = l.Value + } } - }) + return labelValues + } + t.updateCountersCommon(extractValues, ts, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement, reason, createIfDoesNotExist) +} + +func (t *Tracker) updateCounters(lbls labels.Labels, ts int64, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement float64, reason *string, createIfDoesNotExist bool) { + extractValues := func() []string { + labelValues := make([]string, len(t.labels)) + lbls.Range(func(l labels.Label) { + if idx, ok := t.index[l.Name]; ok { + labelValues[idx] = l.Value + } + }) + return labelValues + } + t.updateCountersCommon(extractValues, ts, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement, reason, createIfDoesNotExist) +} + +func (t *Tracker) updateCountersCommon( + extractValues func() []string, + ts int64, + activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement float64, + reason *string, + createIfDoesNotExist bool, +) { + // Extract label values + labelValues := extractValues() + + // Fill missing label values for i := 0; i < len(labelValues); i++ { if labelValues[i] == "" { labelValues[i] = missingValue } } + // Reuse buffer from pool for building the observation key buf := bufferPool.Get().(*bytes.Buffer) buf.Reset() defer bufferPool.Put(buf) - // Build the observation key + + // Construct the observation key by joining label values for i, value := range labelValues { if i > 0 { buf.WriteRune(sep) @@ -209,9 +242,11 @@ func (t *Tracker) updateCounters(lbls labels.Labels, ts int64, activeSeriesIncre buf.WriteString(value) } + // Lock access to the observation map t.observedMtx.Lock() defer t.observedMtx.Unlock() + // Update observations and state t.updateObservations(buf.Bytes(), ts, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement, reason, createIfDoesNotExist) t.updateState(ts, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement) } diff --git a/pkg/costattribution/tracker_test.go b/pkg/costattribution/tracker_test.go index 1172863ecd..025d0ad96a 100644 --- a/pkg/costattribution/tracker_test.go +++ b/pkg/costattribution/tracker_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/grafana/mimir/pkg/mimirpb" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" @@ -31,8 +32,8 @@ func TestTracker_CreateDelete(t *testing.T) { tracker.IncrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), time.Unix(1, 0)) tracker.IncrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "2"), time.Unix(2, 0)) tracker.DecrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "3")) - tracker.IncrementReceivedSamples(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), 5, time.Unix(4, 0)) - tracker.IncrementDiscardedSamples(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), 2, "sample-out-of-order", time.Unix(4, 0)) + tracker.IncrementReceivedSamples([]mimirpb.LabelAdapter{{Name: "platform", Value: "foo"}, {Name: "team", Value: "1"}}, 5, time.Unix(4, 0)) + tracker.IncrementDiscardedSamples([]mimirpb.LabelAdapter{{Name: "platform", Value: "foo"}, {Name: "team", Value: "1"}}, 2, "sample-out-of-order", time.Unix(4, 0)) tracker.IncrementActiveSeries(labels.FromStrings("platform", "bar", "tenant", "user4", "team", "2"), time.Unix(6, 0)) tracker.IncrementActiveSeriesFailure(1) @@ -101,10 +102,10 @@ func TestTracker_inactiveObservations(t *testing.T) { tracker := newTestManager().Tracker("user1") // Create two observations with different last update timestamps. - observations := []labels.Labels{ - labels.FromStrings("team", "foo"), - labels.FromStrings("team", "bar"), - labels.FromStrings("team", "baz"), + observations := [][]mimirpb.LabelAdapter{ + {{Name: "team", Value: "foo"}}, + {{Name: "team", Value: "bar"}}, + {{Name: "team", Value: "baz"}}, } // Simulate samples discarded with different timestamps. tracker.IncrementDiscardedSamples(observations[0], 1, "invalid-metrics-name", time.Unix(1, 0)) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index c6aea2a7dd..627aa6d1b5 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -988,7 +988,7 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc { if errors.As(err, &tooManyClustersError{}) { d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(numSamples)) - d.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(req.Timeseries[0].Labels), float64(numSamples), reasonTooManyHAClusters, now) + d.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(req.Timeseries[0].Labels, float64(numSamples), reasonTooManyHAClusters, now) } return err @@ -1247,7 +1247,7 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc { totalN := validatedSamples + validatedExemplars + validatedMetadata if !d.ingestionRateLimiter.AllowN(now, userID, totalN) { if len(req.Timeseries) > 0 { - d.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(req.Timeseries[0].Labels), float64(validatedSamples), reasonRateLimited, now) + d.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(req.Timeseries[0].Labels, float64(validatedSamples), reasonRateLimited, now) } d.discardedSamplesRateLimited.WithLabelValues(userID, group).Add(float64(validatedSamples)) d.discardedExemplarsRateLimited.WithLabelValues(userID).Add(float64(validatedExemplars)) @@ -1832,7 +1832,7 @@ func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID st for _, ts := range req.Timeseries { receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms) receivedExemplars += len(ts.TimeSeries.Exemplars) - d.costAttributionMgr.Tracker(userID).IncrementReceivedSamples(mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(receivedSamples), mtime.Now()) + d.costAttributionMgr.Tracker(userID).IncrementReceivedSamples(ts.Labels, float64(receivedSamples), mtime.Now()) } receivedMetadata = len(req.Metadata) diff --git a/pkg/distributor/validate.go b/pkg/distributor/validate.go index 5b6775cdf9..d48f91ff8b 100644 --- a/pkg/distributor/validate.go +++ b/pkg/distributor/validate.go @@ -242,14 +242,14 @@ func newExemplarValidationMetrics(r prometheus.Registerer) *exemplarValidationMe func validateSample(m *sampleValidationMetrics, now model.Time, cfg sampleValidationConfig, userID, group string, ls []mimirpb.LabelAdapter, s mimirpb.Sample, cat *costattribution.Tracker) error { if model.Time(s.TimestampMs) > now.Add(cfg.CreationGracePeriod(userID)) { m.tooFarInFuture.WithLabelValues(userID, group).Inc() - cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonTooFarInFuture, now.Time()) + cat.IncrementDiscardedSamples(ls, 1, reasonTooFarInFuture, now.Time()) unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls) return fmt.Errorf(sampleTimestampTooNewMsgFormat, s.TimestampMs, unsafeMetricName) } if cfg.PastGracePeriod(userID) > 0 && model.Time(s.TimestampMs) < now.Add(-cfg.PastGracePeriod(userID)).Add(-cfg.OutOfOrderTimeWindow(userID)) { m.tooFarInPast.WithLabelValues(userID, group).Inc() - cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonTooFarInPast, now.Time()) + cat.IncrementDiscardedSamples(ls, 1, reasonTooFarInPast, now.Time()) unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls) return fmt.Errorf(sampleTimestampTooOldMsgFormat, s.TimestampMs, unsafeMetricName) } @@ -262,21 +262,21 @@ func validateSample(m *sampleValidationMetrics, now model.Time, cfg sampleValida // It uses the passed 'now' time to measure the relative time of the sample. func validateSampleHistogram(m *sampleValidationMetrics, now model.Time, cfg sampleValidationConfig, userID, group string, ls []mimirpb.LabelAdapter, s *mimirpb.Histogram, cat *costattribution.Tracker) (bool, error) { if model.Time(s.Timestamp) > now.Add(cfg.CreationGracePeriod(userID)) { - cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonTooFarInFuture, now.Time()) + cat.IncrementDiscardedSamples(ls, 1, reasonTooFarInFuture, now.Time()) m.tooFarInFuture.WithLabelValues(userID, group).Inc() unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls) return false, fmt.Errorf(sampleTimestampTooNewMsgFormat, s.Timestamp, unsafeMetricName) } if cfg.PastGracePeriod(userID) > 0 && model.Time(s.Timestamp) < now.Add(-cfg.PastGracePeriod(userID)).Add(-cfg.OutOfOrderTimeWindow(userID)) { - cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonTooFarInPast, now.Time()) + cat.IncrementDiscardedSamples(ls, 1, reasonTooFarInPast, now.Time()) m.tooFarInPast.WithLabelValues(userID, group).Inc() unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls) return false, fmt.Errorf(sampleTimestampTooOldMsgFormat, s.Timestamp, unsafeMetricName) } if s.Schema < mimirpb.MinimumHistogramSchema || s.Schema > mimirpb.MaximumHistogramSchema { - cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonInvalidNativeHistogramSchema, now.Time()) + cat.IncrementDiscardedSamples(ls, 1, reasonInvalidNativeHistogramSchema, now.Time()) m.invalidNativeHistogramSchema.WithLabelValues(userID, group).Inc() return false, fmt.Errorf(invalidSchemaNativeHistogramMsgFormat, s.Schema) } @@ -290,7 +290,7 @@ func validateSampleHistogram(m *sampleValidationMetrics, now model.Time, cfg sam } if bucketCount > bucketLimit { if !cfg.ReduceNativeHistogramOverMaxBuckets(userID) { - cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonMaxNativeHistogramBuckets, now.Time()) + cat.IncrementDiscardedSamples(ls, 1, reasonMaxNativeHistogramBuckets, now.Time()) m.maxNativeHistogramBuckets.WithLabelValues(userID, group).Inc() return false, fmt.Errorf(maxNativeHistogramBucketsMsgFormat, s.Timestamp, mimirpb.FromLabelAdaptersToString(ls), bucketCount, bucketLimit) } @@ -298,7 +298,7 @@ func validateSampleHistogram(m *sampleValidationMetrics, now model.Time, cfg sam for { bc, err := s.ReduceResolution() if err != nil { - cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonMaxNativeHistogramBuckets, now.Time()) + cat.IncrementDiscardedSamples(ls, 1, reasonMaxNativeHistogramBuckets, now.Time()) m.maxNativeHistogramBuckets.WithLabelValues(userID, group).Inc() return false, fmt.Errorf(notReducibleNativeHistogramMsgFormat, s.Timestamp, mimirpb.FromLabelAdaptersToString(ls), bucketCount, bucketLimit) } @@ -403,13 +403,13 @@ func removeNonASCIIChars(in string) (out string) { func validateLabels(m *sampleValidationMetrics, cfg labelValidationConfig, userID, group string, ls []mimirpb.LabelAdapter, skipLabelValidation, skipLabelCountValidation bool, cat *costattribution.Tracker, ts time.Time) error { unsafeMetricName, err := extract.UnsafeMetricNameFromLabelAdapters(ls) if err != nil { - cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonMissingMetricName, ts) + cat.IncrementDiscardedSamples(ls, 1, reasonMissingMetricName, ts) m.missingMetricName.WithLabelValues(userID, group).Inc() return errors.New(noMetricNameMsgFormat) } if !model.IsValidMetricName(model.LabelValue(unsafeMetricName)) { - cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonInvalidMetricName, ts) + cat.IncrementDiscardedSamples(ls, 1, reasonInvalidMetricName, ts) m.invalidMetricName.WithLabelValues(userID, group).Inc() return fmt.Errorf(invalidMetricNameMsgFormat, removeNonASCIIChars(unsafeMetricName)) } @@ -418,13 +418,13 @@ func validateLabels(m *sampleValidationMetrics, cfg labelValidationConfig, userI if strings.HasSuffix(unsafeMetricName, "_info") { if len(ls) > cfg.MaxLabelNamesPerInfoSeries(userID) { m.maxLabelNamesPerInfoSeries.WithLabelValues(userID, group).Inc() - cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonMaxLabelNamesPerInfoSeries, ts) + cat.IncrementDiscardedSamples(ls, 1, reasonMaxLabelNamesPerInfoSeries, ts) metric, ellipsis := getMetricAndEllipsis(ls) return fmt.Errorf(tooManyInfoLabelsMsgFormat, len(ls), cfg.MaxLabelNamesPerInfoSeries(userID), metric, ellipsis) } } else { m.maxLabelNamesPerSeries.WithLabelValues(userID, group).Inc() - cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonMaxLabelNamesPerSeries, ts) + cat.IncrementDiscardedSamples(ls, 1, reasonMaxLabelNamesPerSeries, ts) metric, ellipsis := getMetricAndEllipsis(ls) return fmt.Errorf(tooManyLabelsMsgFormat, len(ls), cfg.MaxLabelNamesPerSeries(userID), metric, ellipsis) } @@ -436,22 +436,22 @@ func validateLabels(m *sampleValidationMetrics, cfg labelValidationConfig, userI for _, l := range ls { if !skipLabelValidation && !model.LabelName(l.Name).IsValid() { m.invalidLabel.WithLabelValues(userID, group).Inc() - cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonInvalidLabel, ts) + cat.IncrementDiscardedSamples(ls, 1, reasonInvalidLabel, ts) return fmt.Errorf(invalidLabelMsgFormat, l.Name, mimirpb.FromLabelAdaptersToString(ls)) } else if len(l.Name) > maxLabelNameLength { - cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonLabelNameTooLong, ts) + cat.IncrementDiscardedSamples(ls, 1, reasonLabelNameTooLong, ts) m.labelNameTooLong.WithLabelValues(userID, group).Inc() return fmt.Errorf(labelNameTooLongMsgFormat, l.Name, mimirpb.FromLabelAdaptersToString(ls)) } else if !skipLabelValidation && !model.LabelValue(l.Value).IsValid() { - cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonInvalidLabelValue, ts) + cat.IncrementDiscardedSamples(ls, 1, reasonInvalidLabelValue, ts) m.invalidLabelValue.WithLabelValues(userID, group).Inc() return fmt.Errorf(invalidLabelValueMsgFormat, l.Name, strings.ToValidUTF8(l.Value, ""), unsafeMetricName) } else if len(l.Value) > maxLabelValueLength { - cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonLabelValueTooLong, ts) + cat.IncrementDiscardedSamples(ls, 1, reasonLabelValueTooLong, ts) m.labelValueTooLong.WithLabelValues(userID, group).Inc() return fmt.Errorf(labelValueTooLongMsgFormat, l.Name, l.Value, mimirpb.FromLabelAdaptersToString(ls)) } else if lastLabelName == l.Name { - cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonDuplicateLabelNames, ts) + cat.IncrementDiscardedSamples(ls, 1, reasonDuplicateLabelNames, ts) m.duplicateLabelNames.WithLabelValues(userID, group).Inc() return fmt.Errorf(duplicateLabelMsgFormat, l.Name, mimirpb.FromLabelAdaptersToString(ls)) } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index de6f02a53a..2291981bd2 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1202,56 +1202,56 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques }, func(timestamp int64, labels []mimirpb.LabelAdapter) { stats.sampleTimestampTooOldCount++ - i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleTimestampTooOld, startAppend) + i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(labels, 1, reasonSampleTimestampTooOld, startAppend) updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() softError { return newSampleTimestampTooOldError(model.Time(timestamp), labels) }) }, func(timestamp int64, labels []mimirpb.LabelAdapter) { stats.sampleOutOfOrderCount++ - i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleOutOfOrder, startAppend) + i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(labels, 1, reasonSampleOutOfOrder, startAppend) updateFirstPartial(i.errorSamplers.sampleOutOfOrder, func() softError { return newSampleOutOfOrderError(model.Time(timestamp), labels) }) }, func(timestamp int64, labels []mimirpb.LabelAdapter) { stats.sampleTooOldCount++ - i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleTooOld, startAppend) + i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(labels, 1, reasonSampleTooOld, startAppend) updateFirstPartial(i.errorSamplers.sampleTimestampTooOldOOOEnabled, func() softError { return newSampleTimestampTooOldOOOEnabledError(model.Time(timestamp), labels, outOfOrderWindow) }) }, func(timestamp int64, labels []mimirpb.LabelAdapter) { stats.sampleTooFarInFutureCount++ - i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleTooFarInFuture, startAppend) + i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(labels, 1, reasonSampleTooFarInFuture, startAppend) updateFirstPartial(i.errorSamplers.sampleTimestampTooFarInFuture, func() softError { return newSampleTimestampTooFarInFutureError(model.Time(timestamp), labels) }) }, func(timestamp int64, labels []mimirpb.LabelAdapter) { stats.newValueForTimestampCount++ - i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonNewValueForTimestamp, startAppend) + i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(labels, 1, reasonNewValueForTimestamp, startAppend) updateFirstPartial(i.errorSamplers.sampleDuplicateTimestamp, func() softError { return newSampleDuplicateTimestampError(model.Time(timestamp), labels) }) }, func(labels []mimirpb.LabelAdapter) { stats.perUserSeriesLimitCount++ - i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonPerUserSeriesLimit, startAppend) + i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(labels, 1, reasonPerUserSeriesLimit, startAppend) updateFirstPartial(i.errorSamplers.maxSeriesPerUserLimitExceeded, func() softError { return newPerUserSeriesLimitReachedError(i.limiter.limits.MaxGlobalSeriesPerUser(userID)) }) }, func(labels []mimirpb.LabelAdapter) { stats.perMetricSeriesLimitCount++ - i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonPerMetricSeriesLimit, startAppend) + i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(labels, 1, reasonPerMetricSeriesLimit, startAppend) updateFirstPartial(i.errorSamplers.maxSeriesPerMetricLimitExceeded, func() softError { return newPerMetricSeriesLimitReachedError(i.limiter.limits.MaxGlobalSeriesPerMetric(userID), labels) }) }, func(err error, timestamp int64, labels []mimirpb.LabelAdapter) { stats.sampleOutOfOrderCount++ - i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleOutOfOrder, startAppend) + i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(labels, 1, reasonSampleOutOfOrder, startAppend) updateFirstPartial(i.errorSamplers.nativeHistogramValidationError, func() softError { e := newNativeHistogramValidationError(globalerror.NativeHistogramOOODisabled, err, model.Time(timestamp), labels) return e @@ -1259,35 +1259,35 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques }, func(err error, timestamp int64, labels []mimirpb.LabelAdapter) { stats.invalidNativeHistogramCount++ - i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) + i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(labels, 1, reasonInvalidNativeHistogram, startAppend) updateFirstPartial(i.errorSamplers.nativeHistogramValidationError, func() softError { return newNativeHistogramValidationError(globalerror.NativeHistogramCountMismatch, err, model.Time(timestamp), labels) }) }, func(err error, timestamp int64, labels []mimirpb.LabelAdapter) { stats.invalidNativeHistogramCount++ - i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) + i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(labels, 1, reasonInvalidNativeHistogram, startAppend) updateFirstPartial(i.errorSamplers.nativeHistogramValidationError, func() softError { return newNativeHistogramValidationError(globalerror.NativeHistogramCountNotBigEnough, err, model.Time(timestamp), labels) }) }, func(err error, timestamp int64, labels []mimirpb.LabelAdapter) { stats.invalidNativeHistogramCount++ - i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) + i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(labels, 1, reasonInvalidNativeHistogram, startAppend) updateFirstPartial(i.errorSamplers.nativeHistogramValidationError, func() softError { return newNativeHistogramValidationError(globalerror.NativeHistogramNegativeBucketCount, err, model.Time(timestamp), labels) }) }, func(err error, timestamp int64, labels []mimirpb.LabelAdapter) { stats.invalidNativeHistogramCount++ - i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) + i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(labels, 1, reasonInvalidNativeHistogram, startAppend) updateFirstPartial(i.errorSamplers.nativeHistogramValidationError, func() softError { return newNativeHistogramValidationError(globalerror.NativeHistogramSpanNegativeOffset, err, model.Time(timestamp), labels) }) }, func(err error, timestamp int64, labels []mimirpb.LabelAdapter) { stats.invalidNativeHistogramCount++ - i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) + i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(labels, 1, reasonInvalidNativeHistogram, startAppend) updateFirstPartial(i.errorSamplers.nativeHistogramValidationError, func() softError { return newNativeHistogramValidationError(globalerror.NativeHistogramSpansBucketsMismatch, err, model.Time(timestamp), labels) }) @@ -1437,7 +1437,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre stats.failedSamplesCount += len(ts.Samples) + len(ts.Histograms) stats.sampleTimestampTooOldCount += len(ts.Samples) + len(ts.Histograms) - i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(len(ts.Samples)+len(ts.Histograms)), reasonSampleTimestampTooOld, startAppend) + i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(ts.Labels, float64(len(ts.Samples)+len(ts.Histograms)), reasonSampleTimestampTooOld, startAppend) var firstTimestamp int64 if len(ts.Samples) > 0 { firstTimestamp = ts.Samples[0].TimestampMs @@ -1457,7 +1457,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre len(ts.Samples) > 0 && allOutOfBoundsFloats(ts.Samples, minAppendTime) { stats.failedSamplesCount += len(ts.Samples) stats.sampleTimestampTooOldCount += len(ts.Samples) - i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(len(ts.Samples)), reasonSampleTimestampTooOld, startAppend) + i.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(ts.Labels, float64(len(ts.Samples)), reasonSampleTimestampTooOld, startAppend) firstTimestamp := ts.Samples[0].TimestampMs updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() softError {