store/bucket: fix data race (#6575)
* store/bucket: remove sort.Slice data race

The matchers slice used to be sorted inside each call, but that introduces a
data race because the slice is shared between all calls. Do the sorting once,
in the outermost function.

Signed-off-by: Giedrius Statkevičius <[email protected]>

* store: add test for ExpandPostings() race

Signed-off-by: Giedrius Statkevičius <[email protected]>

---------

Signed-off-by: Giedrius Statkevičius <[email protected]>
GiedriusS authored Aug 2, 2023
1 parent a339bdc commit eb80318
Showing 2 changed files with 145 additions and 18 deletions.
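For context on the fix: sort.Slice sorts in place, so calling it on a slice that several goroutines share means concurrent writes to the same backing array. A minimal, self-contained sketch of the racy pattern (not Thanos code; names are illustrative) that go run -race flags:

package main

import (
	"sort"
	"sync"
)

func main() {
	// Stands in for the matchers slice shared by every per-block call.
	shared := []int{3, 1, 2}

	var wg sync.WaitGroup
	for g := 0; g < 2; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Racy: sort.Slice swaps elements of the shared backing array.
			sort.Slice(shared, func(i, j int) bool { return shared[i] < shared[j] })
		}()
	}
	wg.Wait()

	// The fix has the same shape as this commit: sort once, up front,
	// before fanning out, so the goroutines only ever read the slice.
}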
42 changes: 27 additions & 15 deletions pkg/store/bucket.go
@@ -962,8 +962,24 @@ func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats {
 	return stats
 }
 
+type sortedMatchers []*labels.Matcher
+
+func newSortedMatchers(matchers []*labels.Matcher) sortedMatchers {
+	sort.Slice(matchers, func(i, j int) bool {
+		if matchers[i].Type == matchers[j].Type {
+			if matchers[i].Name == matchers[j].Name {
+				return matchers[i].Value < matchers[j].Value
+			}
+			return matchers[i].Name < matchers[j].Name
+		}
+		return matchers[i].Type < matchers[j].Type
+	})
+
+	return matchers
+}
+
 func (b *blockSeriesClient) ExpandPostings(
-	matchers []*labels.Matcher,
+	matchers sortedMatchers,
 	seriesLimiter SeriesLimiter,
 ) error {
 	ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter)
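The new sortedMatchers type encodes the invariant in the type system: its only constructor sorts, so any function that accepts sortedMatchers can rely on the ordering without sorting (and therefore without mutating) its argument again. A generic sketch of the same pattern, with hypothetical names:

package main

import (
	"fmt"
	"sort"
)

// sortedInts is a stand-in for sortedMatchers: a named slice type whose
// constructor establishes the "already sorted" invariant.
type sortedInts []int

func newSortedInts(xs []int) sortedInts {
	sort.Ints(xs) // the single place where mutation happens
	return xs
}

// search can assume the invariant and never needs to write to xs.
func search(xs sortedInts, target int) int {
	return sort.SearchInts(xs, target)
}

func main() {
	fmt.Println(search(newSortedInts([]int{3, 1, 2}), 2)) // prints 1
}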
@@ -1284,6 +1300,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 			continue
 		}
 
+		sortedBlockMatchers := newSortedMatchers(blockMatchers)
+
 		blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow, reqBlockMatchers)
 
 		if s.debugLogging {
@@ -1326,7 +1344,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
"block.resolution": blk.meta.Thanos.Downsample.Resolution,
})

if err := blockClient.ExpandPostings(blockMatchers, seriesLimiter); err != nil {
if err := blockClient.ExpandPostings(sortedBlockMatchers, seriesLimiter); err != nil {
span.Finish()
return errors.Wrapf(err, "fetch series for block %s", blk.meta.ULID)
}
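The same hoisting is applied in LabelNames and LabelValues below: the single in-place sort happens before the per-block fan-out, and the goroutines only read the result. A self-contained sketch of that shape (the errgroup usage is illustrative; Series fans out with a similar group):

package main

import (
	"fmt"
	"sort"

	"golang.org/x/sync/errgroup"
)

func main() {
	// Shared across all per-block workers, like blockMatchers in Series().
	matchers := []string{`j=~".+"`, `foo="bar"`, `i=~".+"`}

	// Sort once in the outermost function; the fan-out below only reads.
	sort.Strings(matchers)

	var g errgroup.Group
	for block := 0; block < 10; block++ {
		block := block
		g.Go(func() error {
			// Concurrent reads of a slice nobody mutates are race-free.
			fmt.Println("block", block, matchers)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println(err)
	}
}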
@@ -1527,6 +1545,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
 			continue
 		}
 
+		sortedReqSeriesMatchersNoExtLabels := newSortedMatchers(reqSeriesMatchersNoExtLabels)
+
 		resHints.AddQueriedBlock(b.meta.ULID)
 
 		indexr := b.indexReader()
@@ -1581,7 +1601,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
 			defer blockClient.Close()
 
 			if err := blockClient.ExpandPostings(
-				reqSeriesMatchersNoExtLabels,
+				sortedReqSeriesMatchersNoExtLabels,
 				seriesLimiter,
 			); err != nil {
 				return err
@@ -1727,6 +1747,8 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
 			reqSeriesMatchersNoExtLabels = append(reqSeriesMatchersNoExtLabels, m)
 		}
 
+		sortedReqSeriesMatchersNoExtLabels := newSortedMatchers(reqSeriesMatchersNoExtLabels)
+
 		resHints.AddQueriedBlock(b.meta.ULID)
 
 		indexr := b.indexReader()
@@ -1775,7 +1797,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
 			defer blockClient.Close()
 
 			if err := blockClient.ExpandPostings(
-				reqSeriesMatchersNoExtLabels,
+				sortedReqSeriesMatchersNoExtLabels,
 				seriesLimiter,
 			); err != nil {
 				return err
@@ -2196,23 +2218,13 @@ func (r *bucketIndexReader) reset() {
 // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first
 // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
 // single label name=value.
-func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) {
+func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) {
 	// Shortcut the case of `len(postingGroups) == 0`. It will only happen when no
 	// matchers specified, and we don't need to fetch expanded postings from cache.
 	if len(ms) == 0 {
 		return nil, nil
 	}
 
-	// Sort matchers to make sure we generate the same cache key.
-	sort.Slice(ms, func(i, j int) bool {
-		if ms[i].Type == ms[j].Type {
-			if ms[i].Name == ms[j].Name {
-				return ms[i].Value < ms[j].Value
-			}
-			return ms[i].Name < ms[j].Name
-		}
-		return ms[i].Type < ms[j].Type
-	})
 	hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter)
 	if err != nil {
 		return nil, err
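The sort.Slice removed above served a second purpose besides determinism: the expanded-postings cache key is derived from the matchers, so two logically identical matcher sets must serialize identically, which only holds once the order is canonical. Hoisting the sort into newSortedMatchers preserves that property. A toy illustration (the key format here is invented for the example, not the real Thanos cache key):

package main

import (
	"fmt"
	"sort"
	"strings"
)

// cacheKey stands in for deriving a cache key from stringified matchers.
func cacheKey(ms []string) string {
	return strings.Join(ms, ";")
}

func main() {
	a := []string{`i=~".+"`, `foo="bar"`}
	b := []string{`foo="bar"`, `i=~".+"`}

	fmt.Println(cacheKey(a) == cacheKey(b)) // false: same matchers, different keys

	sort.Strings(a)
	sort.Strings(b)
	fmt.Println(cacheKey(a) == cacheKey(b)) // true once canonically ordered
}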
121 changes: 118 additions & 3 deletions pkg/store/bucket_test.go
@@ -1228,7 +1228,7 @@ func benchmarkExpandedPostings(

 	t.ResetTimer()
 	for i := 0; i < t.N(); i++ {
-		p, err := indexr.ExpandedPostings(context.Background(), c.matchers, NewBytesLimiterFactory(0)(nil))
+		p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil))
 		testutil.Ok(t, err)
 		testutil.Equals(t, c.expectedLen, len(p))
 	}
@@ -1261,7 +1261,7 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) {
 	matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo")
 	// Match nothing.
 	matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*")
-	ps, err := indexr.ExpandedPostings(context.Background(), []*labels.Matcher{matcher1, matcher2}, NewBytesLimiterFactory(0)(nil))
+	ps, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil))
 	testutil.Ok(t, err)
 	testutil.Equals(t, len(ps), 0)
 	// Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings.
@@ -2562,6 +2562,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
 			// TODO FIXME! testutil.Ok calls b.Fatalf under the hood, which
 			// must be called only from the goroutine running the Benchmark function.
 			testutil.Ok(b, err)
+			sortedMatchers := newSortedMatchers(matchers)
 
 			dummyHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{})
 			blockClient := newBlockSeriesClient(
@@ -2577,7 +2578,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
 				dummyHistogram,
 				nil,
 			)
-			testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter))
+			testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter))
 			defer blockClient.Close()
 
 			// Ensure at least 1 series has been returned (as expected).
@@ -3049,3 +3050,117 @@ func TestPostingGroupMerge(t *testing.T) {
 		})
 	}
 }
+
+// TestExpandedPostingsRace tests that concurrent ExpandedPostings() calls do not race with one another.
+func TestExpandedPostingsRace(t *testing.T) {
+	const blockCount = 10
+
+	tmpDir := t.TempDir()
+	t.Cleanup(func() {
+		testutil.Ok(t, os.RemoveAll(tmpDir))
+	})
+
+	bkt := objstore.NewInMemBucket()
+	t.Cleanup(func() {
+		testutil.Ok(t, bkt.Close())
+	})
+
+	// Create a block.
+	head, _ := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{
+		TSDBDir:          filepath.Join(tmpDir, "head"),
+		SamplesPerSeries: 10,
+		ScrapeInterval:   15 * time.Second,
+		Series:           1000,
+		PrependLabels:    nil,
+		Random:           rand.New(rand.NewSource(120)),
+		SkipChunks:       true,
+	})
+	blockID := createBlockFromHead(t, tmpDir, head)
+
+	bucketBlocks := make([]*bucketBlock, 0, blockCount)
+
+	for i := 0; i < blockCount; i++ {
+		ul := ulid.MustNew(uint64(i), rand.New(rand.NewSource(444)))
+
+		// Upload the block to the bucket.
+		thanosMeta := metadata.Thanos{
+			Labels:     labels.Labels{{Name: "ext1", Value: fmt.Sprintf("%d", i)}}.Map(),
+			Downsample: metadata.ThanosDownsample{Resolution: 0},
+			Source:     metadata.TestSource,
+		}
+		m, err := metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String()))
+		testutil.Ok(t, err)
+
+		m.Thanos = thanosMeta
+		m.BlockMeta.ULID = ul
+
+		e2eutil.Copy(t, filepath.Join(tmpDir, blockID.String()), filepath.Join(tmpDir, ul.String()))
+		testutil.Ok(t, m.WriteToDir(log.NewLogfmtLogger(os.Stderr), filepath.Join(tmpDir, ul.String())))
+		testutil.Ok(t, err)
+		testutil.Ok(t, block.Upload(context.Background(), log.NewLogfmtLogger(os.Stderr), bkt, filepath.Join(tmpDir, ul.String()), metadata.NoneFunc))
+
+		r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, ul, DefaultPostingOffsetInMemorySampling)
+		testutil.Ok(t, err)
+
+		blk, err := newBucketBlock(
+			context.Background(),
+			log.NewLogfmtLogger(os.Stderr),
+			newBucketStoreMetrics(nil),
+			m,
+			bkt,
+			filepath.Join(tmpDir, ul.String()),
+			noopCache{},
+			nil,
+			r,
+			NewGapBasedPartitioner(PartitionerMaxGapSize),
+			nil,
+			nil,
+		)
+		testutil.Ok(t, err)
+
+		bucketBlocks = append(bucketBlocks, blk)
+	}
+
+	tm, cancel := context.WithTimeout(context.Background(), 40*time.Second)
+	t.Cleanup(cancel)
+
+	l := sync.Mutex{}
+	previousRefs := make(map[int][]storage.SeriesRef)
+
+	for {
+		if tm.Err() != nil {
+			break
+		}
+
+		m := []*labels.Matcher{
+			labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
+			labels.MustNewMatcher(labels.MatchRegexp, "j", ".+"),
+			labels.MustNewMatcher(labels.MatchRegexp, "i", ".+"),
+			labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
+			labels.MustNewMatcher(labels.MatchRegexp, "j", ".+"),
+			labels.MustNewMatcher(labels.MatchRegexp, "i", ".+"),
+			labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
+		}
+
+		wg := &sync.WaitGroup{}
+		for i, bb := range bucketBlocks {
+			wg.Add(1)
+			i := i
+			bb := bb
+			go func(i int, bb *bucketBlock) {
+				refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil))
+				testutil.Ok(t, err)
+				defer wg.Done()
+
+				l.Lock()
+				defer l.Unlock()
+				if previousRefs[i] != nil {
+					testutil.Equals(t, previousRefs[i], refs)
+				} else {
+					previousRefs[i] = refs
+				}
+			}(i, bb)
+		}
+		wg.Wait()
+	}
+}
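The test's previousRefs map catches determinism regressions (the same block must always yield the same refs), but the data race itself is surfaced by the Go race detector, so the test should be run with -race. An invocation along these lines should exercise it (package path assumed from the repository layout):

go test -race -run TestExpandedPostingsRace ./pkg/store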
