From f3eb4238704e1c6b61f9ec0cab34e34761daa5fb Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Fri, 23 Jun 2023 01:27:03 -0700
Subject: [PATCH] fix postings offset reader and npe

Signed-off-by: Ben Ye
---
 pkg/block/indexheader/binary_reader.go | 4 ++++
 pkg/store/bucket.go                    | 8 ++++----
 pkg/store/lazy_postings.go             | 5 +++++
 3 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go
index a30952b0a85..a2e389442d2 100644
--- a/pkg/block/indexheader/binary_reader.go
+++ b/pkg/block/indexheader/binary_reader.go
@@ -889,6 +889,10 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
 		}
 
 		if valueIndex != len(values) && wantedValue <= e.offsets[i+1].value {
+			// Increment i when wanted value is same as next offset.
+			if wantedValue == e.offsets[i+1].value {
+				i++
+			}
 			// wantedValue is smaller or same as the next offset we know about, let's iterate further to add those.
 			continue
 		}
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 34bbdf12ad0..8a3f191ef9d 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -987,6 +987,7 @@ func (b *blockSeriesClient) ExpandPostings(
 		return errors.Wrap(err, "expanded matching posting")
 	}
 
+	b.lazyPostings = ps
 	if len(ps.postings) == 0 {
 		return nil
 	}
@@ -995,7 +996,6 @@ func (b *blockSeriesClient) ExpandPostings(
 		return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
 	}
 
-	b.lazyPostings = ps
 	if b.batchSize > len(ps.postings) {
 		b.batchSize = len(ps.postings)
 	}
@@ -1044,7 +1044,7 @@ func (b *blockSeriesClient) nextBatch() error {
 	if b.lazyPostings.lazyExpanded() {
 		v, err := b.indexr.IndexVersion()
 		if err != nil {
-			errors.Wrap(err, "get index version")
+			return errors.Wrap(err, "get index version")
 		}
 		if v >= 2 {
 			for i := range b.expandedPostings {
@@ -2264,7 +2264,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
 	// Shortcut the case of `len(postingGroups) == 0`. It will only happen when no
 	// matchers specified, and we don't need to fetch expanded postings from cache.
 	if len(ms) == 0 {
-		return nil, nil
+		return emptyLazyPostings, nil
 	}
 
 	hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter)
@@ -2297,7 +2297,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
 		// E.g. label="non-existing-value" returns empty group.
 		if !pg.addAll && len(pg.addKeys) == 0 {
 			r.storeExpandedPostingsToCache(ms, index.EmptyPostings(), 0)
-			return nil, nil
+			return emptyLazyPostings, nil
 		}
 
 		allRequested = allRequested || pg.addAll
diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go
index 89a8814bd0e..9053dfd7c17 100644
--- a/pkg/store/lazy_postings.go
+++ b/pkg/store/lazy_postings.go
@@ -1,3 +1,6 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
 package store
 
 import (
@@ -10,6 +13,8 @@ import (
 	"github.com/prometheus/prometheus/tsdb/index"
 )
 
+var emptyLazyPostings = &lazyExpandedPostings{postings: nil, matchers: nil}
+
 type lazyExpandedPostings struct {
 	postings []storage.SeriesRef
 	matchers []*labels.Matcher
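
For illustration (not part of the patch): the binary_reader.go hunk stops the reader from re-scanning a sampled block when a wanted value lands exactly on the next sampled offset. Below is a minimal, self-contained Go sketch of that idea under simplified assumptions; findStarts, sampled, and wanted are illustrative names only, and the real BinaryReader.postingsOffset scans an on-disk postings offset table rather than int slices.

package main

import "fmt"

// findStarts pairs each sorted wanted value with the index of the last sampled
// entry that is <= it. This is a simplified analogue of the loop in
// BinaryReader.postingsOffset: sampled stands in for e.offsets and wanted for
// the requested label values.
func findStarts(sampled, wanted []int) []int {
	starts := make([]int, 0, len(wanted))
	i, w := 0, 0
	for w < len(wanted) && i < len(sampled) {
		if i+1 == len(sampled) || wanted[w] <= sampled[i+1] {
			// The fix from the patch: when the wanted value equals the next
			// sampled entry, advance i so the scan starts at that entry
			// instead of walking the previous block again.
			if i+1 < len(sampled) && wanted[w] == sampled[i+1] {
				i++
			}
			starts = append(starts, i)
			w++
			continue
		}
		i++
	}
	return starts
}

func main() {
	sampled := []int{10, 20, 30, 40}
	wanted := []int{15, 20, 35}
	fmt.Println(findStarts(sampled, wanted)) // [0 1 2]: 20 starts at index 1, not 0
}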
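
The bucket.go and lazy_postings.go hunks return a shared emptyLazyPostings sentinel instead of nil so that later method calls such as lazyPostings.lazyExpanded() cannot hit a nil pointer. A minimal sketch of that pattern, assuming a simplified lazyPostings type with an illustrative refs field (not the real lazyExpandedPostings struct):

package main

import "fmt"

// lazyPostings is a simplified stand-in for lazyExpandedPostings; the refs
// field is illustrative only.
type lazyPostings struct {
	refs []uint64
}

// lazyExpanded reports whether postings were kept for lazy expansion.
// Calling it on a nil *lazyPostings would panic, which is the kind of NPE
// the patch avoids by never returning nil.
func (p *lazyPostings) lazyExpanded() bool {
	return len(p.refs) > 0
}

// emptyPostings is a non-nil "nothing matched" sentinel, mirroring
// emptyLazyPostings in the patch.
var emptyPostings = &lazyPostings{}

// expand returns the sentinel instead of nil when there is nothing to do,
// so callers can use the result without a nil check.
func expand(matchers []string) *lazyPostings {
	if len(matchers) == 0 {
		return emptyPostings
	}
	// Real matching elided; pretend everything matched three series.
	return &lazyPostings{refs: []uint64{1, 2, 3}}
}

func main() {
	p := expand(nil)
	fmt.Println(p.lazyExpanded()) // false, and no nil-pointer dereference
}

Moving the b.lazyPostings = ps assignment above the early return in ExpandPostings serves the same goal: the client holds a populated, non-nil value even when no postings matched.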