From cecb0dc986a0cfbe62fd466b5493320c62fb6f9e Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 29 Oct 2024 11:21:17 +0100 Subject: [PATCH] Update mimir-prometheus to 5710f65f1444 Signed-off-by: Arve Knudsen --- CHANGELOG.md | 1 + go.mod | 2 +- go.sum | 4 +- .../prometheus/prometheus/tsdb/exemplar.go | 22 +++---- .../prometheus/prometheus/tsdb/head_append.go | 64 +++++++++---------- .../prometheus/tsdb/index/postings.go | 27 +++++--- vendor/modules.txt | 4 +- 7 files changed, 66 insertions(+), 58 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e2b130ea84..7f4ac8ef229 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ * [BUGFIX] Fix pooling buffer reuse logic when `-distributor.max-request-pool-buffer-size` is set. #9666 * [BUGFIX] Fix issue when using the experimental `-ruler.max-independent-rule-evaluation-concurrency` feature, where the ruler could panic as it updates a running ruleset or shutdowns. #9726 * [BUGFIX] Ingester: Fix race condition in per-tenant TSDB creation. #9708 +* [BUGFIX] Ingester: Fix race condition in exemplar adding. #9765 ### Mixin diff --git a/go.mod b/go.mod index 9420ce5f0a0..527af3f3523 100644 --- a/go.mod +++ b/go.mod @@ -282,7 +282,7 @@ require ( ) // Using a fork of Prometheus with Mimir-specific changes. -replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241023201215-e8c852a65765 +replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241029123536-5710f65f1444 // Replace memberlist with our fork which includes some fixes that haven't been // merged upstream yet: diff --git a/go.sum b/go.sum index c4f3ec71a1f..8a321430d7e 100644 --- a/go.sum +++ b/go.sum @@ -527,8 +527,8 @@ github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40 h1:1TeKhyS+pvzO github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40/go.mod h1:IGRj8oOoxwJbHBYl1+OhS9UjQR0dv6SQOep7HqmtyFU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= -github.com/grafana/mimir-prometheus v0.0.0-20241023201215-e8c852a65765 h1:nOq2fEZviAcek/D7gRnNAOXSVMCq4Fyj7bScCXvGYEw= -github.com/grafana/mimir-prometheus v0.0.0-20241023201215-e8c852a65765/go.mod h1:jw8nT+R1mhH4FdHRkYlht7cpilJCP0j0M83c5BWqYUg= +github.com/grafana/mimir-prometheus v0.0.0-20241029123536-5710f65f1444 h1:o3Plh90SIbYgW6penf8BV3QUH0gwBJou+Cd2uCfamRk= +github.com/grafana/mimir-prometheus v0.0.0-20241029123536-5710f65f1444/go.mod h1:7SuFBLahBoRY7KcgzWzK0p1n5QL+5dyr/Ysat6v1378= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU= github.com/grafana/prometheus-alertmanager v0.25.1-0.20240930132144-b5e64e81e8d3 h1:6D2gGAwyQBElSrp3E+9lSr7k8gLuP3Aiy20rweLWeBw= diff --git a/vendor/github.com/prometheus/prometheus/tsdb/exemplar.go b/vendor/github.com/prometheus/prometheus/tsdb/exemplar.go index 7545ab9a602..8f39377de0c 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/exemplar.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/exemplar.go @@ -152,13 +152,13 @@ func (ce *CircularExemplarStorage) Querier(_ context.Context) (storage.ExemplarQ func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) { ret := make([]exemplar.QueryResult, 0) + ce.lock.RLock() + defer ce.lock.RUnlock() + if len(ce.exemplars) == 0 { return ret, nil } - ce.lock.RLock() - defer ce.lock.RUnlock() - // Loop through each index entry, which will point us to first/last exemplar for each series. for _, idx := range ce.index { var se exemplar.QueryResult @@ -281,13 +281,13 @@ func (ce *CircularExemplarStorage) Resize(l int64) int { l = 0 } + ce.lock.Lock() + defer ce.lock.Unlock() + if l == int64(len(ce.exemplars)) { return 0 } - ce.lock.Lock() - defer ce.lock.Unlock() - oldBuffer := ce.exemplars oldNextIndex := int64(ce.nextIndex) @@ -349,6 +349,11 @@ func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry, buf []byt } func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error { + // TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale. + // Optimize by moving the lock to be per series (& benchmark it). + ce.lock.Lock() + defer ce.lock.Unlock() + if len(ce.exemplars) == 0 { return storage.ErrExemplarsDisabled } @@ -356,11 +361,6 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp var buf [1024]byte seriesLabels := l.Bytes(buf[:]) - // TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale. - // Optimize by moving the lock to be per series (& benchmark it). - ce.lock.Lock() - defer ce.lock.Unlock() - idx, ok := ce.index[string(seriesLabels)] err := ce.validateExemplar(idx, e, true) if err != nil { diff --git a/vendor/github.com/prometheus/prometheus/tsdb/head_append.go b/vendor/github.com/prometheus/prometheus/tsdb/head_append.go index b043225c942..6a308abb14b 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/head_append.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/head_append.go @@ -331,13 +331,17 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { var err error - s, err = a.getOrCreate(lset) + s, _, err = a.getOrCreate(lset) if err != nil { return 0, err } } if value.IsStaleNaN(v) { + // This is not thread safe as we should be holding the lock for "s". + // TODO(krajorama): reorganize Commit() to handle samples in append order + // not floats first and then histograms. Then we could do this conversion + // in commit. This code should move into Commit(). switch { case s.lastHistogramValue != nil: return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil) @@ -394,7 +398,7 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { var err error - s, err = a.getOrCreate(lset) + s, _, err = a.getOrCreate(lset) if err != nil { return 0, err } @@ -424,20 +428,18 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab return storage.SeriesRef(s.ref), nil } -func (a *headAppender) getOrCreate(lset labels.Labels) (*memSeries, error) { +func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bool, err error) { // Ensure no empty labels have gotten through. lset = lset.WithoutEmpty() if lset.IsEmpty() { - return nil, fmt.Errorf("empty labelset: %w", ErrInvalidSample) + return nil, false, fmt.Errorf("empty labelset: %w", ErrInvalidSample) } if l, dup := lset.HasDuplicateLabelNames(); dup { - return nil, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample) + return nil, false, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample) } - var created bool - var err error - s, created, err := a.head.getOrCreate(lset.Hash(), lset) + s, created, err = a.head.getOrCreate(lset.Hash(), lset) if err != nil { - return nil, err + return nil, false, err } if created { a.series = append(a.series, record.RefSeries{ @@ -445,7 +447,7 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (*memSeries, error) { Labels: lset, }) } - return s, nil + return s, created, nil } // appendable checks whether the given sample is valid for appending to the series. (if we return false and no error) @@ -644,41 +646,27 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels } } + var created bool s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { - // Ensure no empty labels have gotten through. - lset = lset.WithoutEmpty() - if lset.IsEmpty() { - return 0, fmt.Errorf("empty labelset: %w", ErrInvalidSample) - } - - if l, dup := lset.HasDuplicateLabelNames(); dup { - return 0, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample) - } - - var created bool var err error - s, created, err = a.head.getOrCreate(lset.Hash(), lset) + s, created, err = a.getOrCreate(lset) if err != nil { return 0, err } - if created { - switch { - case h != nil: - s.lastHistogramValue = &histogram.Histogram{} - case fh != nil: - s.lastFloatHistogramValue = &histogram.FloatHistogram{} - } - a.series = append(a.series, record.RefSeries{ - Ref: s.ref, - Labels: lset, - }) - } } switch { case h != nil: s.Lock() + + // TODO(krajorama): reorganize Commit() to handle samples in append order + // not floats first and then histograms. Then we would not need to do this. + // This whole "if" should be removed. + if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { + s.lastHistogramValue = &histogram.Histogram{} + } + // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. _, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) @@ -708,6 +696,14 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels a.histogramSeries = append(a.histogramSeries, s) case fh != nil: s.Lock() + + // TODO(krajorama): reorganize Commit() to handle samples in append order + // not floats first and then histograms. Then we would not need to do this. + // This whole "if" should be removed. + if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { + s.lastFloatHistogramValue = &histogram.FloatHistogram{} + } + // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. _, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow, a.head.opts.EnableOOONativeHistograms.Load()) diff --git a/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go b/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go index a85789a1e33..1b1d4e32883 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/index/postings.go @@ -345,13 +345,14 @@ func (p *MemPostings) Add(id storage.SeriesRef, lset labels.Labels) { p.mtx.Unlock() } -func appendWithExponentialGrowth[T any](a []T, v T) []T { +func appendWithExponentialGrowth[T any](a []T, v T) (_ []T, copied bool) { if cap(a) < len(a)+1 { newList := make([]T, len(a), len(a)*2+1) copy(newList, a) a = newList + copied = true } - return append(a, v) + return append(a, v), copied } func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) { @@ -360,16 +361,26 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) { nm = map[string][]storage.SeriesRef{} p.m[l.Name] = nm } - list := appendWithExponentialGrowth(nm[l.Value], id) + list, copied := appendWithExponentialGrowth(nm[l.Value], id) nm[l.Value] = list - if !p.ordered { + // Return if it shouldn't be ordered, if it only has one element or if it's already ordered. + // The invariant is that the first n-1 items in the list are already sorted. + if !p.ordered || len(list) == 1 || list[len(list)-1] >= list[len(list)-2] { return } - // There is no guarantee that no higher ID was inserted before as they may - // be generated independently before adding them to postings. - // We repair order violations on insert. The invariant is that the first n-1 - // items in the list are already sorted. + + if !copied { + // We have appended to the existing slice, + // and readers may already have a copy of this postings slice, + // so we need to copy it before sorting. + old := list + list = make([]storage.SeriesRef, len(old), cap(old)) + copy(list, old) + nm[l.Value] = list + } + + // Repair order violations. for i := len(list) - 1; i >= 1; i-- { if list[i] >= list[i-1] { break diff --git a/vendor/modules.txt b/vendor/modules.txt index c6311353526..301443fca54 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1008,7 +1008,7 @@ github.com/prometheus/exporter-toolkit/web github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20241023201215-e8c852a65765 +# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20241029123536-5710f65f1444 ## explicit; go 1.22.0 github.com/prometheus/prometheus/config github.com/prometheus/prometheus/discovery @@ -1676,7 +1676,7 @@ sigs.k8s.io/kustomize/kyaml/yaml/walk sigs.k8s.io/yaml sigs.k8s.io/yaml/goyaml.v2 sigs.k8s.io/yaml/goyaml.v3 -# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241023201215-e8c852a65765 +# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241029123536-5710f65f1444 # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe # gopkg.in/yaml.v3 => github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094 # github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20240531075221-3685f1377d7b