From 15c59776e4fa73606e10b7aa41c704e062a5d23f Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Sat, 30 Dec 2023 15:09:35 +0100 Subject: [PATCH] store: add chunksize tests to acceptance tests * add chunk size tests to acceptance tests * refactor acceptance tests slightly Signed-off-by: Michael Hoffmann --- pkg/store/acceptance_test.go | 105 ++++++++++++++++++++++++++++++----- pkg/store/prometheus_test.go | 79 -------------------------- pkg/store/proxy_heap.go | 14 +++++ pkg/store/tsdb_test.go | 11 ---- 4 files changed, 105 insertions(+), 104 deletions(-) diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index 6bfe2da4212..fe733831347 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -6,28 +6,34 @@ package store import ( "context" "fmt" + "math" "net/url" "os" "path/filepath" "testing" "time" + "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/pkg/errors" + "golang.org/x/exp/slices" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - "golang.org/x/exp/slices" + "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/efficientgo/core/testutil" "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/providers/filesystem" "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/indexheader" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/promclient" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/store/storepb/prompb" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -62,8 +68,10 @@ type seriesCallCase struct { expectErr error } +type startStoreFn func(t *testing.T, extLset labels.Labels, append func(app storage.Appender)) storepb.StoreServer + // testStoreAPIsAcceptance tests StoreAPI from closed box perspective. -func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset labels.Labels, append func(app storage.Appender)) storepb.StoreServer) { +func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) { t.Helper() now := time.Now() @@ -232,7 +240,6 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset appendFn: func(app storage.Appender) { _, err := app.Append(0, labels.FromStrings("foo", "bar", "region", "somewhere"), 0, 0) testutil.Ok(t, err) - testutil.Ok(t, app.Commit()) }, seriesCalls: []seriesCallCase{ @@ -759,12 +766,69 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset } } +// Regression test for https://github.com/thanos-io/thanos/issues/396. +// Note: Only TSDB and Prometheus Stores do this +func testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T, startStore startStoreFn) { + t.Run("should split into chunks of max size 120", func(t *testing.T) { + baseT := timestamp.FromTime(time.Now().AddDate(0, 0, -2)) / 1000 * 1000 + offset := int64(2*math.MaxUint16 + 5) + + extLset := labels.FromStrings("region", "eu-west") + appendFn := func(app storage.Appender) { + + var ( + ref storage.SeriesRef + err error + ) + for i := int64(0); i < offset; i++ { + ref, err = app.Append(ref, labels.FromStrings("a", "b"), baseT+i, 1) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client := startStore(t, extLset, appendFn) + srv := newStoreSeriesServer(ctx) + + testutil.Ok(t, client.Series(&storepb.SeriesRequest{ + MinTime: baseT, + MaxTime: baseT + offset, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"}, + {Type: storepb.LabelMatcher_EQ, Name: "region", Value: "eu-west"}, + }, + }, srv)) + + testutil.Equals(t, 1, len(srv.SeriesSet)) + + firstSeries := srv.SeriesSet[0] + + testutil.Equals(t, []labelpb.ZLabel{ + {Name: "a", Value: "b"}, + {Name: "region", Value: "eu-west"}, + }, firstSeries.Labels) + + for i := 0; i < len(firstSeries.Chunks)-1; i++ { + chunk, err := chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[i].Raw.Data) + testutil.Ok(t, err) + testutil.Equals(t, 120, chunk.NumSamples()) + } + + chunk, err := chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[len(firstSeries.Chunks)-1].Raw.Data) + testutil.Ok(t, err) + testutil.Equals(t, 35, chunk.NumSamples()) + }) +} + func TestBucketStore_Acceptance(t *testing.T) { t.Cleanup(func() { custom.TolerantVerifyLeak(t) }) ctx := context.Background() - for _, lazyExpandedPosting := range []bool{false, true} { - testStoreAPIsAcceptance(t, func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { + startStore := func(lazyExpandedPostings bool) func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { + return func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { tmpDir := tt.TempDir() bktDir := filepath.Join(tmpDir, "bkt") auxDir := filepath.Join(tmpDir, "aux") @@ -835,7 +899,7 @@ func TestBucketStore_Acceptance(t *testing.T) { 1*time.Minute, WithChunkPool(chunkPool), WithFilterConfig(allowAllFilterConf), - WithLazyExpandedPostings(lazyExpandedPosting), + WithLazyExpandedPostings(lazyExpandedPostings), ) testutil.Ok(tt, err) tt.Cleanup(func() { testutil.Ok(tt, bucketStore.Close()) }) @@ -843,6 +907,12 @@ func TestBucketStore_Acceptance(t *testing.T) { testutil.Ok(tt, bucketStore.SyncBlocks(context.Background())) return bucketStore + } + } + + for _, lazyExpandedPostings := range []bool{false, true} { + t.Run(fmt.Sprintf("lazyExpandedPostings:%t", lazyExpandedPostings), func(t *testing.T) { + testStoreAPIsAcceptance(t, startStore(lazyExpandedPostings)) }) } } @@ -850,7 +920,7 @@ func TestBucketStore_Acceptance(t *testing.T) { func TestPrometheusStore_Acceptance(t *testing.T) { t.Cleanup(func() { custom.TolerantVerifyLeak(t) }) - testStoreAPIsAcceptance(t, func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { + startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { p, err := e2eutil.NewPrometheus() testutil.Ok(tt, err) tt.Cleanup(func() { testutil.Ok(tt, p.Stop()) }) @@ -870,21 +940,28 @@ func TestPrometheusStore_Acceptance(t *testing.T) { func() string { return version }) testutil.Ok(tt, err) + // We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only. + promStore.remoteReadAcceptableResponses = []prompb.ReadRequest_ResponseType{prompb.ReadRequest_SAMPLES} + return promStore - }) + } + + testStoreAPIsAcceptance(t, startStore) + testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120(t, startStore) } func TestTSDBStore_Acceptance(t *testing.T) { t.Cleanup(func() { custom.TolerantVerifyLeak(t) }) - testStoreAPIsAcceptance(t, func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { + startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { db, err := e2eutil.NewTSDB() testutil.Ok(tt, err) tt.Cleanup(func() { testutil.Ok(tt, db.Close()) }) + appendFn(db.Appender(context.Background())) - tsdbStore := NewTSDBStore(nil, db, component.Rule, extLset) + return NewTSDBStore(nil, db, component.Rule, extLset) + } - appendFn(db.Appender(context.Background())) - return tsdbStore - }) + testStoreAPIsAcceptance(t, startStore) + testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120(t, startStore) } diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index 5b7ff187366..079d1f2f4d4 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -18,7 +18,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" - "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/efficientgo/core/testutil" @@ -27,7 +26,6 @@ import ( "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/store/storepb/prompb" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -520,80 +518,3 @@ func TestPrometheusStore_Info(t *testing.T) { testutil.Equals(t, int64(123), resp.MinTime) testutil.Equals(t, int64(456), resp.MaxTime) } - -func testSeries_SplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T, appender storage.Appender, newStore func() storepb.StoreServer) { - baseT := timestamp.FromTime(time.Now().AddDate(0, 0, -2)) / 1000 * 1000 - - offset := int64(2*math.MaxUint16 + 5) - for i := int64(0); i < offset; i++ { - _, err := appender.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+i, 1) - testutil.Ok(t, err) - } - - testutil.Ok(t, appender.Commit()) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - client := newStore() - srv := newStoreSeriesServer(ctx) - - testutil.Ok(t, client.Series(&storepb.SeriesRequest{ - MinTime: baseT, - MaxTime: baseT + offset, - Matchers: []storepb.LabelMatcher{ - {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"}, - {Type: storepb.LabelMatcher_EQ, Name: "region", Value: "eu-west"}, - }, - }, srv)) - - testutil.Equals(t, 1, len(srv.SeriesSet)) - - firstSeries := srv.SeriesSet[0] - - testutil.Equals(t, []labelpb.ZLabel{ - {Name: "a", Value: "b"}, - {Name: "region", Value: "eu-west"}, - }, firstSeries.Labels) - - testutil.Equals(t, 1093, len(firstSeries.Chunks)) - - chunk, err := chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[0].Raw.Data) - testutil.Ok(t, err) - testutil.Equals(t, 120, chunk.NumSamples()) - - chunk, err = chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[1].Raw.Data) - testutil.Ok(t, err) - testutil.Equals(t, 120, chunk.NumSamples()) - - chunk, err = chunkenc.FromData(chunkenc.EncXOR, firstSeries.Chunks[len(firstSeries.Chunks)-1].Raw.Data) - testutil.Ok(t, err) - testutil.Equals(t, 35, chunk.NumSamples()) -} - -// Regression test for https://github.com/thanos-io/thanos/issues/396. -func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T) { - defer custom.TolerantVerifyLeak(t) - - p, err := e2eutil.NewPrometheus() - testutil.Ok(t, err) - defer func() { testutil.Ok(t, p.Stop()) }() - - testSeries_SplitSamplesIntoChunksWithMaxSizeOf120(t, p.Appender(), func() storepb.StoreServer { - testutil.Ok(t, p.Start(context.Background(), log.NewNopLogger())) - - u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) - testutil.Ok(t, err) - - proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, - func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return 0, math.MaxInt64 }, - nil) - testutil.Ok(t, err) - - // We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only. - proxy.remoteReadAcceptableResponses = []prompb.ReadRequest_ResponseType{prompb.ReadRequest_SAMPLES} - - return proxy - }) -} diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index 51631b388a3..2cc7cb0a038 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -20,6 +20,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -108,6 +109,19 @@ func (d *dedupResponseHeap) Next() bool { } } +func debugChunk(bs []byte) { + ch, err := chunkenc.FromData(chunkenc.EncXOR, bs) + if err != nil { + fmt.Println(err) + } else { + fmt.Println(ch.NumSamples()) + it := ch.Iterator(nil) + for it.Next() != chunkenc.ValNone { + fmt.Println(it.At()) + } + } +} + func chainSeriesAndRemIdenticalChunks(series []*storepb.SeriesResponse) *storepb.SeriesResponse { chunkDedupMap := map[uint64]*storepb.AggrChunk{} diff --git a/pkg/store/tsdb_test.go b/pkg/store/tsdb_test.go index 6dcc033c1c7..95830e3d43b 100644 --- a/pkg/store/tsdb_test.go +++ b/pkg/store/tsdb_test.go @@ -228,18 +228,7 @@ func TestTSDBStore_Series(t *testing.T) { } } -// Regression test for https://github.com/thanos-io/thanos/issues/1038. func TestTSDBStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T) { - defer custom.TolerantVerifyLeak(t) - - db, err := e2eutil.NewTSDB() - defer func() { testutil.Ok(t, db.Close()) }() - testutil.Ok(t, err) - - testSeries_SplitSamplesIntoChunksWithMaxSizeOf120(t, db.Appender(context.Background()), func() storepb.StoreServer { - return NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west")) - - }) } type delegatorServer struct {