Skip to content

Commit

Permalink
Fix race on the string interning
Browse files Browse the repository at this point in the history
Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot committed Dec 9, 2024
1 parent c9e3d5e commit 95ee758
Show file tree
Hide file tree
Showing 6 changed files with 426 additions and 65 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ require (
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3
github.com/cespare/xxhash/v2 v2.3.0
github.com/google/go-cmp v0.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/sercand/kuberesolver/v5 v5.1.1
go.opentelemetry.io/collector/pdata v1.21.0
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8
Expand Down Expand Up @@ -163,7 +164,6 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/serf v0.10.1 // indirect
github.com/jessevdk/go-flags v1.5.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down
13 changes: 5 additions & 8 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,10 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {
return err
}

if u.labelsStringInterningEnabled {
metric.InternStrings(u.interner.Intern)
}

return nil
}

Expand All @@ -454,9 +458,6 @@ func (u *userTSDB) PostCreation(metric labels.Labels) {
}
u.seriesInMetric.increaseSeriesForMetric(metricName)
u.labelSetCounter.increaseSeriesLabelSet(u, metric)
if u.labelsStringInterningEnabled {
metric.InternStrings(u.interner.Intern)
}

if u.postingCache != nil {
u.postingCache.ExpireSeries(metric)
Expand All @@ -475,9 +476,6 @@ func (u *userTSDB) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels)
}
u.seriesInMetric.decreaseSeriesForMetric(metricName)
u.labelSetCounter.decreaseSeriesLabelSet(u, metric)
if u.labelsStringInterningEnabled {
metric.ReleaseStrings(u.interner.Release)
}
if u.postingCache != nil {
u.postingCache.ExpireSeries(metric)
}
Expand Down Expand Up @@ -1233,7 +1231,6 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
} else {
// Copy the label set because both TSDB and the active series tracker may retain it.
copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels)

// Retain the reference in case there are multiple samples for the series.
if ref, err = app.Append(0, copiedLabels, s.TimestampMs, s.Value); err == nil {
succeededSamplesCount++
Expand Down Expand Up @@ -2201,7 +2198,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {

instanceLimitsFn: i.getInstanceLimits,
instanceSeriesCount: &i.TSDBState.seriesCount,
interner: util.NewInterner(),
interner: util.NewLruInterner(),
labelsStringInterningEnabled: i.cfg.LabelsStringInterningEnabled,

blockRetentionPeriod: i.cfg.BlocksStorageConfig.TSDB.Retention.Milliseconds(),
Expand Down
64 changes: 64 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,70 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {

}

// TestPushRace pushes the same set of series concurrently from many goroutines
// with label string interning enabled, then walks the TSDB head index and
// checks that exactly one intact series exists per distinct "k" label value.
func TestPushRace(t *testing.T) {
	cfg := defaultIngesterTestConfig(t)
	cfg.LabelsStringInterningEnabled = true
	cfg.LifecyclerConfig.JoinAfter = 0
	dir := t.TempDir()
	blocksDir := filepath.Join(dir, "blocks")

	require.NoError(t, os.Mkdir(blocksDir, os.ModePerm))

	ing, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, defaultLimitsTestConfig(), nil, blocksDir, prometheus.NewRegistry(), true)
	require.NoError(t, err)
	defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
	// Wait until it's ACTIVE
	test.Poll(t, time.Second, ring.ACTIVE, func() interface{} {
		return ing.lifecycler.GetState()
	})

	ctx := user.InjectOrgID(context.Background(), userID)
	sample1 := cortexpb.Sample{
		TimestampMs: 0,
		Value:       1,
	}

	concurrentRequest := 200
	numberOfSeries := 200
	wg := sync.WaitGroup{}
	wg.Add(numberOfSeries * concurrentRequest)
	for k := 0; k < numberOfSeries; k++ {
		for i := 0; i < concurrentRequest; i++ {
			// k is passed as an argument so every goroutine works on its own
			// copy regardless of the loop-variable semantics of the toolchain.
			go func(k int) {
				defer wg.Done()
				_, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
				// require.* ends in t.FailNow, which must only be called from
				// the goroutine running the test; t.Errorf is safe to call here.
				if err != nil {
					t.Errorf("push for series k=%d failed: %v", k, err)
				}
			}(k)
		}
	}

	wg.Wait()

	db := ing.getTSDB(userID)
	ir, err := db.db.Head().Index()
	require.NoError(t, err)

	// An empty name/value pair selects all postings of the head.
	p, err := ir.Postings(ctx, "", "")
	require.NoError(t, err)
	p = ir.SortedPostings(p)
	total := 0
	var builder labels.ScratchBuilder

	for p.Next() {
		total++
		err = ir.Series(p.At(), &builder, nil)
		require.NoError(t, err)
		lbls := builder.Labels()
		require.Equal(t, "foo", lbls.Get(labels.MetricName))
		// Compare against the value that was actually pushed rather than a literal.
		require.Equal(t, userID, lbls.Get("userId"))
		require.NotEmpty(t, lbls.Get("k"))
		builder.Reset()
	}
	// Next() returning false can also mean the iteration failed.
	require.NoError(t, p.Err())
	require.Equal(t, numberOfSeries, total)
	require.Equal(t, uint64(numberOfSeries), db.Head().NumSeries())
}

func TestIngesterUserLimitExceeded(t *testing.T) {
limits := defaultLimitsTestConfig()
limits.MaxLocalSeriesPerUser = 1
Expand Down
73 changes: 17 additions & 56 deletions pkg/util/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,18 @@ package util
import (
"context"
"sync"
"time"
"unsafe"

"github.com/bboreham/go-loser"
"go.uber.org/atomic"
"github.com/hashicorp/golang-lru/v2/expirable"
)

const (
	// internerLruCacheTTL is the TTL handed to the interner LRU cache.
	// It should be similar to the head compaction interval.
	internerLruCacheTTL = 2 * time.Hour
	// maxInternerLruCacheSize is the maximum size of the interner LRU cache (set to 2M).
	maxInternerLruCacheSize = 2e6
)

// StringsContain returns true if the search value is within the list of input values.
Expand Down Expand Up @@ -145,30 +153,18 @@ func MergeSortedSlices(ctx context.Context, a ...[]string) ([]string, error) {

// Interner hands out a canonical copy of equal strings so that callers can
// share one instance instead of keeping duplicates.
type Interner interface {
// Intern returns the canonical representation of s.
Intern(s string) string
// Release gives back one reference to s.
// NOTE(review): presumably each Release is meant to pair with an earlier Intern of the same string — confirm against callers.
Release(s string)
}

// NewInterner returns a new Interner to be used to intern strings.
// Based on https://github.com/prometheus/prometheus/blob/726ed124e4468d0274ba89b0934a6cc8c975532d/storage/remote/intern.go#L51
func NewInterner() Interner {
// NewLruInterner returns a new Interner to be used to intern strings.
// The interner will use a LRU cache to return the deduplicated strings
func NewLruInterner() Interner {
return &pool{
pool: map[string]*entry{},
lru: expirable.NewLRU[string, string](maxInternerLruCacheSize, nil, internerLruCacheTTL),
}
}

type pool struct {
mtx sync.RWMutex
pool map[string]*entry
}

type entry struct {
refs atomic.Int64

s string
}

func newEntry(s string) *entry {
return &entry{s: s}
lru *expirable.LRU[string, string]
}

// Intern returns the interned string. It returns the canonical representation of string.
Expand All @@ -177,45 +173,10 @@ func (p *pool) Intern(s string) string {
return ""
}

p.mtx.RLock()
interned, ok := p.pool[s]
p.mtx.RUnlock()
interned, ok := p.lru.Get(s)
if ok {
interned.refs.Inc()
return interned.s
return interned
}
p.mtx.Lock()
defer p.mtx.Unlock()
if interned, ok := p.pool[s]; ok {
interned.refs.Inc()
return interned.s
}

p.pool[s] = newEntry(s)
p.pool[s].refs.Store(1)
p.lru.Add(s, s)
return s
}

// Release releases one reference to the string `s`.
// When the reference count reaches 0, `s` is removed from the pool.
// Releasing a string that is not in the pool is a no-op.
func (p *pool) Release(s string) {
// The lookup only needs the read lock; it is dropped before the counter is touched.
p.mtx.RLock()
interned, ok := p.pool[s]
p.mtx.RUnlock()

if !ok {
return
}

// The decrement happens outside of any lock.
refs := interned.refs.Dec()
if refs > 0 {
return
}

// Re-read the counter under the write lock before deleting, since it was
// decremented without holding the lock.
// NOTE(review): presumably this guards against a concurrent Intern having
// bumped the counter in the meantime — confirm against Intern.
p.mtx.Lock()
defer p.mtx.Unlock()
if interned.refs.Load() != 0 {
return
}
delete(p.pool, s)
}
Loading

0 comments on commit 95ee758

Please sign in to comment.