From a55844d52a8193d2a411fcd7a027445c5821c0cb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Mon, 25 Nov 2024 12:14:08 +0200
Subject: [PATCH] receive/expandedpostingscache: fix race (#7937)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Porting https://github.com/cortexproject/cortex/pull/6369 to our code
base. Add test that fails without the fix.

Signed-off-by: Giedrius Statkevičius
---
 pkg/receive/expandedpostingscache/cache.go    | 22 ++++++----
 .../expandedpostingscache/cache_test.go       | 42 +++++++++++++++++++
 pkg/receive/multitsdb.go                      |  2 +-
 3 files changed, 56 insertions(+), 10 deletions(-)

diff --git a/pkg/receive/expandedpostingscache/cache.go b/pkg/receive/expandedpostingscache/cache.go
index d5c7069735..2e38d1971c 100644
--- a/pkg/receive/expandedpostingscache/cache.go
+++ b/pkg/receive/expandedpostingscache/cache.go
@@ -43,7 +43,7 @@ type BlocksPostingsForMatchersCache struct {
 	postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
 	timeNow                 func() time.Time
 
-	metrics *ExpandedPostingsCacheMetrics
+	metrics ExpandedPostingsCacheMetrics
 }
 
 var (
@@ -66,8 +66,8 @@ type ExpandedPostingsCacheMetrics struct {
 	NonCacheableQueries *prometheus.CounterVec
 }
 
-func NewPostingCacheMetrics(r prometheus.Registerer) *ExpandedPostingsCacheMetrics {
-	return &ExpandedPostingsCacheMetrics{
+func NewPostingCacheMetrics(r prometheus.Registerer) ExpandedPostingsCacheMetrics {
+	return ExpandedPostingsCacheMetrics{
 		CacheRequests: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
 			Name: "expanded_postings_cache_requests_total",
 			Help: "Total number of requests to the cache.",
@@ -87,11 +87,15 @@ func NewPostingCacheMetrics(r prometheus.Registerer) *ExpandedPostingsCacheMetri
 	}
 }
 
-func NewBlocksPostingsForMatchersCache(metrics *ExpandedPostingsCacheMetrics, headExpandedPostingsCacheSize uint64, blockExpandedPostingsCacheSize uint64) ExpandedPostingsCache {
+func NewBlocksPostingsForMatchersCache(metrics ExpandedPostingsCacheMetrics, headExpandedPostingsCacheSize uint64, blockExpandedPostingsCacheSize uint64, seedSize int64) *BlocksPostingsForMatchersCache {
+	if seedSize <= 0 {
+		seedSize = seedArraySize
+	}
+
 	return &BlocksPostingsForMatchersCache{
 		headCache:               newFifoCache[[]storage.SeriesRef]("head", metrics, time.Now, headExpandedPostingsCacheSize),
 		blocksCache:             newFifoCache[[]storage.SeriesRef]("block", metrics, time.Now, blockExpandedPostingsCacheSize),
-		headSeedByMetricName:    make([]int, seedArraySize),
+		headSeedByMetricName:    make([]int, seedSize),
 		strippedLock:            make([]sync.RWMutex, numOfSeedsStripes),
 		postingsForMatchersFunc: tsdb.PostingsForMatchers,
 		timeNow:                 time.Now,
@@ -129,7 +133,7 @@ func (c *BlocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) {
 
 	h := MemHashString(metricName)
 	i := h % uint64(len(c.headSeedByMetricName))
-	l := h % uint64(len(c.strippedLock))
+	l := i % uint64(len(c.strippedLock))
 	c.strippedLock[l].Lock()
 	defer c.strippedLock[l].Unlock()
 	c.headSeedByMetricName[i]++
@@ -200,7 +204,7 @@ func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.
 func (c *BlocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string {
 	h := MemHashString(metricName)
 	i := h % uint64(len(c.headSeedByMetricName))
-	l := h % uint64(len(c.strippedLock))
+	l := i % uint64(len(c.strippedLock))
 	c.strippedLock[l].RLock()
 	defer c.strippedLock[l].RUnlock()
 	return strconv.Itoa(c.headSeedByMetricName[i])
@@ -276,13 +280,13 @@ type fifoCache[V any] struct {
 	cachedBytes int64
 }
 
-func newFifoCache[V any](name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time, maxBytes uint64) *fifoCache[V] {
+func newFifoCache[V any](name string, metrics ExpandedPostingsCacheMetrics, timeNow func() time.Time, maxBytes uint64) *fifoCache[V] {
 	return &fifoCache[V]{
 		cachedValues: new(sync.Map),
 		cached:       list.New(),
 		timeNow:      timeNow,
 		name:         name,
-		metrics:      *metrics,
+		metrics:      metrics,
 		ttl:          10 * time.Minute,
 		maxBytes:     int64(maxBytes),
 	}
diff --git a/pkg/receive/expandedpostingscache/cache_test.go b/pkg/receive/expandedpostingscache/cache_test.go
index a2ba2dafd2..86c7573e99 100644
--- a/pkg/receive/expandedpostingscache/cache_test.go
+++ b/pkg/receive/expandedpostingscache/cache_test.go
@@ -15,9 +15,11 @@ import (
 	"time"
 
 	"go.uber.org/atomic"
+	"golang.org/x/exp/rand"
 
 	"github.com/prometheus/client_golang/prometheus"
 	promtest "github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/prometheus/prometheus/model/labels"
 
 	"github.com/stretchr/testify/require"
 )
@@ -166,3 +168,43 @@ func repeatStringIfNeeded(seed string, length int) string {
 
 	return strings.Repeat(seed, 1+length/len(seed))[:max(length, len(seed))]
 }
+
+func TestLockRaceExpireSeries(t *testing.T) {
+	for j := 0; j < 10; j++ {
+		wg := &sync.WaitGroup{}
+
+		c := NewBlocksPostingsForMatchersCache(ExpandedPostingsCacheMetrics{}, 1<<7, 1<<7, 3)
+		for i := 0; i < 1000; i++ {
+			wg.Add(2)
+
+			go func() {
+				defer wg.Done()
+				for i := 0; i < 10; i++ {
+					c.ExpireSeries(
+						labels.FromMap(map[string]string{"__name__": randSeq(10)}),
+					)
+				}
+			}()
+
+			go func() {
+				defer wg.Done()
+
+				for i := 0; i < 10; i++ {
+					c.getSeedForMetricName(randSeq(10))
+				}
+			}()
+		}
+		wg.Wait()
+	}
+}
+
+var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
+
+func randSeq(n int) string {
+	b := make([]rune, n)
+	rand.Seed(uint64(time.Now().UnixNano()))
+	for i := range b {
+		b[i] = letters[rand.Intn(len(letters))]
+	}
+	return string(b)
+}
diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go
index 1997e0fd38..10c41d32ba 100644
--- a/pkg/receive/multitsdb.go
+++ b/pkg/receive/multitsdb.go
@@ -709,7 +709,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
 
 	if t.headExpandedPostingsCacheSize > 0 || t.blockExpandedPostingsCacheSize > 0 {
 		var expandedPostingsCacheMetrics = expandedpostingscache.NewPostingCacheMetrics(extprom.WrapRegistererWithPrefix("thanos_", reg))
-		expandedPostingsCache = expandedpostingscache.NewBlocksPostingsForMatchersCache(expandedPostingsCacheMetrics, t.headExpandedPostingsCacheSize, t.blockExpandedPostingsCacheSize)
+		expandedPostingsCache = expandedpostingscache.NewBlocksPostingsForMatchersCache(expandedPostingsCacheMetrics, t.headExpandedPostingsCacheSize, t.blockExpandedPostingsCacheSize, 0)
 	}
 
 	opts := *t.tsdbOpts
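Note on the race this patch fixes (explanatory sketch, not part of the change): headSeedByMetricName and strippedLock have different lengths, and the old code derived the lock stripe from the raw hash (l := h % len(strippedLock)) while the seed slot was derived separately (i := h % len(headSeedByMetricName)). Two metric names can therefore collide on the same seed slot yet take different stripes, so ExpireSeries and getSeedForMetricName may touch headSeedByMetricName[i] under different locks. Deriving the stripe from the slot index (l := i % len(strippedLock)) pins each slot to a single stripe, which is why the new seedSize parameter (set to 3 in TestLockRaceExpireSeries) makes the collision easy to provoke under -race. The standalone Go sketch below only illustrates the index mismatch with made-up sizes; lockStripes is an assumed stand-in for numOfSeedsStripes, whose real value is not shown in this patch.

package main

import "fmt"

func main() {
	const (
		seedSlots   = 3   // matches the seedSize used by TestLockRaceExpireSeries
		lockStripes = 512 // assumed stand-in for numOfSeedsStripes; illustrative only
	)

	// Two hypothetical metric-name hashes that collide on the same seed slot.
	h1, h2 := uint64(5), uint64(5+seedSlots)

	// Seed slot index, computed as in the cache.
	i1, i2 := h1%seedSlots, h2%seedSlots

	// Old scheme: lock stripe derived from the raw hash.
	oldL1, oldL2 := h1%lockStripes, h2%lockStripes
	// Fixed scheme: lock stripe derived from the seed slot index.
	newL1, newL2 := i1%lockStripes, i2%lockStripes

	fmt.Println(i1 == i2)       // true: both hashes land on headSeedByMetricName[2]
	fmt.Println(oldL1 == oldL2) // false: the same slot was guarded by two different stripes (the race)
	fmt.Println(newL1 == newL2) // true: a given slot is now always guarded by the same stripe
}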