Optimize postings fetching by checking postings and series size #6465

Merged · 6 commits · Sep 11, 2023

5 changes: 5 additions & 0 deletions cmd/thanos/store.go
@@ -88,6 +88,7 @@ type storeConfig struct {
reqLogConfig *extflag.PathOrContent
lazyIndexReaderEnabled bool
lazyIndexReaderIdleTimeout time.Duration
lazyExpandedPostingsEnabled bool
}

func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -182,6 +183,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity.").
Hidden().Default("5m").DurationVar(&sc.lazyIndexReaderIdleTimeout)

cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.").
Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled)

cmd.Flag("web.disable", "Disable Block Viewer UI.").Default("false").BoolVar(&sc.disableWeb)

cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").
@@ -382,6 +386,7 @@ func runStore(
}
return conf.estimatedMaxChunkSize
}),
store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled),
}

if conf.debugLogging {
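The new option rides on BucketStore's functional-options pattern: the flag value is captured once at startup and applied at construction. A minimal sketch of how such an option can be defined, assuming `BucketStoreOption` is a `func(*BucketStore)` and a field named `enableLazyExpandedPostings` (both are illustrative assumptions, not confirmed by this diff):

```go
package store

// BucketStore stub carrying only the field this sketch needs.
type BucketStore struct {
	enableLazyExpandedPostings bool
}

// BucketStoreOption mutates BucketStore configuration at construction time.
type BucketStoreOption func(s *BucketStore)

// WithLazyExpandedPostings toggles the lazy expanded postings optimization.
// Sketch only: the real field name inside BucketStore may differ.
func WithLazyExpandedPostings(enabled bool) BucketStoreOption {
	return func(s *BucketStore) {
		s.enableLazyExpandedPostings = enabled
	}
}
```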
5 changes: 5 additions & 0 deletions docs/components/store.md
@@ -176,6 +176,11 @@ Flags:
If true, Store Gateway will lazy memory map
index-header only once the block is required by
a query.
--store.enable-lazy-expanded-postings
If true, Store Gateway will estimate postings
size and try to lazily expand postings if
it downloads less data than expanding all
postings.
--store.grpc.downloaded-bytes-limit=0
Maximum amount of downloaded (either
fetched or touched) bytes in a single
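The flag text compresses the heuristic into one sentence: estimate how many bytes expanding all postings would download, and fall back to fetching series directly when that is cheaper. Below is a hedged sketch of that comparison, reusing the NotFoundRange sentinel this PR introduces; every name in it is illustrative, not the PR's actual implementation:

```go
package main

import "github.com/prometheus/prometheus/tsdb/index"

// notFoundRange mirrors the sentinel this PR adds in pkg/block/indexheader.
var notFoundRange = index.Range{Start: -1, End: -1}

// shouldLazyExpand sketches the trade-off the flag describes: expand postings
// lazily only when fetching the estimated series data is cheaper than
// downloading every posting list. All parameters are illustrative.
func shouldLazyExpand(postingRanges []index.Range, seriesMaxSize, estimatedSeriesCount int64) bool {
	var postingsBytes int64
	for _, r := range postingRanges {
		if r == notFoundRange {
			continue // value absent from the block, nothing to download
		}
		postingsBytes += r.End - r.Start
	}
	// Lazily expand when the estimated series download is the smaller cost.
	return estimatedSeriesCount*seriesMaxSize < postingsBytes
}
```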
10 changes: 6 additions & 4 deletions pkg/block/block_test.go
@@ -144,7 +144,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
- testutil.Equals(t, 567, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
+ testutil.Equals(t, 595, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))

// File stats are gathered.
testutil.Equals(t, fmt.Sprintf(`{
@@ -184,7 +184,9 @@ func TestUpload(t *testing.T) {
"rel_path": "meta.json"
}
],
"index_stats": {}
"index_stats": {
"series_max_size": 16
}
}
}
`, b1.String(), b1.String()), string(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
@@ -195,7 +197,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
- testutil.Equals(t, 567, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
+ testutil.Equals(t, 595, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
}
{
// Upload with no external labels should be blocked.
@@ -227,7 +229,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 6, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b2.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b2.String(), IndexFilename)]))
- testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
+ testutil.Equals(t, 574, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
}
}

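The new `series_max_size` value in `index_stats` is produced at upload time from index health stats; the acceptance-test change later in this diff shows the full call sequence. Condensed into a sketch (the real test also sets `Labels`, `Downsample`, and `Source` on the injected Thanos metadata):

```go
package main

import (
	"path/filepath"

	"github.com/go-kit/log"
	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/block/metadata"
)

// injectIndexStats gathers index health stats for an already-written block
// and persists the maxima into meta.json, which is what makes
// "series_max_size" appear in the expected JSON above.
func injectIndexStats(logger log.Logger, blockDir string) error {
	meta, err := metadata.ReadFromDir(blockDir)
	if err != nil {
		return err
	}
	stats, err := block.GatherIndexHealthStats(logger, filepath.Join(blockDir, block.IndexFilename), meta.MinTime, meta.MaxTime)
	if err != nil {
		return err
	}
	_, err = metadata.InjectThanos(logger, blockDir, metadata.Thanos{
		IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
	}, nil)
	return err
}
```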
19 changes: 18 additions & 1 deletion pkg/block/indexheader/binary_reader.go
@@ -47,6 +47,8 @@ const (
postingLengthFieldSize = 4
)

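// NotFoundRange is a sentinel range returned by PostingsOffsets for label
// values that have no postings in the index.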
var NotFoundRange = index.Range{Start: -1, End: -1}

// The table gets initialized with sync.Once but may still cause a race
// with any other use of the crc32 package anywhere. Thus we initialize it
// before.
@@ -747,13 +749,18 @@ func (r *BinaryReader) IndexVersion() (int, error) {
return r.indexVersion, nil
}

// PostingsOffsets implements Reader.
func (r *BinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) {
return r.postingsOffset(name, values...)
}

// TODO(bwplotka): Take advantage of multi-value offset fetch.
func (r *BinaryReader) PostingsOffset(name, value string) (index.Range, error) {
rngs, err := r.postingsOffset(name, value)
if err != nil {
return index.Range{}, err
}
- if len(rngs) != 1 {
+ if len(rngs) != 1 || rngs[0] == NotFoundRange {
return index.Range{}, NotFoundRangeErr
}
return rngs[0], nil
@@ -801,6 +808,7 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Range, error) {
valueIndex := 0
for valueIndex < len(values) && values[valueIndex] < e.offsets[0].value {
// Discard values before the start.
rngs = append(rngs, NotFoundRange)
valueIndex++
}

@@ -811,6 +819,9 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Range, error) {
i := sort.Search(len(e.offsets), func(i int) bool { return e.offsets[i].value >= wantedValue })
if i == len(e.offsets) {
// We're past the end.
for len(rngs) < len(values) {
rngs = append(rngs, NotFoundRange)
}
break
}
if i > 0 && e.offsets[i].value != wantedValue {
@@ -858,6 +869,8 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Range, error) {
// Record on the way if wanted value is equal to the current value.
if string(value) == wantedValue {
newSameRngs = append(newSameRngs, index.Range{Start: postingOffset + postingLengthFieldSize})
} else {
rngs = append(rngs, NotFoundRange)
}
valueIndex++
if valueIndex == len(values) {
@@ -877,6 +890,10 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Range, error) {
}

if valueIndex != len(values) && wantedValue <= e.offsets[i+1].value {
// Increment i when wanted value is same as next offset.
if wantedValue == e.offsets[i+1].value {
i++
}
// wantedValue is smaller or same as the next offset we know about, let's iterate further to add those.
continue
}
8 changes: 7 additions & 1 deletion pkg/block/indexheader/header.go
@@ -20,10 +20,16 @@ type Reader interface {
// IndexVersion returns version of index.
IndexVersion() (int, error)

// PostingsOffsets returns start and end offsets of postings for the given name and values.
// Input values need to be sorted.
// If the requested label name doesn't exist, no postings and no error are returned.
// If the requested label name exists but some values don't, the corresponding index ranges
// are set to -1 for both start and end (see NotFoundRange).
PostingsOffsets(name string, value ...string) ([]index.Range, error)

// PostingsOffset returns start and end offsets of postings for given name and value.
// The end offset might be bigger than the actual posting ending, but not larger than the whole index file.
// NotFoundRangeErr is returned when no index can be found for given name and value.
// TODO(bwplotka): Move to PostingsOffsets(name string, value ...string) []index.Range and benchmark.
PostingsOffset(name string, value string) (index.Range, error)

// LookupSymbol returns string based on given reference.
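Before the tests, a hedged usage sketch of the new contract, mirroring the assertions in header_test.go below (`br` is any Reader implementation; the fetch step is left as a comment):

```go
package main

import "github.com/thanos-io/thanos/pkg/block/indexheader"

// collectPostingRanges sketches the PostingsOffsets contract: input values
// must be pre-sorted, and the returned slice is parallel to the input, with
// NotFoundRange marking values that have no postings in the block.
func collectPostingRanges(br indexheader.Reader) error {
	values := []string{"1", "10", "9"} // sorted lexicographically, as required
	rngs, err := br.PostingsOffsets("a", values...)
	if err != nil {
		return err
	}
	for i, rng := range rngs {
		if rng == indexheader.NotFoundRange {
			continue // values[i] has no postings in this block
		}
		_ = values[i] // fetch index bytes [rng.Start, rng.End) here
	}
	return nil
}
```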
32 changes: 32 additions & 0 deletions pkg/block/indexheader/header_test.go
@@ -141,6 +141,38 @@ func TestReaders(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals)

// Single value lookup.
rngs, err := br.PostingsOffsets("a", "9")
testutil.Ok(t, err)
for _, rng := range rngs {
testutil.Assert(t, rng.End > rng.Start)
}

rngs, err = br.PostingsOffsets("a", "2", "3", "4", "5", "6", "7", "8", "9")
testutil.Ok(t, err)
for _, rng := range rngs {
testutil.Assert(t, rng.End > rng.Start)
}

rngs, err = br.PostingsOffsets("a", "0")
testutil.Ok(t, err)
testutil.Assert(t, len(rngs) == 1)
testutil.Equals(t, NotFoundRange, rngs[0])

rngs, err = br.PostingsOffsets("a", "0", "10", "99")
testutil.Ok(t, err)
testutil.Assert(t, len(rngs) == 3)
for _, rng := range rngs {
testutil.Equals(t, NotFoundRange, rng)
}

rngs, err = br.PostingsOffsets("a", "1", "10", "9")
testutil.Ok(t, err)
testutil.Assert(t, len(rngs) == 3)
testutil.Assert(t, rngs[0].End > rngs[0].Start)
testutil.Assert(t, rngs[2].End > rngs[2].Start)
testutil.Equals(t, NotFoundRange, rngs[1])

// Regression tests for https://github.com/thanos-io/thanos/issues/2213.
// Lookups of most non-existent values worked despite the bug, except in
// certain unlucky cases that caused "invalid size" errors.
13 changes: 13 additions & 0 deletions pkg/block/indexheader/lazy_binary_reader.go
@@ -154,6 +154,19 @@ func (r *LazyBinaryReader) IndexVersion() (int, error) {
return r.reader.IndexVersion()
}

// PostingsOffsets implements Reader.
func (r *LazyBinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) {
r.readerMx.RLock()
defer r.readerMx.RUnlock()

if err := r.load(); err != nil {
return nil, err
}

r.usedAt.Store(time.Now().UnixNano())
return r.reader.PostingsOffsets(name, values...)
}

// PostingsOffset implements Reader.
func (r *LazyBinaryReader) PostingsOffset(name, value string) (index.Range, error) {
r.readerMx.RLock()
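The new method follows the wrapper's uniform delegation pattern: take the read lock, load the underlying reader on first use, stamp `usedAt` for the idle-timeout unloader, then delegate. A self-contained sketch of the same idea, simplified to `sync.Once` (the real type uses an RWMutex so the reader can also be unloaded again):

```go
package main

import (
	"sync"
	"sync/atomic"
	"time"
)

// Range stands in for index.Range in this sketch.
type Range struct{ Start, End int64 }

// reader stands in for indexheader.Reader, reduced to one method.
type reader interface {
	PostingsOffsets(name string, values ...string) ([]Range, error)
}

// lazyReader loads the real reader on first use, records the access time so
// an idle-timeout goroutine could unload it later, and then delegates.
type lazyReader struct {
	loadOnce sync.Once
	loadErr  error
	inner    reader
	newInner func() (reader, error) // e.g. builds a BinaryReader in real code
	usedAt   atomic.Int64
}

func (l *lazyReader) PostingsOffsets(name string, values ...string) ([]Range, error) {
	l.loadOnce.Do(func() { l.inner, l.loadErr = l.newInner() })
	if l.loadErr != nil {
		return nil, l.loadErr
	}
	l.usedAt.Store(time.Now().UnixNano())
	return l.inner.PostingsOffsets(name, values...)
}
```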
148 changes: 78 additions & 70 deletions pkg/store/acceptance_test.go
@@ -722,78 +722,86 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer) {
func TestBucketStore_Acceptance(t *testing.T) {
t.Cleanup(func() { custom.TolerantVerifyLeak(t) })

testStoreAPIsAcceptance(t, func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
tmpDir := tt.TempDir()
bktDir := filepath.Join(tmpDir, "bkt")
auxDir := filepath.Join(tmpDir, "aux")
metaDir := filepath.Join(tmpDir, "meta")

testutil.Ok(tt, os.MkdirAll(metaDir, os.ModePerm))
testutil.Ok(tt, os.MkdirAll(auxDir, os.ModePerm))

bkt, err := filesystem.NewBucket(bktDir)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, bkt.Close()) })

headOpts := tsdb.DefaultHeadOptions()
headOpts.ChunkDirRoot = tmpDir
headOpts.ChunkRange = 1000
h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, h.Close()) })
logger := log.NewNopLogger()

appendFn(h.Appender(context.Background()))

if h.NumSeries() == 0 {
tt.Skip("Bucket Store cannot handle empty HEAD")
}

id := createBlockFromHead(tt, auxDir, h)

auxBlockDir := filepath.Join(auxDir, id.String())
_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
}, nil)
testutil.Ok(tt, err)

testutil.Ok(tt, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc))
testutil.Ok(tt, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc))

chunkPool, err := NewDefaultChunkBytesPool(2e5)
testutil.Ok(tt, err)
for _, lazyExpandedPosting := range []bool{false, true} {
testStoreAPIsAcceptance(t, func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
tmpDir := tt.TempDir()
bktDir := filepath.Join(tmpDir, "bkt")
auxDir := filepath.Join(tmpDir, "aux")
metaDir := filepath.Join(tmpDir, "meta")

testutil.Ok(tt, os.MkdirAll(metaDir, os.ModePerm))
testutil.Ok(tt, os.MkdirAll(auxDir, os.ModePerm))

bkt, err := filesystem.NewBucket(bktDir)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, bkt.Close()) })

headOpts := tsdb.DefaultHeadOptions()
headOpts.ChunkDirRoot = tmpDir
headOpts.ChunkRange = 1000
h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, h.Close()) })
logger := log.NewNopLogger()

appendFn(h.Appender(context.Background()))

if h.NumSeries() == 0 {
tt.Skip("Bucket Store cannot handle empty HEAD")
}

metaFetcher, err := block.NewMetaFetcher(logger, 20, objstore.WithNoopInstr(bkt), metaDir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
id := createBlockFromHead(tt, auxDir, h)

auxBlockDir := filepath.Join(auxDir, id.String())
meta, err := metadata.ReadFromDir(auxBlockDir)
testutil.Ok(t, err)
stats, err := block.GatherIndexHealthStats(logger, filepath.Join(auxBlockDir, block.IndexFilename), meta.MinTime, meta.MaxTime)
testutil.Ok(t, err)
_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
}, nil)
testutil.Ok(tt, err)

testutil.Ok(tt, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc))
testutil.Ok(tt, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc))

chunkPool, err := NewDefaultChunkBytesPool(2e5)
testutil.Ok(tt, err)

metaFetcher, err := block.NewMetaFetcher(logger, 20, objstore.WithNoopInstr(bkt), metaDir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
})
testutil.Ok(tt, err)

bucketStore, err := NewBucketStore(
objstore.WithNoopInstr(bkt),
metaFetcher,
"",
NewChunksLimiterFactory(10e6),
NewSeriesLimiterFactory(10e6),
NewBytesLimiterFactory(10e6),
NewGapBasedPartitioner(PartitionerMaxGapSize),
20,
true,
DefaultPostingOffsetInMemorySampling,
false,
false,
1*time.Minute,
WithChunkPool(chunkPool),
WithFilterConfig(allowAllFilterConf),
WithLazyExpandedPostings(lazyExpandedPosting),
)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, bucketStore.Close()) })

testutil.Ok(tt, bucketStore.SyncBlocks(context.Background()))

return bucketStore
})
testutil.Ok(tt, err)

bucketStore, err := NewBucketStore(
objstore.WithNoopInstr(bkt),
metaFetcher,
"",
NewChunksLimiterFactory(10e6),
NewSeriesLimiterFactory(10e6),
NewBytesLimiterFactory(10e6),
NewGapBasedPartitioner(PartitionerMaxGapSize),
20,
true,
DefaultPostingOffsetInMemorySampling,
false,
false,
1*time.Minute,
WithChunkPool(chunkPool),
WithFilterConfig(allowAllFilterConf),
)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, bucketStore.Close()) })

testutil.Ok(tt, bucketStore.SyncBlocks(context.Background()))

return bucketStore
})
}
}

func TestPrometheusStore_Acceptance(t *testing.T) {