Skip to content

Commit

Permalink
Distributor: remove samples with duplicated timestamps within a singl…
Browse files Browse the repository at this point in the history
…e timeseries (#10145)

* Distributor: remove samples with duplicated timestamps within a single timeseries

Signed-off-by: Yuri Nikolic <[email protected]>

* Making lint happy

Signed-off-by: Yuri Nikolic <[email protected]>

* Include removal of samples with duplicated timestamp in validateSeries()

Signed-off-by: Yuri Nikolic <[email protected]>

* Making lint happy

Signed-off-by: Yuri Nikolic <[email protected]>

* Adding benchmarks

Signed-off-by: Yuri Nikolic <[email protected]>

* Making lint happy

Signed-off-by: Yuri Nikolic <[email protected]>

* Improve sample removal

Signed-off-by: Yuri Nikolic <[email protected]>

* Improve benchmarks

Signed-off-by: Yuri Nikolic <[email protected]>

* Fixing review findings

Signed-off-by: Yuri Nikolic <[email protected]>

* Fixing tests

Signed-off-by: Yuri Nikolic <[email protected]>

* Fixing benchmarks

Signed-off-by: Yuri Nikolic <[email protected]>

* Fixing benchmarks

Signed-off-by: Yuri Nikolic <[email protected]>

* Fixing review findings

Signed-off-by: Yuri Nikolic <[email protected]>

* Remove big benchmarks

* Use RemoveSliceIndexes

Signed-off-by: Yuri Nikolic <[email protected]>

* Initialize removeIndexes within validateXxx methods

Signed-off-by: Yuri Nikolic <[email protected]>

* Copy element by element

Signed-off-by: Yuri Nikolic <[email protected]>

* Replace h with ts.Histogram[idx]

Signed-off-by: Yuri Nikolic <[email protected]>

* Fix validateHistogram() when size is 1

Signed-off-by: Yuri Nikolic <[email protected]>

* Fixing review findings

Signed-off-by: Yuri Nikolic <[email protected]>

---------

Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic authored Dec 12, 2024
1 parent 2640b8f commit 6dcd7a3
Show file tree
Hide file tree
Showing 7 changed files with 454 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## main / unreleased

* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145

### Grafana Mimir

Expand Down
124 changes: 111 additions & 13 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/storage/ingest"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/extract"
"github.com/grafana/mimir/pkg/util/globalerror"
mimir_limiter "github.com/grafana/mimir/pkg/util/limiter"
util_math "github.com/grafana/mimir/pkg/util/math"
Expand Down Expand Up @@ -737,40 +738,102 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
return true, nil
}

// Validates a single series from a write request.
// validateSamples validates samples of a single timeseries and removes the ones with duplicated timestamps.
// Returns an error explaining the first validation finding.
// May alter timeseries data in-place.
// The returned error may retain the series labels.
// It uses the passed nowt time to observe the delay of sample timestamps.
func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeseries, userID, group string, skipLabelValidation, skipLabelCountValidation bool, minExemplarTS, maxExemplarTS int64) error {
if err := validateLabels(d.sampleValidationMetrics, d.limits, userID, group, ts.Labels, skipLabelValidation, skipLabelCountValidation); err != nil {
return err
func (d *Distributor) validateSamples(now model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error {
if len(ts.Samples) == 0 {
return nil
}

now := model.TimeFromUnixNano(nowt.UnixNano())
if len(ts.Samples) == 1 {
return validateSample(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, ts.Samples[0])
}

timestamps := make(map[int64]struct{}, min(len(ts.Samples), 100))
currPos := 0
duplicatesFound := false
for _, s := range ts.Samples {
if _, ok := timestamps[s.TimestampMs]; ok {
// A sample with the same timestamp has already been validated, so we skip it.
duplicatesFound = true
continue
}

timestamps[s.TimestampMs] = struct{}{}
if err := validateSample(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, s); err != nil {
return err
}

ts.Samples[currPos] = s
currPos++
}

if duplicatesFound {
ts.Samples = ts.Samples[:currPos]
ts.SamplesUpdated()
}

return nil
}

// validateHistograms validates histograms of a single timeseries and removes the ones with duplicated timestamps.
// Returns an error explaining the first validation finding.
// May alter timeseries data in-place.
// The returned error may retain the series labels.
func (d *Distributor) validateHistograms(now model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error {
if len(ts.Histograms) == 0 {
return nil
}

if len(ts.Histograms) == 1 {
updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[0])
if err != nil {
return err
}
if updated {
ts.HistogramsUpdated()
}
return nil
}

timestamps := make(map[int64]struct{}, min(len(ts.Histograms), 100))
currPos := 0
histogramsUpdated := false
for i := range ts.Histograms {
updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[i])
for idx := range ts.Histograms {
if _, ok := timestamps[ts.Histograms[idx].Timestamp]; ok {
// A sample with the same timestamp has already been validated, so we skip it.
histogramsUpdated = true
continue
}

timestamps[ts.Histograms[idx].Timestamp] = struct{}{}
updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[idx])
if err != nil {
return err
}
histogramsUpdated = histogramsUpdated || updated

ts.Histograms[currPos] = ts.Histograms[idx]
currPos++
}

if histogramsUpdated {
ts.Histograms = ts.Histograms[:currPos]
ts.HistogramsUpdated()
}

return nil
}

// validateExemplars validates exemplars of a single timeseries.
// May alter timeseries data in-place.
func (d *Distributor) validateExemplars(ts *mimirpb.PreallocTimeseries, userID string, minExemplarTS, maxExemplarTS int64) {
if d.limits.MaxGlobalExemplarsPerUser(userID) == 0 {
ts.ClearExemplars()
return nil
return
}

allowedExemplars := d.limits.MaxExemplarsPerSeriesPerRequest(userID)
if allowedExemplars > 0 && len(ts.Exemplars) > allowedExemplars {
d.exemplarValidationMetrics.tooManyExemplars.WithLabelValues(userID).Add(float64(len(ts.Exemplars) - allowedExemplars))
Expand Down Expand Up @@ -802,7 +865,40 @@ func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeser
if !isInOrder {
ts.SortExemplars()
}
return nil
}

// Validates a single series from a write request.
// May alter timeseries data in-place.
// Returns a boolean stating if the timeseries should be removed from the request, and an error explaining the first validation finding.
// The returned error may retain the series labels.
// It uses the passed nowt time to observe the delay of sample timestamps.
func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeseries, userID, group string, skipLabelValidation, skipLabelCountValidation bool, minExemplarTS, maxExemplarTS int64) (bool, error) {
if err := validateLabels(d.sampleValidationMetrics, d.limits, userID, group, ts.Labels, skipLabelValidation, skipLabelCountValidation); err != nil {
return true, err
}

now := model.TimeFromUnixNano(nowt.UnixNano())
totalSamplesAndHistograms := len(ts.Samples) + len(ts.Histograms)

if err := d.validateSamples(now, ts, userID, group); err != nil {
return true, err
}

if err := d.validateHistograms(now, ts, userID, group); err != nil {
return true, err
}

d.validateExemplars(ts, userID, minExemplarTS, maxExemplarTS)

deduplicatedSamplesAndHistograms := totalSamplesAndHistograms - len(ts.Samples) - len(ts.Histograms)

if deduplicatedSamplesAndHistograms > 0 {
d.sampleValidationMetrics.duplicateTimestamp.WithLabelValues(userID, group).Add(float64(deduplicatedSamplesAndHistograms))
unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ts.Labels)
return false, fmt.Errorf(duplicateTimestampMsgFormat, deduplicatedSamplesAndHistograms, unsafeMetricName)
}

return false, nil
}
func (d *Distributor) labelValuesWithNewlines(labels []mimirpb.LabelAdapter) int {
count := 0
Expand Down Expand Up @@ -1084,7 +1180,7 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
skipLabelCountValidation := d.cfg.SkipLabelCountValidation || req.GetSkipLabelCountValidation()

// Note that validateSeries may drop some data in ts.
validationErr := d.validateSeries(now, &req.Timeseries[tsIdx], userID, group, skipLabelValidation, skipLabelCountValidation, minExemplarTS, maxExemplarTS)
shouldRemove, validationErr := d.validateSeries(now, &req.Timeseries[tsIdx], userID, group, skipLabelValidation, skipLabelCountValidation, minExemplarTS, maxExemplarTS)

// Errors in validation are considered non-fatal, as one series in a request may contain
// invalid data but all the remaining series could be perfectly valid.
Expand All @@ -1093,7 +1189,9 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
// The series are never retained by validationErr. This is guaranteed by the way the latter is built.
firstPartialErr = newValidationError(validationErr)
}
removeIndexes = append(removeIndexes, tsIdx)
if shouldRemove {
removeIndexes = append(removeIndexes, tsIdx)
}
continue
}

Expand Down
38 changes: 19 additions & 19 deletions pkg/distributor/distributor_ingest_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ func TestDistributor_Push_ShouldSupportIngestStorage(t *testing.T) {
createRequest := func() *mimirpb.WriteRequest {
return &mimirpb.WriteRequest{
Timeseries: []mimirpb.PreallocTimeseries{
makeTimeseries([]string{model.MetricNameLabel, "series_one"}, makeSamples(now.UnixMilli(), 1), makeExemplars([]string{"trace_id", "xxx"}, now.UnixMilli(), 1)),
makeTimeseries([]string{model.MetricNameLabel, "series_two"}, makeSamples(now.UnixMilli(), 2), nil),
makeTimeseries([]string{model.MetricNameLabel, "series_three"}, makeSamples(now.UnixMilli(), 3), nil),
makeTimeseries([]string{model.MetricNameLabel, "series_four"}, makeSamples(now.UnixMilli(), 4), nil),
makeTimeseries([]string{model.MetricNameLabel, "series_five"}, makeSamples(now.UnixMilli(), 5), nil),
makeTimeseries([]string{model.MetricNameLabel, "series_one"}, makeSamples(now.UnixMilli(), 1), nil, makeExemplars([]string{"trace_id", "xxx"}, now.UnixMilli(), 1)),
makeTimeseries([]string{model.MetricNameLabel, "series_two"}, makeSamples(now.UnixMilli(), 2), nil, nil),
makeTimeseries([]string{model.MetricNameLabel, "series_three"}, makeSamples(now.UnixMilli(), 3), nil, nil),
makeTimeseries([]string{model.MetricNameLabel, "series_four"}, makeSamples(now.UnixMilli(), 4), nil, nil),
makeTimeseries([]string{model.MetricNameLabel, "series_five"}, makeSamples(now.UnixMilli(), 5), nil, nil),
},
Metadata: []*mimirpb.MetricMetadata{
{MetricFamilyName: "series_one", Type: mimirpb.COUNTER, Help: "Series one description"},
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestDistributor_Push_ShouldReturnErrorMappedTo4xxStatusCodeIfWriteRequestCo
createWriteRequest := func() *mimirpb.WriteRequest {
return &mimirpb.WriteRequest{
Timeseries: []mimirpb.PreallocTimeseries{
makeTimeseries([]string{model.MetricNameLabel, strings.Repeat("x", hugeLabelValueLength)}, makeSamples(now.UnixMilli(), 1), nil),
makeTimeseries([]string{model.MetricNameLabel, strings.Repeat("x", hugeLabelValueLength)}, makeSamples(now.UnixMilli(), 1), nil, nil),
},
}
}
Expand Down Expand Up @@ -315,11 +315,11 @@ func TestDistributor_Push_ShouldSupportWriteBothToIngestersAndPartitions(t *test
createRequest := func() *mimirpb.WriteRequest {
return &mimirpb.WriteRequest{
Timeseries: []mimirpb.PreallocTimeseries{
makeTimeseries([]string{model.MetricNameLabel, "series_one"}, makeSamples(now.UnixMilli(), 1), nil),
makeTimeseries([]string{model.MetricNameLabel, "series_two"}, makeSamples(now.UnixMilli(), 2), nil),
makeTimeseries([]string{model.MetricNameLabel, "series_three"}, makeSamples(now.UnixMilli(), 3), nil),
makeTimeseries([]string{model.MetricNameLabel, "series_four"}, makeSamples(now.UnixMilli(), 4), nil),
makeTimeseries([]string{model.MetricNameLabel, "series_five"}, makeSamples(now.UnixMilli(), 5), nil),
makeTimeseries([]string{model.MetricNameLabel, "series_one"}, makeSamples(now.UnixMilli(), 1), nil, nil),
makeTimeseries([]string{model.MetricNameLabel, "series_two"}, makeSamples(now.UnixMilli(), 2), nil, nil),
makeTimeseries([]string{model.MetricNameLabel, "series_three"}, makeSamples(now.UnixMilli(), 3), nil, nil),
makeTimeseries([]string{model.MetricNameLabel, "series_four"}, makeSamples(now.UnixMilli(), 4), nil, nil),
makeTimeseries([]string{model.MetricNameLabel, "series_five"}, makeSamples(now.UnixMilli(), 5), nil, nil),
},
}
}
Expand Down Expand Up @@ -519,7 +519,7 @@ func TestDistributor_Push_ShouldCleanupWriteRequestAfterWritingBothToIngestersAn
// Send write request.
_, err := distributors[0].Push(ctx, &mimirpb.WriteRequest{
Timeseries: []mimirpb.PreallocTimeseries{
makeTimeseries([]string{model.MetricNameLabel, "series_one"}, makeSamples(now.UnixMilli(), 1), nil),
makeTimeseries([]string{model.MetricNameLabel, "series_one"}, makeSamples(now.UnixMilli(), 1), nil, nil),
},
})
require.NoError(t, err)
Expand Down Expand Up @@ -600,7 +600,7 @@ func TestDistributor_Push_ShouldGivePrecedenceToPartitionsErrorWhenWritingBothTo
// Send write request.
_, err := distributors[0].Push(ctx, &mimirpb.WriteRequest{
Timeseries: []mimirpb.PreallocTimeseries{
makeTimeseries([]string{model.MetricNameLabel, "series_one"}, makeSamples(now.UnixMilli(), 1), nil),
makeTimeseries([]string{model.MetricNameLabel, "series_one"}, makeSamples(now.UnixMilli(), 1), nil, nil),
},
})

Expand Down Expand Up @@ -899,12 +899,12 @@ func TestDistributor_LabelValuesCardinality_AvailabilityAndConsistencyWithIngest

var (
// Define fixtures used in tests.
series1 = makeTimeseries([]string{labels.MetricName, "series_1", "job", "job-a", "service", "service-1"}, makeSamples(0, 0), nil)
series2 = makeTimeseries([]string{labels.MetricName, "series_2", "job", "job-b", "service", "service-1"}, makeSamples(0, 0), nil)
series3 = makeTimeseries([]string{labels.MetricName, "series_3", "job", "job-c", "service", "service-1"}, makeSamples(0, 0), nil)
series4 = makeTimeseries([]string{labels.MetricName, "series_4", "job", "job-a", "service", "service-1"}, makeSamples(0, 0), nil)
series5 = makeTimeseries([]string{labels.MetricName, "series_5", "job", "job-a", "service", "service-2"}, makeSamples(0, 0), nil)
series6 = makeTimeseries([]string{labels.MetricName, "series_6", "job", "job-b" /* no service label */}, makeSamples(0, 0), nil)
series1 = makeTimeseries([]string{labels.MetricName, "series_1", "job", "job-a", "service", "service-1"}, makeSamples(0, 0), nil, nil)
series2 = makeTimeseries([]string{labels.MetricName, "series_2", "job", "job-b", "service", "service-1"}, makeSamples(0, 0), nil, nil)
series3 = makeTimeseries([]string{labels.MetricName, "series_3", "job", "job-c", "service", "service-1"}, makeSamples(0, 0), nil, nil)
series4 = makeTimeseries([]string{labels.MetricName, "series_4", "job", "job-a", "service", "service-1"}, makeSamples(0, 0), nil, nil)
series5 = makeTimeseries([]string{labels.MetricName, "series_5", "job", "job-a", "service", "service-2"}, makeSamples(0, 0), nil, nil)
series6 = makeTimeseries([]string{labels.MetricName, "series_6", "job", "job-b" /* no service label */}, makeSamples(0, 0), nil, nil)

// To keep assertions simple, all tests push all series, and then request the cardinality of the same label names,
// so we expect the same response from each successful test.
Expand Down
Loading

0 comments on commit 6dcd7a3

Please sign in to comment.