From b249cec19b2b64882eeaefdf0057e6138c35ffef Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Mon, 26 Aug 2024 11:12:09 +0200
Subject: [PATCH] [weekly-r304] Cherry-pick TSDB Head series hash fix (#9098)

Signed-off-by: Marco Pracucci
---
 go.mod                                        |  2 +-
 go.sum                                        |  4 +-
 .../prometheus/prometheus/tsdb/head.go        | 50 +++++--------
 .../prometheus/prometheus/tsdb/head_append.go |  2 +-
 .../prometheus/prometheus/tsdb/head_read.go   |  2 +-
 .../prometheus/prometheus/tsdb/head_wal.go    | 13 ++---
 vendor/modules.txt                            |  4 +-
 7 files changed, 25 insertions(+), 52 deletions(-)

diff --git a/go.mod b/go.mod
index e5dabec0c89..dcca28f27f7 100644
--- a/go.mod
+++ b/go.mod
@@ -276,7 +276,7 @@ require (
 )
 
 // Using a fork of Prometheus with Mimir-specific changes.
-replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240812035817-c17c11f77f03
+replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240826074809-8185e4cfd8b1
 
 // Replace memberlist with our fork which includes some fixes that haven't been
 // merged upstream yet:
diff --git a/go.sum b/go.sum
index 58c9c81d1dc..3e75d7e4190 100644
--- a/go.sum
+++ b/go.sum
@@ -524,8 +524,8 @@ github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wp
 github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU=
 github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
 github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
-github.com/grafana/mimir-prometheus v0.0.0-20240812035817-c17c11f77f03 h1:Vkq7Jib3/vBZO3oyxzSUcbUdpyK2rm3CzFIJyt4vtfQ=
-github.com/grafana/mimir-prometheus v0.0.0-20240812035817-c17c11f77f03/go.mod h1:cNDAD0ooSyLfNtakmnGbChNg7JPYmKsRn7CQ01Rpu2E=
+github.com/grafana/mimir-prometheus v0.0.0-20240826074809-8185e4cfd8b1 h1:lHqCrM5+jFWf2DyDUCki01q98UnBdFVZ9E3djPe3oYQ=
+github.com/grafana/mimir-prometheus v0.0.0-20240826074809-8185e4cfd8b1/go.mod h1:cNDAD0ooSyLfNtakmnGbChNg7JPYmKsRn7CQ01Rpu2E=
 github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
 github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
 github.com/grafana/prometheus-alertmanager v0.25.1-0.20240625192351-66ec17e3aa45 h1:AJKOtDKAOg8XNFnIZSmqqqutoTSxVlRs6vekL2p2KEY=
diff --git a/vendor/github.com/prometheus/prometheus/tsdb/head.go b/vendor/github.com/prometheus/prometheus/tsdb/head.go
index bdfe37ed63a..239f01cde12 100644
--- a/vendor/github.com/prometheus/prometheus/tsdb/head.go
+++ b/vendor/github.com/prometheus/prometheus/tsdb/head.go
@@ -195,7 +195,6 @@ type HeadOptions struct {
 	WALReplayConcurrency int
 
 	// EnableSharding enables ShardedPostings() support in the Head.
-	// EnableSharding is temporarily disabled during Init().
 	EnableSharding bool
 
 	// Timely compaction allows head compaction to happen when min block range can no longer be appended,
@@ -655,7 +654,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
 // Init loads data from the write ahead log and prepares the head for writes.
 // It should be called before using an appender so that it
 // limits the ingested samples to the head min valid time.
-func (h *Head) Init(minValidTime int64) (err error) {
+func (h *Head) Init(minValidTime int64) error {
 	h.minValidTime.Store(minValidTime)
 	defer func() {
 		h.postings.EnsureOrder(h.opts.WALReplayConcurrency)
@@ -669,24 +668,6 @@ func (h *Head) Init(minValidTime int64) (err error) {
 		}
 	}()
 
-	// If sharding is enabled, disable it while initializing, and calculate the shards later.
-	// We're going to use that field for other purposes during WAL replay,
-	// so we don't want to waste time on calculating the shard that we're going to lose anyway.
-	if h.opts.EnableSharding {
-		h.opts.EnableSharding = false
-		defer func() {
-			h.opts.EnableSharding = true
-			if err == nil {
-				// No locking is needed here as nobody should be writing while we're in Init.
-				for _, stripe := range h.series.series {
-					for _, s := range stripe {
-						s.shardHashOrMemoryMappedMaxTime = labels.StableHash(s.lset)
-					}
-				}
-			}
-		}()
-	}
-
 	level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any")
 	start := time.Now()
 
@@ -747,6 +728,7 @@ func (h *Head) Init(minValidTime int64) (err error) {
 		mmappedChunks    map[chunks.HeadSeriesRef][]*mmappedChunk
 		oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk
 		lastMmapRef      chunks.ChunkDiskMapperRef
+		err              error
 
 		mmapChunkReplayDuration time.Duration
 	)
@@ -2137,11 +2119,9 @@ type memSeries struct {
 	ref  chunks.HeadSeriesRef
 	meta *metadata.Metadata
 
-	// Series labels hash to use for sharding purposes.
-	// The value is always 0 when sharding has not been explicitly enabled in TSDB.
-	// While the WAL replay the value stored here is the max time of any mmapped chunk,
-	// and the shard hash is re-calculated after WAL replay is complete.
-	shardHashOrMemoryMappedMaxTime uint64
+	// Series labels hash to use for sharding purposes. The value is always 0 when sharding has not
+	// been explicitly enabled in TSDB.
+	shardHash uint64
 
 	// Value returned by secondary hash function.
 	secondaryHash uint32
@@ -2169,6 +2149,8 @@ type memSeries struct {
 
 	ooo *memSeriesOOOFields
 
+	mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
+
 	// chunkEndTimeVariance is how much variance (between 0 and 1) should be applied to the chunk end time,
 	// to spread chunks writing across time. Doesn't apply to the last chunk of the chunk range. 0 to disable variance.
 	chunkEndTimeVariance float64
@@ -2203,12 +2185,12 @@ type memSeriesOOOFields struct {
 
 func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, secondaryHash uint32, chunkEndTimeVariance float64, isolationDisabled bool) *memSeries {
 	s := &memSeries{
-		lset:                           lset,
-		ref:                            id,
-		nextAt:                         math.MinInt64,
-		chunkEndTimeVariance:           chunkEndTimeVariance,
-		shardHashOrMemoryMappedMaxTime: shardHash,
-		secondaryHash:                  secondaryHash,
+		lset:                 lset,
+		ref:                  id,
+		nextAt:               math.MinInt64,
+		chunkEndTimeVariance: chunkEndTimeVariance,
+		shardHash:            shardHash,
+		secondaryHash:        secondaryHash,
 	}
 	if !isolationDisabled {
 		s.txs = newTxRing(0)
@@ -2296,12 +2278,6 @@ func (s *memSeries) truncateChunksBefore(mint int64, minOOOMmapRef chunks.ChunkD
 	return removedInOrder + removedOOO
 }
 
-// shardHash returns the shard hash of the series, only available after WAL replay.
-func (s *memSeries) shardHash() uint64 { return s.shardHashOrMemoryMappedMaxTime }
-
-// mmMaxTime returns the max time of any mmapped chunk in the series, only available during WAL replay.
-func (s *memSeries) mmMaxTime() int64 { return int64(s.shardHashOrMemoryMappedMaxTime) }
-
 // cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after
 // acquiring lock.
 func (s *memSeries) cleanupAppendIDsBelow(bound uint64) {
diff --git a/vendor/github.com/prometheus/prometheus/tsdb/head_append.go b/vendor/github.com/prometheus/prometheus/tsdb/head_append.go
index 9bc5c4a5994..8af7a5e5c4d 100644
--- a/vendor/github.com/prometheus/prometheus/tsdb/head_append.go
+++ b/vendor/github.com/prometheus/prometheus/tsdb/head_append.go
@@ -1307,7 +1307,7 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts
 		maxNextAt := s.nextAt
 
 		s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxNextAt, 4)
-		s.nextAt = addJitterToChunkEndTime(s.shardHash(), c.minTime, s.nextAt, maxNextAt, s.chunkEndTimeVariance)
+		s.nextAt = addJitterToChunkEndTime(s.shardHash, c.minTime, s.nextAt, maxNextAt, s.chunkEndTimeVariance)
 	}
 	// If numSamples > samplesPerChunk*2 then our previous prediction was invalid,
 	// most likely because samples rate has changed and now they are arriving more frequently.
diff --git a/vendor/github.com/prometheus/prometheus/tsdb/head_read.go b/vendor/github.com/prometheus/prometheus/tsdb/head_read.go
index 27a662d00d3..46d24b0fb62 100644
--- a/vendor/github.com/prometheus/prometheus/tsdb/head_read.go
+++ b/vendor/github.com/prometheus/prometheus/tsdb/head_read.go
@@ -174,7 +174,7 @@ func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCou
 		}
 
 		// Check if the series belong to the shard.
-		if s.shardHash()%shardCount != shardIndex {
+		if s.shardHash%shardCount != shardIndex {
 			continue
 		}
 
diff --git a/vendor/github.com/prometheus/prometheus/tsdb/head_wal.go b/vendor/github.com/prometheus/prometheus/tsdb/head_wal.go
index 59a40970603..817811c795a 100644
--- a/vendor/github.com/prometheus/prometheus/tsdb/head_wal.go
+++ b/vendor/github.com/prometheus/prometheus/tsdb/head_wal.go
@@ -435,8 +435,6 @@ Outer:
 	return nil
 }
 
-func minInt64() int64 { return math.MinInt64 }
-
 // resetSeriesWithMMappedChunks is only used during the WAL replay.
 func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*mmappedChunk, walSeriesRef chunks.HeadSeriesRef) (overlapped bool) {
 	if mSeries.ref != walSeriesRef {
@@ -483,11 +481,10 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m
 	}
 	// Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject.
 	if len(mmc) == 0 {
-		mSeries.shardHashOrMemoryMappedMaxTime = uint64(minInt64())
+		mSeries.mmMaxTime = math.MinInt64
 	} else {
-		mmMaxTime := mmc[len(mmc)-1].maxTime
-		mSeries.shardHashOrMemoryMappedMaxTime = uint64(mmMaxTime)
-		h.updateMinMaxTime(mmc[0].minTime, mmMaxTime)
+		mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime
+		h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime)
 	}
 	if len(oooMmc) != 0 {
 		// Mint and maxt can be in any chunk, they are not sorted.
@@ -590,7 +587,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
 			unknownRefs++
 			continue
 		}
-		if s.T <= ms.mmMaxTime() {
+		if s.T <= ms.mmMaxTime {
 			continue
 		}
 		if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
@@ -619,7 +616,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
 			unknownHistogramRefs++
 			continue
 		}
-		if s.t <= ms.mmMaxTime() {
+		if s.t <= ms.mmMaxTime {
 			continue
 		}
 		var chunkCreated bool
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 9cd8cbb0621..1a20a401f42 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -996,7 +996,7 @@ github.com/prometheus/exporter-toolkit/web
 github.com/prometheus/procfs
 github.com/prometheus/procfs/internal/fs
 github.com/prometheus/procfs/internal/util
-# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240812035817-c17c11f77f03
+# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240826074809-8185e4cfd8b1
 ## explicit; go 1.21.0
 github.com/prometheus/prometheus/config
 github.com/prometheus/prometheus/discovery
@@ -1647,7 +1647,7 @@ sigs.k8s.io/kustomize/kyaml/yaml/walk
 sigs.k8s.io/yaml
 sigs.k8s.io/yaml/goyaml.v2
 sigs.k8s.io/yaml/goyaml.v3
-# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240812035817-c17c11f77f03
+# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240826074809-8185e4cfd8b1
 # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe
 # gopkg.in/yaml.v3 => github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094
 # github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20240531075221-3685f1377d7b