From 6dcd7a3d36afc3814981b4fa97f07e059ec116b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=90urica=20Yuri=20Nikoli=C4=87?= Date: Thu, 12 Dec 2024 11:33:05 +0100 Subject: [PATCH] Distributor: remove samples with duplicated timestamps within a single timeseries (#10145) * Distributor: remove samples with duplicated timestamps within a single timeseries Signed-off-by: Yuri Nikolic * Making lint happy Signed-off-by: Yuri Nikolic * Include removal of samples with duplicated timestamp in validateSeries() Signed-off-by: Yuri Nikolic * Making lint happy Signed-off-by: Yuri Nikolic * Adding benchmarks Signed-off-by: Yuri Nikolic * Making lint happy Signed-off-by: Yuri Nikolic * Improve sample removal Signed-off-by: Yuri Nikolic * Improve benchmarks Signed-off-by: Yuri Nikolic * Fixing review findings Signed-off-by: Yuri Nikolic * Fixing tests Signed-off-by: Yuri Nikolic * Fixing benchmarks Signed-off-by: Yuri Nikolic * Fixing benchmarks Signed-off-by: Yuri Nikolic * Fixing review findings Signed-off-by: Yuri Nikolic * Remove big benchmarks * Use RemoveSliceIndexes Signed-off-by: Yuri Nikolic * Initialize removeIndexes within validateXxx methods Signed-off-by: Yuri Nikolic * Copy element by element Signed-off-by: Yuri Nikolic * Replace h with ts.Histogram[idx] Signed-off-by: Yuri Nikolic * Fix validateHistogram() when size is 1 Signed-off-by: Yuri Nikolic * Fixing review findings Signed-off-by: Yuri Nikolic --------- Signed-off-by: Yuri Nikolic --- CHANGELOG.md | 1 + pkg/distributor/distributor.go | 124 ++++++- .../distributor_ingest_storage_test.go | 38 +-- pkg/distributor/distributor_test.go | 319 +++++++++++++++++- pkg/distributor/query_test.go | 16 +- pkg/distributor/validate.go | 6 + pkg/mimirpb/timeseries.go | 4 + 7 files changed, 454 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 10ed08b8919..8710c1a2aa6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## main / unreleased * [ENHANCEMENT] Distributor: OTLP receiver now 
converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168 +* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metric with reason `sample_duplicate_timestamp`. #10145 ### Grafana Mimir diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index b17537465bd..bd4c3b3ffe8 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -53,6 +53,7 @@ import ( "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/storage/ingest" "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/extract" "github.com/grafana/mimir/pkg/util/globalerror" mimir_limiter "github.com/grafana/mimir/pkg/util/limiter" util_math "github.com/grafana/mimir/pkg/util/math" @@ -737,40 +738,102 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica return true, nil } -// Validates a single series from a write request. +// validateSamples validates samples of a single timeseries and removes the ones with duplicated timestamps. +// Returns an error explaining the first validation finding. // May alter timeseries data in-place. // The returned error may retain the series labels. -// It uses the passed nowt time to observe the delay of sample timestamps. 
-func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeseries, userID, group string, skipLabelValidation, skipLabelCountValidation bool, minExemplarTS, maxExemplarTS int64) error { - if err := validateLabels(d.sampleValidationMetrics, d.limits, userID, group, ts.Labels, skipLabelValidation, skipLabelCountValidation); err != nil { - return err +func (d *Distributor) validateSamples(now model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error { + if len(ts.Samples) == 0 { + return nil } - now := model.TimeFromUnixNano(nowt.UnixNano()) + if len(ts.Samples) == 1 { + return validateSample(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, ts.Samples[0]) + } + timestamps := make(map[int64]struct{}, min(len(ts.Samples), 100)) + currPos := 0 + duplicatesFound := false for _, s := range ts.Samples { + if _, ok := timestamps[s.TimestampMs]; ok { + // A sample with the same timestamp has already been validated, so we skip it. + duplicatesFound = true + continue + } + + timestamps[s.TimestampMs] = struct{}{} if err := validateSample(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, s); err != nil { return err } + + ts.Samples[currPos] = s + currPos++ + } + + if duplicatesFound { + ts.Samples = ts.Samples[:currPos] + ts.SamplesUpdated() } + return nil +} + +// validateHistograms validates histograms of a single timeseries and removes the ones with duplicated timestamps. +// Returns an error explaining the first validation finding. +// May alter timeseries data in-place. +// The returned error may retain the series labels. 
+func (d *Distributor) validateHistograms(now model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error { + if len(ts.Histograms) == 0 { + return nil + } + + if len(ts.Histograms) == 1 { + updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[0]) + if err != nil { + return err + } + if updated { + ts.HistogramsUpdated() + } + return nil + } + + timestamps := make(map[int64]struct{}, min(len(ts.Histograms), 100)) + currPos := 0 histogramsUpdated := false - for i := range ts.Histograms { - updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[i]) + for idx := range ts.Histograms { + if _, ok := timestamps[ts.Histograms[idx].Timestamp]; ok { + // A sample with the same timestamp has already been validated, so we skip it. + histogramsUpdated = true + continue + } + + timestamps[ts.Histograms[idx].Timestamp] = struct{}{} + updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[idx]) if err != nil { return err } histogramsUpdated = histogramsUpdated || updated + + ts.Histograms[currPos] = ts.Histograms[idx] + currPos++ } + if histogramsUpdated { + ts.Histograms = ts.Histograms[:currPos] ts.HistogramsUpdated() } + return nil +} + +// validateExemplars validates exemplars of a single timeseries. +// May alter timeseries data in-place. 
+func (d *Distributor) validateExemplars(ts *mimirpb.PreallocTimeseries, userID string, minExemplarTS, maxExemplarTS int64) { if d.limits.MaxGlobalExemplarsPerUser(userID) == 0 { ts.ClearExemplars() - return nil + return } - allowedExemplars := d.limits.MaxExemplarsPerSeriesPerRequest(userID) if allowedExemplars > 0 && len(ts.Exemplars) > allowedExemplars { d.exemplarValidationMetrics.tooManyExemplars.WithLabelValues(userID).Add(float64(len(ts.Exemplars) - allowedExemplars)) @@ -802,7 +865,40 @@ func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeser if !isInOrder { ts.SortExemplars() } - return nil +} + +// Validates a single series from a write request. +// May alter timeseries data in-place. +// Returns a boolean stating if the timeseries should be removed from the request, and an error explaining the first validation finding. +// The returned error may retain the series labels. +// It uses the passed nowt time to observe the delay of sample timestamps. +func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeseries, userID, group string, skipLabelValidation, skipLabelCountValidation bool, minExemplarTS, maxExemplarTS int64) (bool, error) { + if err := validateLabels(d.sampleValidationMetrics, d.limits, userID, group, ts.Labels, skipLabelValidation, skipLabelCountValidation); err != nil { + return true, err + } + + now := model.TimeFromUnixNano(nowt.UnixNano()) + totalSamplesAndHistograms := len(ts.Samples) + len(ts.Histograms) + + if err := d.validateSamples(now, ts, userID, group); err != nil { + return true, err + } + + if err := d.validateHistograms(now, ts, userID, group); err != nil { + return true, err + } + + d.validateExemplars(ts, userID, minExemplarTS, maxExemplarTS) + + deduplicatedSamplesAndHistograms := totalSamplesAndHistograms - len(ts.Samples) - len(ts.Histograms) + + if deduplicatedSamplesAndHistograms > 0 { + d.sampleValidationMetrics.duplicateTimestamp.WithLabelValues(userID, 
group).Add(float64(deduplicatedSamplesAndHistograms)) + unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ts.Labels) + return false, fmt.Errorf(duplicateTimestampMsgFormat, deduplicatedSamplesAndHistograms, unsafeMetricName) + } + + return false, nil } func (d *Distributor) labelValuesWithNewlines(labels []mimirpb.LabelAdapter) int { count := 0 @@ -1084,7 +1180,7 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc { skipLabelCountValidation := d.cfg.SkipLabelCountValidation || req.GetSkipLabelCountValidation() // Note that validateSeries may drop some data in ts. - validationErr := d.validateSeries(now, &req.Timeseries[tsIdx], userID, group, skipLabelValidation, skipLabelCountValidation, minExemplarTS, maxExemplarTS) + shouldRemove, validationErr := d.validateSeries(now, &req.Timeseries[tsIdx], userID, group, skipLabelValidation, skipLabelCountValidation, minExemplarTS, maxExemplarTS) // Errors in validation are considered non-fatal, as one series in a request may contain // invalid data but all the remaining series could be perfectly valid. @@ -1093,7 +1189,9 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc { // The series are never retained by validationErr. This is guaranteed by the way the latter is built. 
firstPartialErr = newValidationError(validationErr) } - removeIndexes = append(removeIndexes, tsIdx) + if shouldRemove { + removeIndexes = append(removeIndexes, tsIdx) + } continue } diff --git a/pkg/distributor/distributor_ingest_storage_test.go b/pkg/distributor/distributor_ingest_storage_test.go index 3fb0274792f..ec0a99e2633 100644 --- a/pkg/distributor/distributor_ingest_storage_test.go +++ b/pkg/distributor/distributor_ingest_storage_test.go @@ -59,11 +59,11 @@ func TestDistributor_Push_ShouldSupportIngestStorage(t *testing.T) { createRequest := func() *mimirpb.WriteRequest { return &mimirpb.WriteRequest{ Timeseries: []mimirpb.PreallocTimeseries{ - makeTimeseries([]string{model.MetricNameLabel, "series_one"}, makeSamples(now.UnixMilli(), 1), makeExemplars([]string{"trace_id", "xxx"}, now.UnixMilli(), 1)), - makeTimeseries([]string{model.MetricNameLabel, "series_two"}, makeSamples(now.UnixMilli(), 2), nil), - makeTimeseries([]string{model.MetricNameLabel, "series_three"}, makeSamples(now.UnixMilli(), 3), nil), - makeTimeseries([]string{model.MetricNameLabel, "series_four"}, makeSamples(now.UnixMilli(), 4), nil), - makeTimeseries([]string{model.MetricNameLabel, "series_five"}, makeSamples(now.UnixMilli(), 5), nil), + makeTimeseries([]string{model.MetricNameLabel, "series_one"}, makeSamples(now.UnixMilli(), 1), nil, makeExemplars([]string{"trace_id", "xxx"}, now.UnixMilli(), 1)), + makeTimeseries([]string{model.MetricNameLabel, "series_two"}, makeSamples(now.UnixMilli(), 2), nil, nil), + makeTimeseries([]string{model.MetricNameLabel, "series_three"}, makeSamples(now.UnixMilli(), 3), nil, nil), + makeTimeseries([]string{model.MetricNameLabel, "series_four"}, makeSamples(now.UnixMilli(), 4), nil, nil), + makeTimeseries([]string{model.MetricNameLabel, "series_five"}, makeSamples(now.UnixMilli(), 5), nil, nil), }, Metadata: []*mimirpb.MetricMetadata{ {MetricFamilyName: "series_one", Type: mimirpb.COUNTER, Help: "Series one description"}, @@ -254,7 +254,7 @@ func 
TestDistributor_Push_ShouldReturnErrorMappedTo4xxStatusCodeIfWriteRequestCo createWriteRequest := func() *mimirpb.WriteRequest { return &mimirpb.WriteRequest{ Timeseries: []mimirpb.PreallocTimeseries{ - makeTimeseries([]string{model.MetricNameLabel, strings.Repeat("x", hugeLabelValueLength)}, makeSamples(now.UnixMilli(), 1), nil), + makeTimeseries([]string{model.MetricNameLabel, strings.Repeat("x", hugeLabelValueLength)}, makeSamples(now.UnixMilli(), 1), nil, nil), }, } } @@ -315,11 +315,11 @@ func TestDistributor_Push_ShouldSupportWriteBothToIngestersAndPartitions(t *test createRequest := func() *mimirpb.WriteRequest { return &mimirpb.WriteRequest{ Timeseries: []mimirpb.PreallocTimeseries{ - makeTimeseries([]string{model.MetricNameLabel, "series_one"}, makeSamples(now.UnixMilli(), 1), nil), - makeTimeseries([]string{model.MetricNameLabel, "series_two"}, makeSamples(now.UnixMilli(), 2), nil), - makeTimeseries([]string{model.MetricNameLabel, "series_three"}, makeSamples(now.UnixMilli(), 3), nil), - makeTimeseries([]string{model.MetricNameLabel, "series_four"}, makeSamples(now.UnixMilli(), 4), nil), - makeTimeseries([]string{model.MetricNameLabel, "series_five"}, makeSamples(now.UnixMilli(), 5), nil), + makeTimeseries([]string{model.MetricNameLabel, "series_one"}, makeSamples(now.UnixMilli(), 1), nil, nil), + makeTimeseries([]string{model.MetricNameLabel, "series_two"}, makeSamples(now.UnixMilli(), 2), nil, nil), + makeTimeseries([]string{model.MetricNameLabel, "series_three"}, makeSamples(now.UnixMilli(), 3), nil, nil), + makeTimeseries([]string{model.MetricNameLabel, "series_four"}, makeSamples(now.UnixMilli(), 4), nil, nil), + makeTimeseries([]string{model.MetricNameLabel, "series_five"}, makeSamples(now.UnixMilli(), 5), nil, nil), }, } } @@ -519,7 +519,7 @@ func TestDistributor_Push_ShouldCleanupWriteRequestAfterWritingBothToIngestersAn // Send write request. 
_, err := distributors[0].Push(ctx, &mimirpb.WriteRequest{ Timeseries: []mimirpb.PreallocTimeseries{ - makeTimeseries([]string{model.MetricNameLabel, "series_one"}, makeSamples(now.UnixMilli(), 1), nil), + makeTimeseries([]string{model.MetricNameLabel, "series_one"}, makeSamples(now.UnixMilli(), 1), nil, nil), }, }) require.NoError(t, err) @@ -600,7 +600,7 @@ func TestDistributor_Push_ShouldGivePrecedenceToPartitionsErrorWhenWritingBothTo // Send write request. _, err := distributors[0].Push(ctx, &mimirpb.WriteRequest{ Timeseries: []mimirpb.PreallocTimeseries{ - makeTimeseries([]string{model.MetricNameLabel, "series_one"}, makeSamples(now.UnixMilli(), 1), nil), + makeTimeseries([]string{model.MetricNameLabel, "series_one"}, makeSamples(now.UnixMilli(), 1), nil, nil), }, }) @@ -899,12 +899,12 @@ func TestDistributor_LabelValuesCardinality_AvailabilityAndConsistencyWithIngest var ( // Define fixtures used in tests. - series1 = makeTimeseries([]string{labels.MetricName, "series_1", "job", "job-a", "service", "service-1"}, makeSamples(0, 0), nil) - series2 = makeTimeseries([]string{labels.MetricName, "series_2", "job", "job-b", "service", "service-1"}, makeSamples(0, 0), nil) - series3 = makeTimeseries([]string{labels.MetricName, "series_3", "job", "job-c", "service", "service-1"}, makeSamples(0, 0), nil) - series4 = makeTimeseries([]string{labels.MetricName, "series_4", "job", "job-a", "service", "service-1"}, makeSamples(0, 0), nil) - series5 = makeTimeseries([]string{labels.MetricName, "series_5", "job", "job-a", "service", "service-2"}, makeSamples(0, 0), nil) - series6 = makeTimeseries([]string{labels.MetricName, "series_6", "job", "job-b" /* no service label */}, makeSamples(0, 0), nil) + series1 = makeTimeseries([]string{labels.MetricName, "series_1", "job", "job-a", "service", "service-1"}, makeSamples(0, 0), nil, nil) + series2 = makeTimeseries([]string{labels.MetricName, "series_2", "job", "job-b", "service", "service-1"}, makeSamples(0, 0), nil, nil) + 
series3 = makeTimeseries([]string{labels.MetricName, "series_3", "job", "job-c", "service", "service-1"}, makeSamples(0, 0), nil, nil) + series4 = makeTimeseries([]string{labels.MetricName, "series_4", "job", "job-a", "service", "service-1"}, makeSamples(0, 0), nil, nil) + series5 = makeTimeseries([]string{labels.MetricName, "series_5", "job", "job-a", "service", "service-2"}, makeSamples(0, 0), nil, nil) + series6 = makeTimeseries([]string{labels.MetricName, "series_6", "job", "job-b" /* no service label */}, makeSamples(0, 0), nil, nil) // To keep assertions simple, all tests push all series, and then request the cardinality of the same label names, // so we expect the same response from each successful test. diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index a9da46cb13e..561021499d1 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1467,6 +1467,290 @@ func TestDistributor_Push_HistogramValidation(t *testing.T) { } } +func TestDistributor_SampleDuplicateTimestamp(t *testing.T) { + labels := []string{labels.MetricName, "series", "job", "job", "service", "service"} + + testCases := map[string]struct { + req *mimirpb.WriteRequest + expectedSamples []mimirpb.PreallocTimeseries + expectedErrors []error + expectedMetrics string + }{ + "do not deduplicate if there are no duplicated timestamps": { + req: makeWriteRequestWith( + makeTimeseries(labels, makeSamples(10, 1), nil, nil), + makeTimeseries(labels, makeSamples(20, 2), nil, nil), + ), + expectedSamples: []mimirpb.PreallocTimeseries{ + makeTimeseries(labels, makeSamples(10, 1), nil, nil), + makeTimeseries(labels, makeSamples(20, 2), nil, nil), + }, + }, + "deduplicate duplicated timestamps within a single timeseries, and return the first error encountered": { + req: makeWriteRequestWith( + makeTimeseries(labels, append(makeSamples(10, 1), append(makeSamples(20, 2), append(makeSamples(10, 3), makeSamples(20, 4)...)...)...), nil, 
nil), + makeTimeseries(labels, nil, append(makeHistograms(30, generateTestHistogram(0)), append(makeHistograms(40, generateTestHistogram(1)), append(makeHistograms(40, generateTestHistogram(2)), makeHistograms(30, generateTestHistogram(3))...)...)...), nil), + ), + expectedSamples: []mimirpb.PreallocTimeseries{ + makeTimeseries(labels, append(makeSamples(10, 1), makeSamples(20, 2)...), nil, nil), + makeTimeseries(labels, nil, append(makeHistograms(30, generateTestHistogram(0)), makeHistograms(40, generateTestHistogram(1))...), nil), + }, + expectedErrors: []error{ + fmt.Errorf("samples with duplicated timestamps have been discarded, discarded samples: %d series: '%.200s' (err-mimir-sample-duplicate-timestamp)", 2, "series"), + fmt.Errorf("samples with duplicated timestamps have been discarded, discarded samples: %d series: '%.200s' (err-mimir-sample-duplicate-timestamp)", 2, "series"), + }, + expectedMetrics: ` + # HELP cortex_discarded_samples_total The total number of samples that were discarded. 
+ # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{group="test-group",reason="sample_duplicate_timestamp",user="user"} 4 + `, + }, + "do not deduplicate duplicated timestamps in different timeseries": { + req: makeWriteRequestWith( + makeTimeseries(labels, append(makeSamples(10, 1), makeSamples(10, 2)...), makeHistograms(30, generateTestHistogram(0)), nil), + makeTimeseries(labels, makeSamples(10, 3), append(makeHistograms(20, generateTestHistogram(1)), makeHistograms(20, generateTestHistogram(2))...), nil), + makeTimeseries(labels, makeSamples(10, 4), append(makeHistograms(20, generateTestHistogram(3)), makeHistograms(30, generateTestHistogram(4))...), nil), + ), + expectedSamples: []mimirpb.PreallocTimeseries{ + makeTimeseries(labels, makeSamples(10, 1), makeHistograms(30, generateTestHistogram(0)), nil), + makeTimeseries(labels, makeSamples(10, 3), makeHistograms(20, generateTestHistogram(1)), nil), + makeTimeseries(labels, makeSamples(10, 4), append(makeHistograms(20, generateTestHistogram(3)), makeHistograms(30, generateTestHistogram(4))...), nil), + }, + expectedErrors: []error{ + fmt.Errorf("samples with duplicated timestamps have been discarded, discarded samples: %d series: '%.200s' (err-mimir-sample-duplicate-timestamp)", 1, "series"), + fmt.Errorf("samples with duplicated timestamps have been discarded, discarded samples: %d series: '%.200s' (err-mimir-sample-duplicate-timestamp)", 1, "series"), + nil, + }, + expectedMetrics: ` + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{group="test-group",reason="sample_duplicate_timestamp",user="user"} 2 + `, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + limits := prepareDefaultLimits() + ds, _, regs, _ := prepare(t, prepConfig{ + limits: limits, + numDistributors: 1, + }) + + // Pre-condition check. 
+ require.Len(t, ds, 1) + require.Len(t, regs, 1) + + now := mtime.Now() + for i, ts := range tc.req.Timeseries { + shouldRemove, err := ds[0].validateSeries(now, &ts, "user", "test-group", true, true, 0, 0) + require.False(t, shouldRemove) + if len(tc.expectedErrors) == 0 { + require.NoError(t, err) + } else { + if tc.expectedErrors[i] == nil { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Equal(t, tc.expectedErrors[i], err) + } + } + } + + assert.Equal(t, tc.expectedSamples, tc.req.Timeseries) + assert.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(tc.expectedMetrics), "cortex_discarded_samples_total")) + }) + } +} + +func BenchmarkDistributor_SampleDuplicateTimestamp(b *testing.B) { + const ( + metricName = "series" + testSize = 80_000 + numberOfDifferentValues = 40_000 + ) + labels := []string{labels.MetricName, metricName, "job", "job", "service", "service"} + + now := mtime.Now() + timestamp := now.UnixMilli() + + testCases := map[string]struct { + setup func(int) [][]mimirpb.PreallocTimeseries + expectedErrors bool + }{ + "one timeseries with one sample": { + setup: func(n int) [][]mimirpb.PreallocTimeseries { + ts := []mimirpb.PreallocTimeseries{ + makeTimeseries(labels, makeSamples(timestamp, 1), nil, nil), + } + timeseries := make([][]mimirpb.PreallocTimeseries, n) + for i := 0; i < n; i++ { + timeseries[i] = ts + } + return timeseries + }, + expectedErrors: false, + }, + "one timeseries with one histogram": { + setup: func(n int) [][]mimirpb.PreallocTimeseries { + ts := []mimirpb.PreallocTimeseries{ + makeTimeseries(labels, nil, makeHistograms(timestamp, generateTestHistogram(1)), nil), + } + timeseries := make([][]mimirpb.PreallocTimeseries, n) + for i := 0; i < n; i++ { + timeseries[i] = ts + } + return timeseries + }, + expectedErrors: false, + }, + "one timeseries with one sample and one histogram": { + setup: func(n int) [][]mimirpb.PreallocTimeseries { + ts := []mimirpb.PreallocTimeseries{ + 
makeTimeseries(labels, makeSamples(timestamp-1, 1), makeHistograms(timestamp, generateTestHistogram(2)), nil), + } + timeseries := make([][]mimirpb.PreallocTimeseries, n) + for i := 0; i < n; i++ { + timeseries[i] = ts + } + return timeseries + }, + expectedErrors: false, + }, + "one timeseries with two samples": { + setup: func(n int) [][]mimirpb.PreallocTimeseries { + ts := []mimirpb.PreallocTimeseries{ + makeTimeseries(labels, append(makeSamples(timestamp-1, 1), makeSamples(timestamp, 2)...), nil, nil), + } + timeseries := make([][]mimirpb.PreallocTimeseries, n) + for i := 0; i < n; i++ { + timeseries[i] = ts + } + return timeseries + }, + expectedErrors: false, + }, + "one timeseries with two histograms": { + setup: func(n int) [][]mimirpb.PreallocTimeseries { + ts := []mimirpb.PreallocTimeseries{ + makeTimeseries(labels, nil, append(makeHistograms(timestamp-1, generateTestHistogram(1)), makeHistograms(timestamp, generateTestHistogram(2))...), nil), + } + timeseries := make([][]mimirpb.PreallocTimeseries, n) + for i := 0; i < n; i++ { + timeseries[i] = ts + } + return timeseries + }, + expectedErrors: false, + }, + "one timeseries with two samples and two histograms": { + setup: func(n int) [][]mimirpb.PreallocTimeseries { + ts := []mimirpb.PreallocTimeseries{ + makeTimeseries(labels, append(makeSamples(timestamp-1, 1), makeSamples(timestamp, 2)...), append(makeHistograms(timestamp-1, generateTestHistogram(3)), makeHistograms(timestamp, generateTestHistogram(4))...), nil), + } + timeseries := make([][]mimirpb.PreallocTimeseries, n) + for i := 0; i < n; i++ { + timeseries[i] = ts + } + return timeseries + }, + expectedErrors: false, + }, + "one timeseries with 80_000 samples with duplicated timestamps": { + setup: func(n int) [][]mimirpb.PreallocTimeseries { + timeseries := make([][]mimirpb.PreallocTimeseries, n) + for i := 0; i < n; i++ { + samples := make([]mimirpb.Sample, testSize) + value := 0 + for i := 0; i < testSize; i++ { + if i < 
numberOfDifferentValues { + value++ + } + samples[i].TimestampMs = timestamp + samples[i].Value = float64(value) + } + timeseries[i] = []mimirpb.PreallocTimeseries{ + makeTimeseries(labels, samples, nil, nil), + } + } + return timeseries + }, + expectedErrors: true, + }, + "one timeseries with 80_000 histograms with duplicated timestamps": { + setup: func(n int) [][]mimirpb.PreallocTimeseries { + timeseries := make([][]mimirpb.PreallocTimeseries, n) + for i := 0; i < n; i++ { + histograms := make([]mimirpb.Histogram, testSize) + value := 0 + for i := 0; i < testSize; i++ { + if i < numberOfDifferentValues { + value++ + } + histograms[i] = makeHistograms(timestamp, generateTestHistogram(value))[0] + } + timeseries[i] = []mimirpb.PreallocTimeseries{ + makeTimeseries(labels, nil, histograms, nil), + } + } + return timeseries + }, + expectedErrors: true, + }, + "one timeseries with 80_000 samples and 80_000 histograms with duplicated timestamps": { + setup: func(n int) [][]mimirpb.PreallocTimeseries { + timeseries := make([][]mimirpb.PreallocTimeseries, n) + for i := 0; i < n; i++ { + samples := make([]mimirpb.Sample, testSize) + histograms := make([]mimirpb.Histogram, testSize) + value := 0 + for i := 0; i < testSize; i++ { + if i < numberOfDifferentValues { + value++ + } + samples[i].TimestampMs = timestamp + samples[i].Value = float64(value) + histograms[i] = makeHistograms(timestamp, generateTestHistogram(value))[0] + } + timeseries[i] = []mimirpb.PreallocTimeseries{ + makeTimeseries(labels, samples, histograms, nil), + } + } + return timeseries + }, + expectedErrors: true, + }, + } + + limits := prepareDefaultLimits() + ds, _, regs, _ := prepare(b, prepConfig{ + limits: limits, + numDistributors: 1, + }) + + // Pre-condition check. 
+ require.Len(b, ds, 1) + require.Len(b, regs, 1) + + for name, tc := range testCases { + b.Run(name, func(b *testing.B) { + timeseries := tc.setup(b.N) + b.ReportAllocs() + b.ResetTimer() + for n := 0; n < b.N; n++ { + for _, ts := range timeseries[n] { + _, err := ds[0].validateSeries(now, &ts, "user", "test-group", true, true, 0, 0) + if !tc.expectedErrors && err != nil { + b.Fatal(err) + } else if tc.expectedErrors && err == nil { + b.Fatal("an error was expected") + } + } + } + }) + } +} + func TestDistributor_ExemplarValidation(t *testing.T) { tests := map[string]struct { prepareConfig func(limits *validation.Limits) @@ -1700,8 +1984,9 @@ func TestDistributor_ExemplarValidation(t *testing.T) { require.Len(t, regs, 1) for _, ts := range tc.req.Timeseries { - err := ds[0].validateSeries(now, &ts, "user", "test-group", false, false, tc.minExemplarTS, tc.maxExemplarTS) + shouldRemove, err := ds[0].validateSeries(now, &ts, "user", "test-group", false, false, tc.minExemplarTS, tc.maxExemplarTS) assert.NoError(t, err) + assert.False(t, shouldRemove) } assert.Equal(t, tc.expectedExemplars, tc.req.Timeseries) @@ -1807,11 +2092,13 @@ func TestDistributor_HistogramReduction(t *testing.T) { require.Len(t, regs, 1) for _, ts := range tc.req.Timeseries { - err := ds[0].validateSeries(now, &ts, "user", "test-group", false, false, 0, 0) + shouldRemove, err := ds[0].validateSeries(now, &ts, "user", "test-group", false, false, 0, 0) if tc.expectedError != nil { require.ErrorAs(t, err, &tc.expectedError) + require.True(t, shouldRemove) } else { assert.NoError(t, err) + assert.False(t, shouldRemove) } } if tc.expectedError == nil { @@ -4021,13 +4308,13 @@ func TestDistributor_LabelValuesCardinality(t *testing.T) { func TestDistributor_LabelValuesCardinality_AvailabilityAndConsistency(t *testing.T) { var ( // Define fixtures used in tests. 
- series1 = makeTimeseries([]string{labels.MetricName, "series_1", "job", "job-a", "service", "service-1"}, makeSamples(0, 0), nil) - series2 = makeTimeseries([]string{labels.MetricName, "series_2", "job", "job-b", "service", "service-1"}, makeSamples(0, 0), nil) - series3 = makeTimeseries([]string{labels.MetricName, "series_3", "job", "job-c", "service", "service-1"}, makeSamples(0, 0), nil) - series4 = makeTimeseries([]string{labels.MetricName, "series_4", "job", "job-a", "service", "service-1"}, makeSamples(0, 0), nil) - series5 = makeTimeseries([]string{labels.MetricName, "series_5", "job", "job-a", "service", "service-2"}, makeSamples(0, 0), nil) - series6 = makeTimeseries([]string{labels.MetricName, "series_6", "job", "job-b" /* no service label */}, makeSamples(0, 0), nil) - other1 = makeTimeseries([]string{labels.MetricName, "other_1", "job", "job-1", "service", "service-1"}, makeSamples(0, 0), nil) + series1 = makeTimeseries([]string{labels.MetricName, "series_1", "job", "job-a", "service", "service-1"}, makeSamples(0, 0), nil, nil) + series2 = makeTimeseries([]string{labels.MetricName, "series_2", "job", "job-b", "service", "service-1"}, makeSamples(0, 0), nil, nil) + series3 = makeTimeseries([]string{labels.MetricName, "series_3", "job", "job-c", "service", "service-1"}, makeSamples(0, 0), nil, nil) + series4 = makeTimeseries([]string{labels.MetricName, "series_4", "job", "job-a", "service", "service-1"}, makeSamples(0, 0), nil, nil) + series5 = makeTimeseries([]string{labels.MetricName, "series_5", "job", "job-a", "service", "service-2"}, makeSamples(0, 0), nil, nil) + series6 = makeTimeseries([]string{labels.MetricName, "series_6", "job", "job-b" /* no service label */}, makeSamples(0, 0), nil, nil) + other1 = makeTimeseries([]string{labels.MetricName, "other_1", "job", "job-1", "service", "service-1"}, makeSamples(0, 0), nil, nil) // To keep assertions simple, all tests push all series, and then request the cardinality of the same label names, // so 
we expect the same response from each successful test. @@ -4723,6 +5010,7 @@ func TestRelabelMiddleware(t *testing.T) { }, makeSamples(123, 1.23), nil, + nil, )}, }}, expectedReqs: []*mimirpb.WriteRequest{{ @@ -4734,6 +5022,7 @@ func TestRelabelMiddleware(t *testing.T) { }, makeSamples(123, 1.23), nil, + nil, )}, }}, expectErrs: []bool{false}, @@ -5401,6 +5690,7 @@ func makeWriteRequest(startTimestampMs int64, samples, metadata int, exemplars, }, makeSamples(startTimestampMs+int64(i), float64(i)), nil, + nil, ) if exemplars { @@ -5443,12 +5733,13 @@ func makeWriteRequestWith(series ...mimirpb.PreallocTimeseries) *mimirpb.WriteRe return &mimirpb.WriteRequest{Timeseries: series} } -func makeTimeseries(seriesLabels []string, samples []mimirpb.Sample, exemplars []mimirpb.Exemplar) mimirpb.PreallocTimeseries { +func makeTimeseries(seriesLabels []string, samples []mimirpb.Sample, histograms []mimirpb.Histogram, exemplars []mimirpb.Exemplar) mimirpb.PreallocTimeseries { return mimirpb.PreallocTimeseries{ TimeSeries: &mimirpb.TimeSeries{ - Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(seriesLabels...)), - Samples: samples, - Exemplars: exemplars, + Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings(seriesLabels...)), + Samples: samples, + Histograms: histograms, + Exemplars: exemplars, }, } } @@ -7689,7 +7980,7 @@ func TestDistributor_Push_SendMessageMetadata(t *testing.T) { req := &mimirpb.WriteRequest{ Timeseries: []mimirpb.PreallocTimeseries{ - makeTimeseries([]string{model.MetricNameLabel, "test1"}, makeSamples(time.Now().UnixMilli(), 1), nil), + makeTimeseries([]string{model.MetricNameLabel, "test1"}, makeSamples(time.Now().UnixMilli(), 1), nil, nil), }, Source: mimirpb.API, } diff --git a/pkg/distributor/query_test.go b/pkg/distributor/query_test.go index 49f10ee3416..9f47672938d 100644 --- a/pkg/distributor/query_test.go +++ b/pkg/distributor/query_test.go @@ -37,10 +37,10 @@ func TestDistributor_QueryExemplars(t *testing.T) { fixtures := 
[]mimirpb.PreallocTimeseries{ // Note: it's important to write at least a sample, otherwise the exemplar timestamp validation doesn't pass. - makeTimeseries([]string{labels.MetricName, "series_1", "namespace", "a"}, makeSamples(int64(now), 1), makeExemplars([]string{"trace_id", "A"}, int64(now), 0)), - makeTimeseries([]string{labels.MetricName, "series_1", "namespace", "b"}, makeSamples(int64(now), 2), makeExemplars([]string{"trace_id", "B"}, int64(now), 0)), - makeTimeseries([]string{labels.MetricName, "series_2", "namespace", "a"}, makeSamples(int64(now), 3), makeExemplars([]string{"trace_id", "C"}, int64(now), 0)), - makeTimeseries([]string{labels.MetricName, "series_2", "namespace", "b"}, makeSamples(int64(now), 4), makeExemplars([]string{"trace_id", "D"}, int64(now), 0)), + makeTimeseries([]string{labels.MetricName, "series_1", "namespace", "a"}, makeSamples(int64(now), 1), nil, makeExemplars([]string{"trace_id", "A"}, int64(now), 0)), + makeTimeseries([]string{labels.MetricName, "series_1", "namespace", "b"}, makeSamples(int64(now), 2), nil, makeExemplars([]string{"trace_id", "B"}, int64(now), 0)), + makeTimeseries([]string{labels.MetricName, "series_2", "namespace", "a"}, makeSamples(int64(now), 3), nil, makeExemplars([]string{"trace_id", "C"}, int64(now), 0)), + makeTimeseries([]string{labels.MetricName, "series_2", "namespace", "b"}, makeSamples(int64(now), 4), nil, makeExemplars([]string{"trace_id", "D"}, int64(now), 0)), } tests := map[string]struct { @@ -229,7 +229,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac writeReq = &mimirpb.WriteRequest{} for i := 0; i < limit; i++ { writeReq.Timeseries = append(writeReq.Timeseries, - makeTimeseries([]string{model.MetricNameLabel, fmt.Sprintf("another_series_%d", i)}, makeSamples(0, 0), nil), + makeTimeseries([]string{model.MetricNameLabel, fmt.Sprintf("another_series_%d", i)}, makeSamples(0, 0), nil, nil), ) } @@ -308,7 +308,7 @@ func 
TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac } // Push more series to exceed the limit once we'll query back all series. - writeReq = makeWriteRequestWith(makeTimeseries([]string{model.MetricNameLabel, "another_series"}, makeSamples(0, 0), nil)) + writeReq = makeWriteRequestWith(makeTimeseries([]string{model.MetricNameLabel, "another_series"}, makeSamples(0, 0), nil, nil)) writeRes, err = ds[0].Push(userCtx, writeReq) assert.Equal(t, &mimirpb.WriteResponse{}, writeRes) @@ -354,7 +354,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), } // Push a single series to allow us to calculate the chunk size to calculate the limit for the test. - writeReq := makeWriteRequestWith(makeTimeseries([]string{model.MetricNameLabel, "another_series"}, makeSamples(0, 0), nil)) + writeReq := makeWriteRequestWith(makeTimeseries([]string{model.MetricNameLabel, "another_series"}, makeSamples(0, 0), nil, nil)) writeRes, err := ds[0].Push(ctx, writeReq) assert.Equal(t, &mimirpb.WriteResponse{}, writeRes) assert.Nil(t, err) @@ -383,7 +383,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs assert.Len(t, queryRes.Chunkseries, seriesToAdd) // Push another series to exceed the chunk bytes limit once we'll query back all series. 
- writeReq = makeWriteRequestWith(makeTimeseries([]string{model.MetricNameLabel, "another_series_1"}, makeSamples(0, 0), nil)) + writeReq = makeWriteRequestWith(makeTimeseries([]string{model.MetricNameLabel, "another_series_1"}, makeSamples(0, 0), nil, nil)) writeRes, err = ds[0].Push(ctx, writeReq) assert.Equal(t, &mimirpb.WriteResponse{}, writeRes) diff --git a/pkg/distributor/validate.go b/pkg/distributor/validate.go index fffc943b6c0..ab9426513ad 100644 --- a/pkg/distributor/validate.go +++ b/pkg/distributor/validate.go @@ -43,6 +43,7 @@ var ( reasonDuplicateLabelNames = globalerror.SeriesWithDuplicateLabelNames.LabelValue() reasonTooFarInFuture = globalerror.SampleTooFarInFuture.LabelValue() reasonTooFarInPast = globalerror.SampleTooFarInPast.LabelValue() + reasonDuplicateTimestamp = globalerror.SampleDuplicateTimestamp.LabelValue() // Discarded exemplars reasons. reasonExemplarLabelsMissing = globalerror.ExemplarLabelsMissing.LabelValue() @@ -98,6 +99,7 @@ var ( "received a sample whose timestamp is too far in the past, timestamp: %d series: '%.200s'", validation.PastGracePeriodFlag, ) + duplicateTimestampMsgFormat = globalerror.SampleDuplicateTimestamp.Message("samples with duplicated timestamps have been discarded, discarded samples: %d series: '%.200s'") exemplarEmptyLabelsMsgFormat = globalerror.ExemplarLabelsMissing.Message( "received an exemplar with no valid labels, timestamp: %d series: %s labels: %s", ) @@ -143,6 +145,7 @@ type sampleValidationMetrics struct { duplicateLabelNames *prometheus.CounterVec tooFarInFuture *prometheus.CounterVec tooFarInPast *prometheus.CounterVec + duplicateTimestamp *prometheus.CounterVec } func (m *sampleValidationMetrics) deleteUserMetrics(userID string) { @@ -160,6 +163,7 @@ func (m *sampleValidationMetrics) deleteUserMetrics(userID string) { m.duplicateLabelNames.DeletePartialMatch(filter) m.tooFarInFuture.DeletePartialMatch(filter) m.tooFarInPast.DeletePartialMatch(filter) + 
m.duplicateTimestamp.DeletePartialMatch(filter) } func (m *sampleValidationMetrics) deleteUserMetricsForGroup(userID, group string) { @@ -176,6 +180,7 @@ func (m *sampleValidationMetrics) deleteUserMetricsForGroup(userID, group string m.duplicateLabelNames.DeleteLabelValues(userID, group) m.tooFarInFuture.DeleteLabelValues(userID, group) m.tooFarInPast.DeleteLabelValues(userID, group) + m.duplicateTimestamp.DeleteLabelValues(userID, group) } func newSampleValidationMetrics(r prometheus.Registerer) *sampleValidationMetrics { @@ -193,6 +198,7 @@ func newSampleValidationMetrics(r prometheus.Registerer) *sampleValidationMetric duplicateLabelNames: validation.DiscardedSamplesCounter(r, reasonDuplicateLabelNames), tooFarInFuture: validation.DiscardedSamplesCounter(r, reasonTooFarInFuture), tooFarInPast: validation.DiscardedSamplesCounter(r, reasonTooFarInPast), + duplicateTimestamp: validation.DiscardedSamplesCounter(r, reasonDuplicateTimestamp), } } diff --git a/pkg/mimirpb/timeseries.go b/pkg/mimirpb/timeseries.go index 43eb0dd2a72..9ddefa711c4 100644 --- a/pkg/mimirpb/timeseries.go +++ b/pkg/mimirpb/timeseries.go @@ -182,6 +182,10 @@ func (p *PreallocTimeseries) ResizeExemplars(newSize int) { p.clearUnmarshalData() } +func (p *PreallocTimeseries) SamplesUpdated() { + p.clearUnmarshalData() +} + func (p *PreallocTimeseries) HistogramsUpdated() { p.clearUnmarshalData() }