From 520d2ddb98bc6833dbbad89e0ef8651865a05816 Mon Sep 17 00:00:00 2001
From: Ben Ye <benye@amazon.com>
Date: Tue, 10 Dec 2024 15:43:02 -0800
Subject: [PATCH] store gateway: fix merging fetched postings with lazy postings
 (#7979)
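
Lazy posting groups are not guaranteed to be consecutive or to sit at the end
of the postingGroups slice. The merge loop previously stopped at the first lazy
group with break, dropping the fetched postings of any non-lazy groups that
followed it. Switch to continue so only lazy groups are skipped, and extract
the merge logic into mergeFetchedPostings so it can be unit tested.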

Signed-off-by: Ben Ye <benye@amazon.com>
---
 pkg/store/lazy_postings.go      | 24 +++++----
 pkg/store/lazy_postings_test.go | 93 +++++++++++++++++++++++++++++++++
 2 files changed, 107 insertions(+), 10 deletions(-)

diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go
index 81b977f5d3..ef7ae5d00a 100644
--- a/pkg/store/lazy_postings.go
+++ b/pkg/store/lazy_postings.go
@@ -305,6 +305,18 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post
 		return nil, nil, errors.Wrap(err, "get postings")
 	}
 
+	result := mergeFetchedPostings(ctx, fetchedPostings, postingGroups)
+	if err := ctx.Err(); err != nil {
+		return nil, nil, err
+	}
+	ps, err := ExpandPostingsWithContext(ctx, result)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "expand")
+	}
+	return ps, lazyMatchers, nil
+}
+
+func mergeFetchedPostings(ctx context.Context, fetchedPostings []index.Postings, postingGroups []*postingGroup) index.Postings {
 	// Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys
 	// again, and this is exactly the same order as before (when building the groups), so we can simply
 	// use one incrementing index to fetch postings from returned slice.
@@ -313,7 +325,7 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post
 	var groupAdds, groupRemovals []index.Postings
 	for _, g := range postingGroups {
 		if g.lazy {
-			break
+			continue
 		}
 		// We cannot add empty set to groupAdds, since they are intersected.
 		if len(g.addKeys) > 0 {
@@ -333,13 +345,5 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post
 	}
 
 	result := index.Without(index.Intersect(groupAdds...), index.Merge(ctx, groupRemovals...))
-
-	if err := ctx.Err(); err != nil {
-		return nil, nil, err
-	}
-	ps, err := ExpandPostingsWithContext(ctx, result)
-	if err != nil {
-		return nil, nil, errors.Wrap(err, "expand")
-	}
-	return ps, lazyMatchers, nil
+	return result
 }
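
The core of the fix is the break -> continue change above: because lazy posting
groups need not be the trailing elements of postingGroups, breaking out of the
loop at the first lazy group excluded every non-lazy group after it from the
merge. Below is a minimal, self-contained sketch (not Thanos code; the group
type and names are simplified stand-ins) of the difference in iteration
behavior:

package main

import "fmt"

// group is a simplified stand-in for Thanos' postingGroup, keeping only the
// fields needed to show the iteration behavior.
type group struct {
	name string
	lazy bool
}

// mergedNames returns the names of the groups that would take part in the
// merge, skipping lazy groups either with break (old behavior) or with
// continue (fixed behavior).
func mergedNames(groups []group, useBreak bool) []string {
	var out []string
	for _, g := range groups {
		if g.lazy {
			if useBreak {
				break // old: stops the whole loop at the first lazy group
			}
			continue // fixed: skips only the lazy group itself
		}
		out = append(out, g.name)
	}
	return out
}

func main() {
	// A lazy group in the middle, mirroring the "non consecutive lazy
	// posting groups" test case added in this patch.
	groups := []group{
		{name: "foo"},
		{name: "bar", lazy: true},
		{name: "baz"},
	}
	fmt.Println(mergedNames(groups, true))  // [foo]     -> baz is silently dropped
	fmt.Println(mergedNames(groups, false)) // [foo baz] -> baz participates in the merge
}

With break, a group like baz never reaches groupAdds, so its postings are
ignored during the intersection; with continue, only the lazy groups are
skipped here (they are handled separately via the returned lazyMatchers).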
diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go
index 3eac705871..cb52dac412 100644
--- a/pkg/store/lazy_postings_test.go
+++ b/pkg/store/lazy_postings_test.go
@@ -16,8 +16,10 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/index"
+	"github.com/stretchr/testify/require"
 
 	"github.com/thanos-io/objstore/providers/filesystem"
 	"github.com/thanos-io/thanos/pkg/block/indexheader"
@@ -745,3 +747,94 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
 		})
 	}
 }
+
+func TestMergeFetchedPostings(t *testing.T) {
+	ctx := context.Background()
+	for _, tc := range []struct {
+		name               string
+		fetchedPostings    []index.Postings
+		postingGroups      []*postingGroup
+		expectedSeriesRefs []storage.SeriesRef
+	}{
+		{
+			name: "empty fetched postings and posting groups",
+		},
+		{
+			name:            "single posting group with 1 add key",
+			fetchedPostings: []index.Postings{index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5})},
+			postingGroups: []*postingGroup{
+				{name: "foo", addKeys: []string{"bar"}},
+			},
+			expectedSeriesRefs: []storage.SeriesRef{1, 2, 3, 4, 5},
+		},
+		{
+			name: "single posting group with multiple add keys, merge",
+			fetchedPostings: []index.Postings{
+				index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}),
+				index.NewListPostings([]storage.SeriesRef{6, 7, 8, 9}),
+			},
+			postingGroups: []*postingGroup{
+				{name: "foo", addKeys: []string{"bar", "baz"}},
+			},
+			expectedSeriesRefs: []storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8, 9},
+		},
+		{
+			name: "multiple posting groups with add key, intersect",
+			fetchedPostings: []index.Postings{
+				index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}),
+				index.NewListPostings([]storage.SeriesRef{1, 2, 4}),
+			},
+			postingGroups: []*postingGroup{
+				{name: "foo", addKeys: []string{"bar"}},
+				{name: "bar", addKeys: []string{"foo"}},
+			},
+			expectedSeriesRefs: []storage.SeriesRef{1, 2, 4},
+		},
+		{
+			name: "posting group with remove keys",
+			fetchedPostings: []index.Postings{
+				index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}),
+				index.NewListPostings([]storage.SeriesRef{1, 2, 4}),
+			},
+			postingGroups: []*postingGroup{
+				{name: "foo", addKeys: []string{"bar"}},
+				{name: "bar", removeKeys: []string{"foo"}, addAll: true},
+			},
+			expectedSeriesRefs: []storage.SeriesRef{3, 5},
+		},
+		{
+			name: "multiple posting groups with add key and ignore lazy posting groups",
+			fetchedPostings: []index.Postings{
+				index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}),
+			},
+			postingGroups: []*postingGroup{
+				{name: "foo", addKeys: []string{"bar"}},
+				{name: "bar", addKeys: []string{"foo"}, lazy: true},
+				{name: "baz", addKeys: []string{"foo"}, lazy: true},
+				{name: "job", addKeys: []string{"foo"}, lazy: true},
+			},
+			expectedSeriesRefs: []storage.SeriesRef{1, 2, 3, 4, 5},
+		},
+		{
+			name: "multiple posting groups with add key and non consecutive lazy posting groups",
+			fetchedPostings: []index.Postings{
+				index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5}),
+				index.NewListPostings([]storage.SeriesRef{1, 2, 4}),
+			},
+			postingGroups: []*postingGroup{
+				{name: "foo", addKeys: []string{"bar"}},
+				{name: "bar", addKeys: []string{"foo"}, lazy: true},
+				{name: "baz", addKeys: []string{"foo"}},
+				{name: "job", addKeys: []string{"foo"}, lazy: true},
+			},
+			expectedSeriesRefs: []storage.SeriesRef{1, 2, 4},
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			p := mergeFetchedPostings(ctx, tc.fetchedPostings, tc.postingGroups)
+			res, err := index.ExpandPostings(p)
+			require.NoError(t, err)
+			require.Equal(t, tc.expectedSeriesRefs, res)
+		})
+	}
+}
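
Extracting mergeFetchedPostings is what makes this table-driven test possible:
it can be exercised with in-memory index.NewListPostings inputs, without a
bucket or index reader. Assuming a standard checkout of the repository,
something like go test ./pkg/store -run TestMergeFetchedPostings should run it
in isolation.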