Skip to content

Commit

Permalink
Store: fix block dedup (#6697)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann authored Sep 5, 2023
1 parent d82ccb8 commit d1edf74
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Fixed

- [#6692](https://github.com/thanos-io/thanos/pull/6692) Store: Fix matching bug when using empty alternative in regex matcher, for example (a||b).
- [#6679](https://github.com/thanos-io/thanos/pull/6697) Store: fix block deduplication

### Added

Expand Down
53 changes: 39 additions & 14 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,7 +1255,6 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill
// Series implements the storepb.StoreServer interface.
func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) {
srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), req.WithoutReplicaLabels)

if s.queryGate != nil {
tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) {
err = s.queryGate.Start(srv.Context())
Expand Down Expand Up @@ -1376,21 +1375,47 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
return errors.Wrapf(err, "fetch postings for block %s", blk.meta.ULID)
}

part := newLazyRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
[]labels.Labels{blk.extLset},
onClose,
blockClient,
shardMatcher,
false,
s.metrics.emptyPostingCount,
)
// If we have inner replica labels we need to resort.
s.mtx.Lock()
needsEagerRetrival := len(req.WithoutReplicaLabels) > 0 && s.labelNamesSet.HasAny(req.WithoutReplicaLabels)
s.mtx.Unlock()

var resp respSet
if needsEagerRetrival {
labelsToRemove := make(map[string]struct{})
for _, replicaLabel := range req.WithoutReplicaLabels {
labelsToRemove[replicaLabel] = struct{}{}
}
resp = newEagerRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
[]labels.Labels{blk.extLset},
onClose,
blockClient,
shardMatcher,
false,
s.metrics.emptyPostingCount,
labelsToRemove,
)
} else {
resp = newLazyRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
[]labels.Labels{blk.extLset},
onClose,
blockClient,
shardMatcher,
false,
s.metrics.emptyPostingCount,
)
}

mtx.Lock()
respSets = append(respSets, part)
respSets = append(respSets, resp)
mtx.Unlock()

return nil
Expand Down
4 changes: 0 additions & 4 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2017,8 +2017,6 @@ func TestSeries_SeriesSortedWithoutReplicaLabels(t *testing.T) {
replicaLabels: []string{"replica"},
expectedSeries: []labels.Labels{
labels.FromStrings("a", "1", "ext1", "0", "z", "1"),
labels.FromStrings("a", "1", "ext1", "0", "z", "1"),
labels.FromStrings("a", "1", "ext1", "0", "z", "2"),
labels.FromStrings("a", "1", "ext1", "0", "z", "2"),
labels.FromStrings("a", "1", "ext1", "1", "z", "1"),
labels.FromStrings("a", "1", "ext1", "1", "z", "2"),
Expand Down Expand Up @@ -3344,8 +3342,6 @@ func TestBucketIndexReader_decodeCachedPostingsErrors(t *testing.T) {
}

func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) {
t.Skip("Known Issue, Added for debugging in followup PR.")

logger := log.NewNopLogger()
tmpDir := t.TempDir()
bktDir := filepath.Join(tmpDir, "bkt")
Expand Down
32 changes: 19 additions & 13 deletions pkg/store/proxy_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,8 @@ func newAsyncRespSet(
seriesCtx,
span,
frameTimeout,
st,
st.String(),
st.LabelSets(),
closeSeries,
cl,
shardMatcher,
Expand Down Expand Up @@ -639,12 +640,14 @@ type eagerRespSet struct {
ctx context.Context

closeSeries context.CancelFunc
st Client
frameTimeout time.Duration

shardMatcher *storepb.ShardMatcher
removeLabels map[string]struct{}
storeLabels map[string]struct{}

storeName string
storeLabels map[string]struct{}
storeLabelSets []labels.Labels

// Internal bookkeeping.
bufferedResponses []*storepb.SeriesResponse
Expand All @@ -656,7 +659,8 @@ func newEagerRespSet(
ctx context.Context,
span opentracing.Span,
frameTimeout time.Duration,
st Client,
storeName string,
storeLabelSets []labels.Labels,
closeSeries context.CancelFunc,
cl storepb.Store_SeriesClient,
shardMatcher *storepb.ShardMatcher,
Expand All @@ -666,7 +670,6 @@ func newEagerRespSet(
) respSet {
ret := &eagerRespSet{
span: span,
st: st,
closeSeries: closeSeries,
cl: cl,
frameTimeout: frameTimeout,
Expand All @@ -675,9 +678,11 @@ func newEagerRespSet(
wg: &sync.WaitGroup{},
shardMatcher: shardMatcher,
removeLabels: removeLabels,
storeName: storeName,
storeLabelSets: storeLabelSets,
}
ret.storeLabels = make(map[string]struct{})
for _, ls := range st.LabelSets() {
for _, ls := range storeLabelSets {
for _, l := range ls {
ret.storeLabels[l.Name] = struct{}{}
}
Expand All @@ -686,7 +691,7 @@ func newEagerRespSet(
ret.wg.Add(1)

// Start a goroutine and immediately buffer everything.
go func(st Client, l *eagerRespSet) {
go func(l *eagerRespSet) {
seriesStats := &storepb.SeriesStatsCounter{}
bytesProcessed := 0

Expand Down Expand Up @@ -715,7 +720,7 @@ func newEagerRespSet(

select {
case <-l.ctx.Done():
err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st.String())
err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", storeName)
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err))
l.span.SetTag("err", err.Error())
return false
Expand All @@ -731,9 +736,9 @@ func newEagerRespSet(
// Most likely the per-Recv timeout has been reached.
// There's a small race between canceling and the Recv()
// but this is most likely true.
rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st.String())
rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, storeName)
} else {
rerr = errors.Wrapf(err, "receive series from %s", st.String())
rerr = errors.Wrapf(err, "receive series from %s", storeName)
}
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
l.span.SetTag("err", rerr.Error())
Expand Down Expand Up @@ -773,7 +778,7 @@ func newEagerRespSet(
sortWithoutLabels(l.bufferedResponses, l.removeLabels)
}

}(st, ret)
}(ret)

return ret
}
Expand Down Expand Up @@ -845,6 +850,7 @@ func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string]
}

func (l *eagerRespSet) Close() {
l.closeSeries()
l.shardMatcher.Close()
}

Expand Down Expand Up @@ -873,11 +879,11 @@ func (l *eagerRespSet) Empty() bool {
}

func (l *eagerRespSet) StoreID() string {
return l.st.String()
return l.storeName
}

func (l *eagerRespSet) Labelset() string {
return labelpb.PromLabelSetsToString(l.st.LabelSets())
return labelpb.PromLabelSetsToString(l.storeLabelSets)
}

func (l *eagerRespSet) StoreLabels() map[string]struct{} {
Expand Down

0 comments on commit d1edf74

Please sign in to comment.