Skip to content

Commit

Permalink
Store-gateway: allow postings scans to be interrupted (#8154)
Browse files Browse the repository at this point in the history
* Store-gateway: allow postings scans to be interrupted

If the filter operation is expensive, e.g. a large regexp, and the
posting has a lot of values, filtering can many seconds.
Check the context for cancellation every 1024 values.

Signed-off-by: Bryan Boreham <[email protected]>
Signed-off-by: György Krajcsovits <[email protected]>
Co-authored-by: György Krajcsovits <[email protected]>
Co-authored-by: George Krajcsovits <[email protected]>
  • Loading branch information
3 people authored May 16, 2024
1 parent 1141091 commit 04e6b7f
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
* [BUGFIX] Querying: Fix regex matching of multibyte runes with dot operator. #8089
* [BUGFIX] Querying: matrix results returned from instant queries were not sorted by series. #8113
* [BUGFIX] Query scheduler: Fix a crash in result marshaling. #8140
* [BUGFIX] Store-gateway: Allow long-running index scans to be interrupted. #8154

### Mixin

Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1639,7 +1639,7 @@ func blockLabelValues(ctx context.Context, b *bucketBlock, postingsStrategy post
}

// TODO: if matchers contains labelName, we could use it to filter out label values here.
allValuesPostingOffsets, err := b.indexHeaderReader.LabelValuesOffsets(labelName, "", nil)
allValuesPostingOffsets, err := b.indexHeaderReader.LabelValuesOffsets(ctx, labelName, "", nil)
if err != nil {
return nil, errors.Wrap(err, "index header label values")
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/storegateway/bucket_index_postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package storegateway

import (
"context"
"encoding/binary"
"fmt"
"sort"
Expand Down Expand Up @@ -80,7 +81,7 @@ func newLazySubtractingPostingGroup(m *labels.Matcher) rawPostingGroup {

// toPostingGroup returns a postingGroup which shares the underlying keys slice with g.
// This means that after calling toPostingGroup g.keys will be modified.
func (g rawPostingGroup) toPostingGroup(r indexheader.Reader) (postingGroup, error) {
func (g rawPostingGroup) toPostingGroup(ctx context.Context, r indexheader.Reader) (postingGroup, error) {
var (
keys []labels.Label
totalSize int64
Expand All @@ -90,7 +91,7 @@ func (g rawPostingGroup) toPostingGroup(r indexheader.Reader) (postingGroup, err
if g.isSubtract {
filter = not(filter)
}
vals, err := r.LabelValuesOffsets(g.labelName, g.prefix, filter)
vals, err := r.LabelValuesOffsets(ctx, g.labelName, g.prefix, filter)
if err != nil {
return postingGroup{}, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/storegateway/bucket_index_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (r *bucketIndexReader) fetchCachedExpandedPostings(ctx context.Context, use

// expandedPostings is the main logic of ExpandedPostings, without the promise wrapper.
func (r *bucketIndexReader) expandedPostings(ctx context.Context, ms []*labels.Matcher, stats *safeQueryStats) (returnRefs []storage.SeriesRef, pendingMatchers []*labels.Matcher, returnErr error) {
postingGroups, err := toPostingGroups(ms, r.block.indexHeaderReader)
postingGroups, err := toPostingGroups(ctx, ms, r.block.indexHeaderReader)
if err != nil {
return nil, nil, errors.Wrap(err, "toPostingGroups")
}
Expand Down Expand Up @@ -314,7 +314,7 @@ var allPostingsKey = func() labels.Label {

// toPostingGroups returns a set of labels for which to look up postings lists. It guarantees that
// each postingGroup's keys exist in the index.
func toPostingGroups(ms []*labels.Matcher, indexhdr indexheader.Reader) ([]postingGroup, error) {
func toPostingGroups(ctx context.Context, ms []*labels.Matcher, indexhdr indexheader.Reader) ([]postingGroup, error) {
var (
rawPostingGroups = make([]rawPostingGroup, 0, len(ms))
allRequested = false
Expand Down Expand Up @@ -354,7 +354,7 @@ func toPostingGroups(ms []*labels.Matcher, indexhdr indexheader.Reader) ([]posti
// Based on the previous sorting, we start with the ones that have a known set of values because it's less expensive to check them in
// the index header.
for _, rawGroup := range rawPostingGroups {
pg, err := rawGroup.toPostingGroup(indexhdr)
pg, err := rawGroup.toPostingGroup(ctx, indexhdr)
if err != nil {
return nil, errors.Wrap(err, "filtering posting group")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storegateway/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,13 +885,13 @@ func (iir *interceptedIndexReader) LabelNames() ([]string, error) {
return iir.Reader.LabelNames()
}

func (iir *interceptedIndexReader) LabelValuesOffsets(name string, prefix string, filter func(string) bool) ([]index.PostingListOffset, error) {
func (iir *interceptedIndexReader) LabelValuesOffsets(ctx context.Context, name string, prefix string, filter func(string) bool) ([]index.PostingListOffset, error) {
if iir.onLabelValuesOffsetsCalled != nil {
if err := iir.onLabelValuesOffsetsCalled(name); err != nil {
return nil, err
}
}
return iir.Reader.LabelValuesOffsets(name, prefix, filter)
return iir.Reader.LabelValuesOffsets(ctx, name, prefix, filter)
}

func (iir *interceptedIndexReader) IndexVersion() (int, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/storegateway/indexheader/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package indexheader

import (
"context"
"flag"
"io"
"time"
Expand Down Expand Up @@ -55,7 +56,7 @@ type Reader interface {
// then empty slice is returned and no error.
// If non-empty prefix is provided, only posting lists starting with the prefix are returned.
// If non-nil filter is provided, then only posting lists for which filter returns true are returned.
LabelValuesOffsets(name string, prefix string, filter func(string) bool) ([]streamindex.PostingListOffset, error)
LabelValuesOffsets(ctx context.Context, name string, prefix string, filter func(string) bool) ([]streamindex.PostingListOffset, error)

// LabelNames returns all label names in sorted order.
LabelNames() ([]string, error)
Expand Down
4 changes: 2 additions & 2 deletions pkg/storegateway/indexheader/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe
expectedLabelVals, err := indexReader.SortedLabelValues(ctx, lname)
require.NoError(t, err)

valOffsets, err := headerReader.LabelValuesOffsets(lname, "", nil)
valOffsets, err := headerReader.LabelValuesOffsets(ctx, lname, "", nil)
require.NoError(t, err)
strValsFromOffsets := make([]string, len(valOffsets))
for i := range valOffsets {
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestReadersLabelValuesOffsets(t *testing.T) {
t.Run(lbl, func(t *testing.T) {
for _, tc := range tcs {
t.Run(fmt.Sprintf("prefix='%s'%s", tc.prefix, tc.desc), func(t *testing.T) {
values, err := r.LabelValuesOffsets(lbl, tc.prefix, tc.filter)
values, err := r.LabelValuesOffsets(context.Background(), lbl, tc.prefix, tc.filter)
require.NoError(t, err)
require.Equal(t, tc.expected, len(values))
})
Expand Down
23 changes: 19 additions & 4 deletions pkg/storegateway/indexheader/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package index

import (
"context"
"fmt"
"hash/crc32"
"sort"
Expand All @@ -20,7 +21,11 @@ import (
"github.com/grafana/mimir/pkg/storegateway/indexheader/indexheaderpb"
)

const postingLengthFieldSize = 4
const (
postingLengthFieldSize = 4
// CheckContextEveryNIterations is used in some tight loops to check if the context is done.
CheckContextEveryNIterations = 1024
)

type PostingOffsetTable interface {
// PostingsOffset returns the byte range of the postings section for the label with the given name and value.
Expand All @@ -32,7 +37,7 @@ type PostingOffsetTable interface {
// LabelValuesOffsets returns all postings lists for the label named name that match filter and have the prefix provided.
// The ranges of each posting list are the same as returned by PostingsOffset.
// The returned label values are sorted lexicographically (which the same as sorted by posting offset).
LabelValuesOffsets(name, prefix string, filter func(string) bool) ([]PostingListOffset, error)
LabelValuesOffsets(ctx context.Context, name, prefix string, filter func(string) bool) ([]PostingListOffset, error)

// LabelNames returns a sorted list of all label names in this table.
LabelNames() ([]string, error)
Expand Down Expand Up @@ -286,13 +291,18 @@ func (t *PostingOffsetTableV1) PostingsOffset(name string, value string) (index.
return rng, true, nil
}

func (t *PostingOffsetTableV1) LabelValuesOffsets(name, prefix string, filter func(string) bool) ([]PostingListOffset, error) {
func (t *PostingOffsetTableV1) LabelValuesOffsets(ctx context.Context, name, prefix string, filter func(string) bool) ([]PostingListOffset, error) {
e, ok := t.postings[name]
if !ok {
return nil, nil
}
values := make([]PostingListOffset, 0, len(e))
count := 1
for k, r := range e {
if count%CheckContextEveryNIterations == 0 && ctx.Err() != nil {
return nil, ctx.Err()
}
count++
if strings.HasPrefix(k, prefix) && (filter == nil || filter(k)) {
values = append(values, PostingListOffset{LabelValue: k, Off: r})
}
Expand Down Expand Up @@ -468,7 +478,7 @@ func (t *PostingOffsetTableV2) PostingsOffset(name string, value string) (r inde
return index.Range{}, false, nil
}

func (t *PostingOffsetTableV2) LabelValuesOffsets(name, prefix string, filter func(string) bool) (_ []PostingListOffset, err error) {
func (t *PostingOffsetTableV2) LabelValuesOffsets(ctx context.Context, name, prefix string, filter func(string) bool) (_ []PostingListOffset, err error) {
e, ok := t.postings[name]
if !ok {
return nil, nil
Expand Down Expand Up @@ -546,7 +556,12 @@ func (t *PostingOffsetTableV2) LabelValuesOffsets(name, prefix string, filter fu
nextEntry pEntry
)

count := 1
for d.Err() == nil && !currEntry.isLast {
if count%CheckContextEveryNIterations == 0 && ctx.Err() != nil {
return nil, ctx.Err()
}
count++
// Populate the current list either reading it from the pre-populated "next" or reading it from the index.
if nextEntry != (pEntry{}) {
currEntry = nextEntry
Expand Down
4 changes: 2 additions & 2 deletions pkg/storegateway/indexheader/lazy_binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,14 @@ func (r *LazyBinaryReader) SymbolsReader() (streamindex.SymbolsReader, error) {
}

// LabelValuesOffsets implements Reader.
func (r *LazyBinaryReader) LabelValuesOffsets(name string, prefix string, filter func(string) bool) ([]streamindex.PostingListOffset, error) {
func (r *LazyBinaryReader) LabelValuesOffsets(ctx context.Context, name string, prefix string, filter func(string) bool) ([]streamindex.PostingListOffset, error) {
reader, wg, err := r.getOrLoadReader()
if err != nil {
return nil, err
}
defer wg.Done()

return reader.LabelValuesOffsets(name, prefix, filter)
return reader.LabelValuesOffsets(ctx, name, prefix, filter)
}

// LabelNames implements Reader.
Expand Down
6 changes: 3 additions & 3 deletions pkg/storegateway/indexheader/reader_benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func BenchmarkLabelValuesOffsetsIndexV1(b *testing.B) {
for i := 0; i < b.N; i++ {
name := names[i%len(names)]

values, err := br.LabelValuesOffsets(name, "", func(string) bool {
values, err := br.LabelValuesOffsets(ctx, name, "", func(string) bool {
return true
})

Expand Down Expand Up @@ -221,7 +221,7 @@ func BenchmarkLabelValuesOffsetsIndexV2(b *testing.B) {
for i := 0; i < b.N; i++ {
name := names[i%len(names)]

values, err := br.LabelValuesOffsets(name, "", func(string) bool {
values, err := br.LabelValuesOffsets(ctx, name, "", func(string) bool {
return true
})

Expand All @@ -241,7 +241,7 @@ func BenchmarkLabelValuesOffsetsIndexV2_WithPrefix(b *testing.B) {
for _, tc := range tcs {
b.Run(fmt.Sprintf("prefix='%s'%s", tc.prefix, tc.desc), func(b *testing.B) {
for i := 0; i < b.N; i++ {
values, err := r.LabelValuesOffsets(lbl, tc.prefix, tc.filter)
values, err := r.LabelValuesOffsets(context.Background(), lbl, tc.prefix, tc.filter)
require.NoError(b, err)
require.Equal(b, tc.expected, len(values))
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storegateway/indexheader/stream_binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,8 @@ func (r *StreamBinaryReader) SymbolsReader() (streamindex.SymbolsReader, error)
}, nil
}

func (r *StreamBinaryReader) LabelValuesOffsets(name string, prefix string, filter func(string) bool) ([]streamindex.PostingListOffset, error) {
return r.postingsOffsetTable.LabelValuesOffsets(name, prefix, filter)
func (r *StreamBinaryReader) LabelValuesOffsets(ctx context.Context, name string, prefix string, filter func(string) bool) ([]streamindex.PostingListOffset, error) {
return r.postingsOffsetTable.LabelValuesOffsets(ctx, name, prefix, filter)
}

func (r *StreamBinaryReader) LabelNames() ([]string, error) {
Expand Down
33 changes: 33 additions & 0 deletions pkg/storegateway/indexheader/stream_binary_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/fileutil"
promtestutil "github.com/prometheus/prometheus/util/testutil"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore/providers/filesystem"

"github.com/grafana/mimir/pkg/storage/tsdb/block"
streamindex "github.com/grafana/mimir/pkg/storegateway/indexheader/index"
"github.com/grafana/mimir/pkg/util/spanlogger"
)

Expand Down Expand Up @@ -93,3 +95,34 @@ func TestStreamBinaryReader_CheckSparseHeadersCorrectnessExtensive(t *testing.T)
}
}
}

func TestStreamBinaryReader_LabelValuesOffsetsHonorsContextCancel(t *testing.T) {
ctx := context.Background()

tmpDir := filepath.Join(t.TempDir(), "test-stream-binary-reader-cancel")
bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, bkt.Close()) })

seriesCount := streamindex.CheckContextEveryNIterations * 10
// Create block.
lbls := make([]labels.Labels, 0, seriesCount)
for i := 0; i < seriesCount; i++ {
lbls = append(lbls, labels.FromStrings("a", fmt.Sprintf("%d", i)))
}
blockID, err := block.CreateBlock(ctx, tmpDir, lbls, 1, 0, 10, labels.FromStrings("ext1", "1"))
require.NoError(t, err)
require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), nil))

// Write sparse index headers to disk on first build.
r, err := NewStreamBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, NewStreamBinaryReaderMetrics(nil), Config{})
require.NoError(t, err)

// LabelValuesOffsets will read all series and check for cancelation every CheckContextEveryNIterations,
// we set ctx to fail after half of the series are read.
failAfter := uint64(seriesCount / 2 / streamindex.CheckContextEveryNIterations)
ctx = &promtestutil.MockContextErrAfter{FailAfter: failAfter}
_, err = r.LabelValuesOffsets(ctx, "a", "", func(string) bool { return true })
require.Error(t, err)
require.ErrorIs(t, err, context.Canceled)
}

0 comments on commit 04e6b7f

Please sign in to comment.