Skip to content

Commit

Permalink
store gateway: fix merge fetched postings with lazy postings
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Dec 10, 2024
1 parent b3645c8 commit 5cc0e97
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 10 deletions.
24 changes: 14 additions & 10 deletions pkg/store/lazy_postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,18 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post
return nil, nil, errors.Wrap(err, "get postings")
}

result := mergeFetchedPostings(ctx, fetchedPostings, postingGroups)
if err := ctx.Err(); err != nil {
return nil, nil, err
}
ps, err := ExpandPostingsWithContext(ctx, result)
if err != nil {
return nil, nil, errors.Wrap(err, "expand")
}
return ps, lazyMatchers, nil
}

func mergeFetchedPostings(ctx context.Context, fetchedPostings []index.Postings, postingGroups []*postingGroup) index.Postings {
// Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys
// again, and this is exactly the same order as before (when building the groups), so we can simply
// use one incrementing index to fetch postings from returned slice.
Expand All @@ -313,7 +325,7 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post
var groupAdds, groupRemovals []index.Postings
for _, g := range postingGroups {
if g.lazy {
break
continue
}
// We cannot add empty set to groupAdds, since they are intersected.
if len(g.addKeys) > 0 {
Expand All @@ -333,13 +345,5 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post
}

result := index.Without(index.Intersect(groupAdds...), index.Merge(ctx, groupRemovals...))

if err := ctx.Err(); err != nil {
return nil, nil, err
}
ps, err := ExpandPostingsWithContext(ctx, result)
if err != nil {
return nil, nil, errors.Wrap(err, "expand")
}
return ps, lazyMatchers, nil
return result
}
93 changes: 93 additions & 0 deletions pkg/store/lazy_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/stretchr/testify/require"

"github.com/thanos-io/objstore/providers/filesystem"
"github.com/thanos-io/thanos/pkg/block/indexheader"
Expand Down Expand Up @@ -745,3 +747,94 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
})
}
}

func TestMergeFetchedPostings(t *testing.T) {
ctx := context.Background()
for _, tc := range []struct {
name string
fetchedPostings []index.Postings
postingGroups []*postingGroup
expectedSeriesRefs []storage.SeriesRef
}{
{
name: "empty fetched postings and posting groups",
},
{
name: "single posting group with 1 add key",
fetchedPostings: []index.Postings{index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})},
postingGroups: []*postingGroup{
{name: "foo", addKeys: []string{"bar"}},
},
expectedSeriesRefs: []storage.SeriesRef{1, 2, 3, 4, 5},
},
{
name: "single posting group with multiple add keys, merge",
fetchedPostings: []index.Postings{
index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}),
index.NewListPostings([]storage.SeriesRef{6, 7, 8, 9}),
},
postingGroups: []*postingGroup{
{name: "foo", addKeys: []string{"bar", "baz"}},
},
expectedSeriesRefs: []storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8, 9},
},
{
name: "multiple posting groups with add key, intersect",
fetchedPostings: []index.Postings{
index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}),
index.NewListPostings([]storage.SeriesRef{1, 2, 4}),
},
postingGroups: []*postingGroup{
{name: "foo", addKeys: []string{"bar"}},
{name: "bar", addKeys: []string{"foo"}},
},
expectedSeriesRefs: []storage.SeriesRef{1, 2, 4},
},
{
name: "posting group with remove keys",
fetchedPostings: []index.Postings{
index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}),
index.NewListPostings([]storage.SeriesRef{1, 2, 4}),
},
postingGroups: []*postingGroup{
{name: "foo", addKeys: []string{"bar"}},
{name: "bar", removeKeys: []string{"foo"}, addAll: true},
},
expectedSeriesRefs: []storage.SeriesRef{3, 5},
},
{
name: "multiple posting groups with add key and ignore lazy posting groups",
fetchedPostings: []index.Postings{
index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}),
},
postingGroups: []*postingGroup{
{name: "foo", addKeys: []string{"bar"}},
{name: "bar", addKeys: []string{"foo"}, lazy: true},
{name: "baz", addKeys: []string{"foo"}, lazy: true},
{name: "job", addKeys: []string{"foo"}, lazy: true},
},
expectedSeriesRefs: []storage.SeriesRef{1, 2, 3, 4, 5},
},
{
name: "multiple posting groups with add key and non consecutive lazy posting groups",
fetchedPostings: []index.Postings{
index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}),
index.NewListPostings([]storage.SeriesRef{1, 2, 4}),
},
postingGroups: []*postingGroup{
{name: "foo", addKeys: []string{"bar"}},
{name: "bar", addKeys: []string{"foo"}, lazy: true},
{name: "baz", addKeys: []string{"foo"}},
{name: "job", addKeys: []string{"foo"}, lazy: true},
},
expectedSeriesRefs: []storage.SeriesRef{1, 2, 4},
},
} {
t.Run(tc.name, func(t *testing.T) {
p := mergeFetchedPostings(ctx, tc.fetchedPostings, tc.postingGroups)
res, err := index.ExpandPostings(p)
require.NoError(t, err)
require.Equal(t, tc.expectedSeriesRefs, res)
})
}
}

0 comments on commit 5cc0e97

Please sign in to comment.