From 95ee758b486ec65adb488fbaa4d474901e396482 Mon Sep 17 00:00:00 2001
From: alanprot
Date: Mon, 9 Dec 2024 14:13:22 -0800
Subject: [PATCH] Fix race on the string interning

Signed-off-by: alanprot
---
 go.mod | 2 +-
 pkg/ingester/ingester.go | 13 +-
 pkg/ingester/ingester_test.go | 64 ++++
 pkg/util/strings.go | 73 +---
 .../golang-lru/v2/expirable/expirable_lru.go | 338 ++++++++++++++++++
 vendor/modules.txt | 1 +
 6 files changed, 426 insertions(+), 65 deletions(-)
 create mode 100644 vendor/github.com/hashicorp/golang-lru/v2/expirable/expirable_lru.go

diff --git a/go.mod b/go.mod
index 6117fc029ac..040d4ce9772 100644
--- a/go.mod
+++ b/go.mod
@@ -80,6 +80,7 @@ require (
 github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3
 github.com/cespare/xxhash/v2 v2.3.0
 github.com/google/go-cmp v0.6.0
+ github.com/hashicorp/golang-lru/v2 v2.0.7
 github.com/sercand/kuberesolver/v5 v5.1.1
 go.opentelemetry.io/collector/pdata v1.21.0
 golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8
@@ -163,7 +164,6 @@ require (
 github.com/hashicorp/go-multierror v1.1.1 // indirect
 github.com/hashicorp/go-rootcerts v1.0.2 // indirect
 github.com/hashicorp/golang-lru v0.6.0 // indirect
- github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
 github.com/hashicorp/serf v0.10.1 // indirect
 github.com/jessevdk/go-flags v1.5.0 // indirect
 github.com/jmespath/go-jmespath v0.4.0 // indirect
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 2af8d254f32..d22e1f6dc26 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -440,6 +440,10 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {
 return err
 }

+ if u.labelsStringInterningEnabled {
+ metric.InternStrings(u.interner.Intern)
+ }
+
 return nil
 }

@@ -454,9 +458,6 @@ func (u *userTSDB) PostCreation(metric labels.Labels) {
 }
 u.seriesInMetric.increaseSeriesForMetric(metricName)
 u.labelSetCounter.increaseSeriesLabelSet(u, metric)
- if u.labelsStringInterningEnabled {
- metric.InternStrings(u.interner.Intern)
- }

 if u.postingCache != nil {
 u.postingCache.ExpireSeries(metric)
@@ -475,9 +476,6 @@ func (u *userTSDB) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels) {
 }
 u.seriesInMetric.decreaseSeriesForMetric(metricName)
 u.labelSetCounter.decreaseSeriesLabelSet(u, metric)
- if u.labelsStringInterningEnabled {
- metric.ReleaseStrings(u.interner.Release)
- }
 if u.postingCache != nil {
 u.postingCache.ExpireSeries(metric)
 }
@@ -1233,7 +1231,6 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
 } else {
 // Copy the label set because both TSDB and the active series tracker may retain it.
 copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels)
- // Retain the reference in case there are multiple samples for the series.
 if ref, err = app.Append(0, copiedLabels, s.TimestampMs, s.Value); err == nil {
 succeededSamplesCount++
@@ -2201,7 +2198,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
 instanceLimitsFn: i.getInstanceLimits,
 instanceSeriesCount: &i.TSDBState.seriesCount,
- interner: util.NewInterner(),
+ interner: util.NewLruInterner(),
 labelsStringInterningEnabled: i.cfg.LabelsStringInterningEnabled,

 blockRetentionPeriod: i.cfg.BlocksStorageConfig.TSDB.Retention.Milliseconds(),

diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index ab68877ce0e..3eb2be4bdfb 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -405,6 +405,70 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
 }

+func TestPushRace(t *testing.T) {
+ cfg := defaultIngesterTestConfig(t)
+ cfg.LabelsStringInterningEnabled = true
+ cfg.LifecyclerConfig.JoinAfter = 0
+ dir := t.TempDir()
+ blocksDir := filepath.Join(dir, "blocks")
+
+ require.NoError(t, os.Mkdir(blocksDir, os.ModePerm))
+
+ ing, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, defaultLimitsTestConfig(), nil, blocksDir, prometheus.NewRegistry(), true)
+ require.NoError(t, err)
+ defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
+ require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
+ // Wait until it's ACTIVE
+ test.Poll(t, time.Second, ring.ACTIVE, func() interface{} {
+ return ing.lifecycler.GetState()
+ })
+
+ ctx := user.InjectOrgID(context.Background(), userID)
+ sample1 := cortexpb.Sample{
+ TimestampMs: 0,
+ Value: 1,
+ }
+
+ concurrentRequest := 200
+ numberOfSeries := 200
+ wg := sync.WaitGroup{}
+ wg.Add(numberOfSeries * concurrentRequest)
+ for k := 0; k < numberOfSeries; k++ {
+ for i := 0; i < concurrentRequest; i++ {
+ go func() {
+ defer wg.Done()
+ _, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
+ require.NoError(t, err)
+ }()
+ }
+ }
+
+ wg.Wait()
+
+ db := ing.getTSDB(userID)
+ ir, err := db.db.Head().Index()
+ require.NoError(t, err)
+
+ p, err := ir.Postings(ctx, "", "")
+ require.NoError(t, err)
+ p = ir.SortedPostings(p)
+ total := 0
+ var builder labels.ScratchBuilder
+
+ for p.Next() {
+ total++
+ err = ir.Series(p.At(), &builder, nil)
+ require.NoError(t, err)
+ lbls := builder.Labels()
+ require.Equal(t, "foo", lbls.Get(labels.MetricName))
+ require.Equal(t, "1", lbls.Get("userId"))
+ require.NotEmpty(t, lbls.Get("k"))
+ builder.Reset()
+ }
+ require.Equal(t, numberOfSeries, total)
+ require.Equal(t, uint64(numberOfSeries), db.Head().NumSeries())
+}
+
 func TestIngesterUserLimitExceeded(t *testing.T) {
 limits := defaultLimitsTestConfig()
 limits.MaxLocalSeriesPerUser = 1
diff --git a/pkg/util/strings.go b/pkg/util/strings.go
index 69bb8f5fd7d..a61e622e101 100644
--- a/pkg/util/strings.go
+++ b/pkg/util/strings.go
@@ -3,10 +3,18 @@ package util
 import (
 "context"
 "sync"
+ "time"
 "unsafe"

 "github.com/bboreham/go-loser"
- "go.uber.org/atomic"
+ "github.com/hashicorp/golang-lru/v2/expirable"
+)
+
+const (
+ // Max size is set to 2M.
+ maxInternerLruCacheSize = 2e6
+ // TTL should be similar to the head compaction interval.
+ internerLruCacheTTL = time.Hour * 2
 )

 // StringsContain returns true if the search value is within the list of input values.
@@ -145,30 +153,18 @@ func MergeSortedSlices(ctx context.Context, a ...[]string) ([]string, error) {

 type Interner interface {
 Intern(s string) string
- Release(s string)
 }

-// NewInterner returns a new Interner to be used to intern strings.
-// Based on https://github.com/prometheus/prometheus/blob/726ed124e4468d0274ba89b0934a6cc8c975532d/storage/remote/intern.go#L51
-func NewInterner() Interner {
+// NewLruInterner returns a new Interner to be used to intern strings.
+// The interner will use an LRU cache to return the deduplicated strings.
+func NewLruInterner() Interner {
 return &pool{
- pool: map[string]*entry{},
+ lru: expirable.NewLRU[string, string](maxInternerLruCacheSize, nil, internerLruCacheTTL),
 }
 }

 type pool struct {
- mtx sync.RWMutex
- pool map[string]*entry
-}
-
-type entry struct {
- refs atomic.Int64
-
- s string
-}
-
-func newEntry(s string) *entry {
- return &entry{s: s}
+ lru *expirable.LRU[string, string]
 }

 // Intern returns the interned string. It returns the canonical representation of string.
@@ -177,45 +173,10 @@ func (p *pool) Intern(s string) string {
 return ""
 }

- p.mtx.RLock()
- interned, ok := p.pool[s]
- p.mtx.RUnlock()
+ interned, ok := p.lru.Get(s)
 if ok {
- interned.refs.Inc()
- return interned.s
+ return interned
 }
- p.mtx.Lock()
- defer p.mtx.Unlock()
- if interned, ok := p.pool[s]; ok {
- interned.refs.Inc()
- return interned.s
- }
-
- p.pool[s] = newEntry(s)
- p.pool[s].refs.Store(1)
+ p.lru.Add(s, s)
 return s
 }
-
-// Release releases a reference of the string `s`.
-// If the reference count become 0, the string `s` is removed from the memory
-func (p *pool) Release(s string) {
- p.mtx.RLock()
- interned, ok := p.pool[s]
- p.mtx.RUnlock()
-
- if !ok {
- return
- }
-
- refs := interned.refs.Dec()
- if refs > 0 {
- return
- }
-
- p.mtx.Lock()
- defer p.mtx.Unlock()
- if interned.refs.Load() != 0 {
- return
- }
- delete(p.pool, s)
-}
diff --git a/vendor/github.com/hashicorp/golang-lru/v2/expirable/expirable_lru.go b/vendor/github.com/hashicorp/golang-lru/v2/expirable/expirable_lru.go
new file mode 100644
index 00000000000..89978d6d239
--- /dev/null
+++ b/vendor/github.com/hashicorp/golang-lru/v2/expirable/expirable_lru.go
@@ -0,0 +1,338 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: MPL-2.0
+
+package expirable
+
+import (
+ "sync"
+ "time"
+
+ "github.com/hashicorp/golang-lru/v2/internal"
+)
+
+// EvictCallback is used to get a callback when a cache entry is evicted
+type EvictCallback[K comparable, V any] func(key K, value V)
+
+// LRU implements a thread-safe LRU with expirable entries.
+type LRU[K comparable, V any] struct {
+ size int
+ evictList *internal.LruList[K, V]
+ items map[K]*internal.Entry[K, V]
+ onEvict EvictCallback[K, V]
+
+ // expirable options
+ mu sync.Mutex
+ ttl time.Duration
+ done chan struct{}
+
+ // buckets for expiration
+ buckets []bucket[K, V]
+ // uint8 because it's number between 0 and numBuckets
+ nextCleanupBucket uint8
+}
+
+// bucket is a container for holding entries to be expired
+type bucket[K comparable, V any] struct {
+ entries map[K]*internal.Entry[K, V]
+ newestEntry time.Time
+}
+
+// noEvictionTTL - very long ttl to prevent eviction
+const noEvictionTTL = time.Hour * 24 * 365 * 10
+
+// because of uint8 usage for nextCleanupBucket, should not exceed 256.
+// casting it as uint8 explicitly requires type conversions in multiple places
+const numBuckets = 100
+
+// NewLRU returns a new thread-safe cache with expirable entries.
+//
+// Size parameter set to 0 makes cache of unlimited size, e.g.
turns LRU mechanism off. +// +// Providing 0 TTL turns expiring off. +// +// Delete expired entries every 1/100th of ttl value. Goroutine which deletes expired entries runs indefinitely. +func NewLRU[K comparable, V any](size int, onEvict EvictCallback[K, V], ttl time.Duration) *LRU[K, V] { + if size < 0 { + size = 0 + } + if ttl <= 0 { + ttl = noEvictionTTL + } + + res := LRU[K, V]{ + ttl: ttl, + size: size, + evictList: internal.NewList[K, V](), + items: make(map[K]*internal.Entry[K, V]), + onEvict: onEvict, + done: make(chan struct{}), + } + + // initialize the buckets + res.buckets = make([]bucket[K, V], numBuckets) + for i := 0; i < numBuckets; i++ { + res.buckets[i] = bucket[K, V]{entries: make(map[K]*internal.Entry[K, V])} + } + + // enable deleteExpired() running in separate goroutine for cache with non-zero TTL + // + // Important: done channel is never closed, so deleteExpired() goroutine will never exit, + // it's decided to add functionality to close it in the version later than v2. + if res.ttl != noEvictionTTL { + go func(done <-chan struct{}) { + ticker := time.NewTicker(res.ttl / numBuckets) + defer ticker.Stop() + for { + select { + case <-done: + return + case <-ticker.C: + res.deleteExpired() + } + } + }(res.done) + } + return &res +} + +// Purge clears the cache completely. +// onEvict is called for each evicted key. +func (c *LRU[K, V]) Purge() { + c.mu.Lock() + defer c.mu.Unlock() + for k, v := range c.items { + if c.onEvict != nil { + c.onEvict(k, v.Value) + } + delete(c.items, k) + } + for _, b := range c.buckets { + for _, ent := range b.entries { + delete(b.entries, ent.Key) + } + } + c.evictList.Init() +} + +// Add adds a value to the cache. Returns true if an eviction occurred. +// Returns false if there was no eviction: the item was already in the cache, +// or the size was not exceeded. +func (c *LRU[K, V]) Add(key K, value V) (evicted bool) { + c.mu.Lock() + defer c.mu.Unlock() + now := time.Now() + + // Check for existing item + if ent, ok := c.items[key]; ok { + c.evictList.MoveToFront(ent) + c.removeFromBucket(ent) // remove the entry from its current bucket as expiresAt is renewed + ent.Value = value + ent.ExpiresAt = now.Add(c.ttl) + c.addToBucket(ent) + return false + } + + // Add new item + ent := c.evictList.PushFrontExpirable(key, value, now.Add(c.ttl)) + c.items[key] = ent + c.addToBucket(ent) // adds the entry to the appropriate bucket and sets entry.expireBucket + + evict := c.size > 0 && c.evictList.Length() > c.size + // Verify size not exceeded + if evict { + c.removeOldest() + } + return evict +} + +// Get looks up a key's value from the cache. +func (c *LRU[K, V]) Get(key K) (value V, ok bool) { + c.mu.Lock() + defer c.mu.Unlock() + var ent *internal.Entry[K, V] + if ent, ok = c.items[key]; ok { + // Expired item check + if time.Now().After(ent.ExpiresAt) { + return value, false + } + c.evictList.MoveToFront(ent) + return ent.Value, true + } + return +} + +// Contains checks if a key is in the cache, without updating the recent-ness +// or deleting it for being stale. +func (c *LRU[K, V]) Contains(key K) (ok bool) { + c.mu.Lock() + defer c.mu.Unlock() + _, ok = c.items[key] + return ok +} + +// Peek returns the key value (or undefined if not found) without updating +// the "recently used"-ness of the key. 
+func (c *LRU[K, V]) Peek(key K) (value V, ok bool) { + c.mu.Lock() + defer c.mu.Unlock() + var ent *internal.Entry[K, V] + if ent, ok = c.items[key]; ok { + // Expired item check + if time.Now().After(ent.ExpiresAt) { + return value, false + } + return ent.Value, true + } + return +} + +// Remove removes the provided key from the cache, returning if the +// key was contained. +func (c *LRU[K, V]) Remove(key K) bool { + c.mu.Lock() + defer c.mu.Unlock() + if ent, ok := c.items[key]; ok { + c.removeElement(ent) + return true + } + return false +} + +// RemoveOldest removes the oldest item from the cache. +func (c *LRU[K, V]) RemoveOldest() (key K, value V, ok bool) { + c.mu.Lock() + defer c.mu.Unlock() + if ent := c.evictList.Back(); ent != nil { + c.removeElement(ent) + return ent.Key, ent.Value, true + } + return +} + +// GetOldest returns the oldest entry +func (c *LRU[K, V]) GetOldest() (key K, value V, ok bool) { + c.mu.Lock() + defer c.mu.Unlock() + if ent := c.evictList.Back(); ent != nil { + return ent.Key, ent.Value, true + } + return +} + +// Keys returns a slice of the keys in the cache, from oldest to newest. +func (c *LRU[K, V]) Keys() []K { + c.mu.Lock() + defer c.mu.Unlock() + keys := make([]K, 0, len(c.items)) + for ent := c.evictList.Back(); ent != nil; ent = ent.PrevEntry() { + keys = append(keys, ent.Key) + } + return keys +} + +// Values returns a slice of the values in the cache, from oldest to newest. +// Expired entries are filtered out. +func (c *LRU[K, V]) Values() []V { + c.mu.Lock() + defer c.mu.Unlock() + values := make([]V, len(c.items)) + i := 0 + now := time.Now() + for ent := c.evictList.Back(); ent != nil; ent = ent.PrevEntry() { + if now.After(ent.ExpiresAt) { + continue + } + values[i] = ent.Value + i++ + } + return values +} + +// Len returns the number of items in the cache. +func (c *LRU[K, V]) Len() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.evictList.Length() +} + +// Resize changes the cache size. Size of 0 means unlimited. +func (c *LRU[K, V]) Resize(size int) (evicted int) { + c.mu.Lock() + defer c.mu.Unlock() + if size <= 0 { + c.size = 0 + return 0 + } + diff := c.evictList.Length() - size + if diff < 0 { + diff = 0 + } + for i := 0; i < diff; i++ { + c.removeOldest() + } + c.size = size + return diff +} + +// Close destroys cleanup goroutine. To clean up the cache, run Purge() before Close(). +// func (c *LRU[K, V]) Close() { +// c.mu.Lock() +// defer c.mu.Unlock() +// select { +// case <-c.done: +// return +// default: +// } +// close(c.done) +// } + +// removeOldest removes the oldest item from the cache. Has to be called with lock! +func (c *LRU[K, V]) removeOldest() { + if ent := c.evictList.Back(); ent != nil { + c.removeElement(ent) + } +} + +// removeElement is used to remove a given list element from the cache. Has to be called with lock! +func (c *LRU[K, V]) removeElement(e *internal.Entry[K, V]) { + c.evictList.Remove(e) + delete(c.items, e.Key) + c.removeFromBucket(e) + if c.onEvict != nil { + c.onEvict(e.Key, e.Value) + } +} + +// deleteExpired deletes expired records from the oldest bucket, waiting for the newest entry +// in it to expire first. 
+func (c *LRU[K, V]) deleteExpired() { + c.mu.Lock() + bucketIdx := c.nextCleanupBucket + timeToExpire := time.Until(c.buckets[bucketIdx].newestEntry) + // wait for newest entry to expire before cleanup without holding lock + if timeToExpire > 0 { + c.mu.Unlock() + time.Sleep(timeToExpire) + c.mu.Lock() + } + for _, ent := range c.buckets[bucketIdx].entries { + c.removeElement(ent) + } + c.nextCleanupBucket = (c.nextCleanupBucket + 1) % numBuckets + c.mu.Unlock() +} + +// addToBucket adds entry to expire bucket so that it will be cleaned up when the time comes. Has to be called with lock! +func (c *LRU[K, V]) addToBucket(e *internal.Entry[K, V]) { + bucketID := (numBuckets + c.nextCleanupBucket - 1) % numBuckets + e.ExpireBucket = bucketID + c.buckets[bucketID].entries[e.Key] = e + if c.buckets[bucketID].newestEntry.Before(e.ExpiresAt) { + c.buckets[bucketID].newestEntry = e.ExpiresAt + } +} + +// removeFromBucket removes the entry from its corresponding bucket. Has to be called with lock! +func (c *LRU[K, V]) removeFromBucket(e *internal.Entry[K, V]) { + delete(c.buckets[e.ExpireBucket].entries, e.Key) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index b8fd864e39a..57fbe5e6d52 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -589,6 +589,7 @@ github.com/hashicorp/golang-lru/simplelru # github.com/hashicorp/golang-lru/v2 v2.0.7 ## explicit; go 1.18 github.com/hashicorp/golang-lru/v2 +github.com/hashicorp/golang-lru/v2/expirable github.com/hashicorp/golang-lru/v2/internal github.com/hashicorp/golang-lru/v2/simplelru # github.com/hashicorp/memberlist v0.5.1 => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe
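
Why this removes the race: the interning hook now runs in PreCreation rather than PostCreation, so the in-place InternStrings rewrite happens before the head has stored the series and should no longer be observable by concurrent readers, and PostDeletion no longer mutates label strings at all. TestPushRace above exercises this path and is meant to be run under the race detector, e.g. "go test -race -run TestPushRace ./pkg/ingester/". A minimal sketch of that callback shape, assuming the SeriesLifecycleCallback signatures visible in the ingester.go hunks and a hypothetical wrapper type (the real implementation is userTSDB):

package interningsketch

import (
	"github.com/cortexproject/cortex/pkg/util"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb/chunks"
)

// interningCallback canonicalizes label strings before TSDB keeps a reference
// to them, mirroring what userTSDB does after this patch.
type interningCallback struct {
	interner util.Interner
	enabled  bool
}

// PreCreation runs before the series is added to the head, so interning the
// strings here cannot race with readers of the stored labels.
func (c *interningCallback) PreCreation(metric labels.Labels) error {
	if c.enabled {
		metric.InternStrings(c.interner.Intern)
	}
	return nil
}

// PostCreation and PostDeletion no longer touch the interner; entries that
// stop being used simply expire from the LRU instead of being released.
func (c *interningCallback) PostCreation(labels.Labels) {}

func (c *interningCallback) PostDeletion(map[chunks.HeadSeriesRef]labels.Labels) {}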
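
The interner itself no longer reference-counts entries: a hit returns the cached canonical string, a miss stores the string under itself as the key, and entries age out after internerLruCacheTTL or once maxInternerLruCacheSize is exceeded. The standalone sketch below (illustrative names, not code from this patch) shows the same pattern on top of the vendored expirable cache:

package main

import (
	"fmt"
	"time"
	"unsafe"

	"github.com/hashicorp/golang-lru/v2/expirable"
)

// lruInterner deduplicates strings through a bounded, expiring LRU cache.
// There is no Release counterpart: stale entries are evicted by size or TTL.
type lruInterner struct {
	lru *expirable.LRU[string, string]
}

func newLruInterner(maxEntries int, ttl time.Duration) *lruInterner {
	return &lruInterner{lru: expirable.NewLRU[string, string](maxEntries, nil, ttl)}
}

// Intern returns the canonical copy of s, adding it to the cache on a miss.
func (p *lruInterner) Intern(s string) string {
	if s == "" {
		return ""
	}
	if interned, ok := p.lru.Get(s); ok {
		return interned
	}
	p.lru.Add(s, s)
	return s
}

func main() {
	interner := newLruInterner(1000, 2*time.Hour)

	// Two separate allocations with equal content...
	raw := []byte("instance")
	a := interner.Intern(string(raw))
	b := interner.Intern(string(raw))

	// ...resolve to the same backing string while the entry is cached.
	fmt.Println(unsafe.StringData(a) == unsafe.StringData(b))
}

Because the cache is only an optimization, a miss after eviction just stores a fresh copy; correctness never depends on hitting the cache, which is what makes dropping the old Release bookkeeping safe.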