Skip to content

Commit

Permalink
optimize postings fetching by checking postings and series size
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Jul 21, 2023
1 parent cdba35b commit 294288f
Show file tree
Hide file tree
Showing 8 changed files with 319 additions and 49 deletions.
5 changes: 5 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type storeConfig struct {
reqLogConfig *extflag.PathOrContent
lazyIndexReaderEnabled bool
lazyIndexReaderIdleTimeout time.Duration
lazyExpandedPostingsEnabled bool
}

func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -182,6 +183,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity.").
Hidden().Default("5m").DurationVar(&sc.lazyIndexReaderIdleTimeout)

cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.").
Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled)

cmd.Flag("web.disable", "Disable Block Viewer UI.").Default("false").BoolVar(&sc.disableWeb)

cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").
Expand Down Expand Up @@ -382,6 +386,7 @@ func runStore(
}
return conf.estimatedMaxChunkSize
}),
store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled),
}

if conf.debugLogging {
Expand Down
19 changes: 18 additions & 1 deletion pkg/block/indexheader/binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
postingLengthFieldSize = 4
)

var notFoundRange = index.Range{Start: -1, End: -1}

// The table gets initialized with sync.Once but may still cause a race
// with any other use of the crc32 package anywhere. Thus we initialize it
// before.
Expand Down Expand Up @@ -747,13 +749,18 @@ func (r *BinaryReader) IndexVersion() (int, error) {
return r.indexVersion, nil
}

// PostingsOffsets implements Reader.
func (r *BinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) {
return r.postingsOffset(name, values...)
}

// TODO(bwplotka): Get advantage of multi value offset fetch.
func (r *BinaryReader) PostingsOffset(name, value string) (index.Range, error) {
rngs, err := r.postingsOffset(name, value)
if err != nil {
return index.Range{}, err
}
if len(rngs) != 1 {
if len(rngs) != 1 || rngs[0] == notFoundRange {
return index.Range{}, NotFoundRangeErr
}
return rngs[0], nil
Expand Down Expand Up @@ -801,6 +808,7 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
valueIndex := 0
for valueIndex < len(values) && values[valueIndex] < e.offsets[0].value {
// Discard values before the start.
rngs = append(rngs, notFoundRange)
valueIndex++
}

Expand All @@ -811,6 +819,9 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
i := sort.Search(len(e.offsets), func(i int) bool { return e.offsets[i].value >= wantedValue })
if i == len(e.offsets) {
// We're past the end.
for len(rngs) < len(values) {
rngs = append(rngs, notFoundRange)
}
break
}
if i > 0 && e.offsets[i].value != wantedValue {
Expand Down Expand Up @@ -858,6 +869,8 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
// Record on the way if wanted value is equal to the current value.
if string(value) == wantedValue {
newSameRngs = append(newSameRngs, index.Range{Start: postingOffset + postingLengthFieldSize})
} else {
rngs = append(rngs, notFoundRange)
}
valueIndex++
if valueIndex == len(values) {
Expand All @@ -877,6 +890,10 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
}

if valueIndex != len(values) && wantedValue <= e.offsets[i+1].value {
// Increment i when wanted value is same as next offset.
if wantedValue == e.offsets[i+1].value {
i++
}
// wantedValue is smaller or same as the next offset we know about, let's iterate further to add those.
continue
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/block/indexheader/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ type Reader interface {
// IndexVersion returns version of index.
IndexVersion() (int, error)

// PostingsOffsets returns start and end offsets for postings for given name and values.
// Input values need to be sorted. If a posting doesn't exist, posting wiht start and end
// both set to -1 will be returned.
PostingsOffsets(name string, value ...string) ([]index.Range, error)

// PostingsOffset returns start and end offsets of postings for given name and value.
// The end offset might be bigger than the actual posting ending, but not larger than the whole index file.
// NotFoundRangeErr is returned when no index can be found for given name and value.
// TODO(bwplotka): Move to PostingsOffsets(name string, value ...string) []index.Range and benchmark.
PostingsOffset(name string, value string) (index.Range, error)

// LookupSymbol returns string based on given reference.
Expand Down
32 changes: 32 additions & 0 deletions pkg/block/indexheader/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,38 @@ func TestReaders(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals)

// single value
rngs, err := br.PostingsOffsets("a", "9")
testutil.Ok(t, err)
for _, rng := range rngs {
testutil.Assert(t, rng.End > rng.Start)
}

rngs, err = br.PostingsOffsets("a", "2", "3", "4", "5", "6", "7", "8", "9")
testutil.Ok(t, err)
for _, rng := range rngs {
testutil.Assert(t, rng.End > rng.Start)
}

rngs, err = br.PostingsOffsets("a", "0")
testutil.Ok(t, err)
testutil.Assert(t, len(rngs) == 1)
testutil.Equals(t, notFoundRange, rngs[0])

rngs, err = br.PostingsOffsets("a", "0", "10", "99")
testutil.Ok(t, err)
testutil.Assert(t, len(rngs) == 3)
for _, rng := range rngs {
testutil.Equals(t, notFoundRange, rng)
}

rngs, err = br.PostingsOffsets("a", "1", "10", "9")
testutil.Ok(t, err)
testutil.Assert(t, len(rngs) == 3)
testutil.Assert(t, rngs[0].End > rngs[0].Start)
testutil.Assert(t, rngs[2].End > rngs[2].Start)
testutil.Equals(t, notFoundRange, rngs[1])

// Regression tests for https://github.com/thanos-io/thanos/issues/2213.
// Most of not existing value was working despite bug, except in certain unlucky cases
// it was causing "invalid size" errors.
Expand Down
13 changes: 13 additions & 0 deletions pkg/block/indexheader/lazy_binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,19 @@ func (r *LazyBinaryReader) IndexVersion() (int, error) {
return r.reader.IndexVersion()
}

// PostingsOffsets implements Reader.
func (r *LazyBinaryReader) PostingsOffsets(name string, values ...string) ([]index.Range, error) {
r.readerMx.RLock()
defer r.readerMx.RUnlock()

if err := r.load(); err != nil {
return nil, err
}

r.usedAt.Store(time.Now().UnixNano())
return r.reader.PostingsOffsets(name, values...)
}

// PostingsOffset implements Reader.
func (r *LazyBinaryReader) PostingsOffset(name, value string) (index.Range, error) {
r.readerMx.RLock()
Expand Down
Loading

0 comments on commit 294288f

Please sign in to comment.