diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go index 81b977f5d3..ef7ae5d00a 100644 --- a/pkg/store/lazy_postings.go +++ b/pkg/store/lazy_postings.go @@ -305,6 +305,18 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post return nil, nil, errors.Wrap(err, "get postings") } + result := mergeFetchedPostings(ctx, fetchedPostings, postingGroups) + if err := ctx.Err(); err != nil { + return nil, nil, err + } + ps, err := ExpandPostingsWithContext(ctx, result) + if err != nil { + return nil, nil, errors.Wrap(err, "expand") + } + return ps, lazyMatchers, nil +} + +func mergeFetchedPostings(ctx context.Context, fetchedPostings []index.Postings, postingGroups []*postingGroup) index.Postings { // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys // again, and this is exactly the same order as before (when building the groups), so we can simply // use one incrementing index to fetch postings from returned slice. @@ -313,7 +325,7 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post var groupAdds, groupRemovals []index.Postings for _, g := range postingGroups { if g.lazy { - break + continue } // We cannot add empty set to groupAdds, since they are intersected. if len(g.addKeys) > 0 { @@ -333,13 +345,5 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post } result := index.Without(index.Intersect(groupAdds...), index.Merge(ctx, groupRemovals...)) - - if err := ctx.Err(); err != nil { - return nil, nil, err - } - ps, err := ExpandPostingsWithContext(ctx, result) - if err != nil { - return nil, nil, errors.Wrap(err, "expand") - } - return ps, lazyMatchers, nil + return result } diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go index 3eac705871..cb52dac412 100644 --- a/pkg/store/lazy_postings_test.go +++ b/pkg/store/lazy_postings_test.go @@ -16,8 +16,10 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/index" + "github.com/stretchr/testify/require" "github.com/thanos-io/objstore/providers/filesystem" "github.com/thanos-io/thanos/pkg/block/indexheader" @@ -745,3 +747,94 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { }) } } + +func TestMergeFetchedPostings(t *testing.T) { + ctx := context.Background() + for _, tc := range []struct { + name string + fetchedPostings []index.Postings + postingGroups []*postingGroup + expectedSeriesRefs []storage.SeriesRef + }{ + { + name: "empty fetched postings and posting groups", + }, + { + name: "single posting group with 1 add key", + fetchedPostings: []index.Postings{index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})}, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + }, + expectedSeriesRefs: []storage.SeriesRef{1, 2, 3, 4, 5}, + }, + { + name: "single posting group with multiple add keys, merge", + fetchedPostings: []index.Postings{ + index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}), + index.NewListPostings([]storage.SeriesRef{6, 7, 8, 9}), + }, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar", "baz"}}, + }, + expectedSeriesRefs: []storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + { + name: "multiple posting groups with add key, intersect", + fetchedPostings: []index.Postings{ + index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}), + index.NewListPostings([]storage.SeriesRef{1, 2, 4}), + }, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedSeriesRefs: []storage.SeriesRef{1, 2, 4}, + }, + { + name: "posting group with remove keys", + fetchedPostings: []index.Postings{ + index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}), + index.NewListPostings([]storage.SeriesRef{1, 2, 4}), + }, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", removeKeys: []string{"foo"}, addAll: true}, + }, + expectedSeriesRefs: []storage.SeriesRef{3, 5}, + }, + { + name: "multiple posting groups with add key and ignore lazy posting groups", + fetchedPostings: []index.Postings{ + index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}), + }, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}, lazy: true}, + {name: "baz", addKeys: []string{"foo"}, lazy: true}, + {name: "job", addKeys: []string{"foo"}, lazy: true}, + }, + expectedSeriesRefs: []storage.SeriesRef{1, 2, 3, 4, 5}, + }, + { + name: "multiple posting groups with add key and non consecutive lazy posting groups", + fetchedPostings: []index.Postings{ + index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}), + index.NewListPostings([]storage.SeriesRef{1, 2, 4}), + }, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}, lazy: true}, + {name: "baz", addKeys: []string{"foo"}}, + {name: "job", addKeys: []string{"foo"}, lazy: true}, + }, + expectedSeriesRefs: []storage.SeriesRef{1, 2, 4}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + p := mergeFetchedPostings(ctx, tc.fetchedPostings, tc.postingGroups) + res, err := index.ExpandPostings(p) + require.NoError(t, err) + require.Equal(t, tc.expectedSeriesRefs, res) + }) + } +}