From a600b29eb0cb4254ed5edb3c68f315a24ac01bde Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 23 Jul 2023 16:04:48 -0700 Subject: [PATCH] more test cases Signed-off-by: Ben Ye --- pkg/store/bucket_test.go | 65 ++++++++++++------ pkg/store/lazy_postings.go | 3 +- pkg/testutil/e2eutil/prometheus.go | 15 ++++- test/e2e/store_gateway_test.go | 105 +++++++++++++++++++++++++++++ 4 files changed, 164 insertions(+), 24 deletions(-) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index ee66008b266..e760443f5e7 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1097,20 +1097,27 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in }() logger := log.NewNopLogger() + ctx := context.Background() - appendTestData(t, h.Appender(context.Background()), series) + appendTestData(t, h.Appender(ctx), series) - testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDir, "tmp"), os.ModePerm)) - id := createBlockFromHead(t, filepath.Join(tmpDir, "tmp"), h) + dir := filepath.Join(tmpDir, "tmp") + testutil.Ok(t, os.MkdirAll(dir, os.ModePerm)) + id := createBlockFromHead(t, dir, h) + bdir := filepath.Join(dir, id.String()) + meta, err := metadata.ReadFromDir(bdir) + testutil.Ok(t, err) + stats, err := block.GatherIndexHealthStats(logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) + testutil.Ok(t, err) _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, "tmp", id.String()), metadata.Thanos{ Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), Downsample: metadata.ThanosDownsample{Resolution: 0}, Source: metadata.TestSource, + IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize}, }, nil) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()), metadata.NoneFunc)) - testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()), metadata.NoneFunc)) + testutil.Ok(t, block.Upload(ctx, logger, bkt, bdir, metadata.NoneFunc)) return id } @@ -1272,21 +1279,28 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) { func TestBucketSeries(t *testing.T) { tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, chunkenc.ValFloat, false, samplesPerSeries, series, 1) + benchBucketSeries(t, chunkenc.ValFloat, false, false, samplesPerSeries, series, 1) + }) +} + +func TestBucketSeriesLazyExpandedPostings(t *testing.T) { + tb := testutil.NewTB(t) + storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { + benchBucketSeries(t, chunkenc.ValFloat, false, true, samplesPerSeries, series, 1) }) } func TestBucketHistogramSeries(t *testing.T) { tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, chunkenc.ValHistogram, false, samplesPerSeries, series, 1) + benchBucketSeries(t, chunkenc.ValHistogram, false, false, samplesPerSeries, series, 1) }) } func TestBucketSkipChunksSeries(t *testing.T) { tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, chunkenc.ValFloat, true, samplesPerSeries, series, 1) + benchBucketSeries(t, chunkenc.ValFloat, true, false, samplesPerSeries, series, 1) }) } @@ -1294,7 +1308,7 @@ func BenchmarkBucketSeries(b *testing.B) { tb := testutil.NewTB(b) // 10e6 samples = ~1736 days with 15s scrape storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, chunkenc.ValFloat, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1) + benchBucketSeries(t, chunkenc.ValFloat, false, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1) }) } @@ -1302,11 +1316,11 @@ func BenchmarkBucketSkipChunksSeries(b *testing.B) { tb := testutil.NewTB(b) // 10e6 samples = ~1736 days with 15s scrape storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) { - benchBucketSeries(t, chunkenc.ValFloat, true, samplesPerSeries, series, 1/100e6, 1/10e4, 1) + benchBucketSeries(t, chunkenc.ValFloat, true, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1) }) } -func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk bool, samplesPerSeries, totalSeries int, requestedRatios ...float64) { +func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk, lazyExpandedPostings bool, samplesPerSeries, totalSeries int, requestedRatios ...float64) { const numOfBlocks = 4 tmpDir := t.TempDir() @@ -1322,12 +1336,6 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk b ) extLset := labels.Labels{{Name: "ext1", Value: "1"}} - thanosMeta := metadata.Thanos{ - Labels: extLset.Map(), - Downsample: metadata.ThanosDownsample{Resolution: 0}, - Source: metadata.TestSource, - } - blockDir := filepath.Join(tmpDir, "tmp") samplesPerSeriesPerBlock := samplesPerSeries / numOfBlocks @@ -1355,19 +1363,33 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk b }) id := createBlockFromHead(t, blockDir, head) testutil.Ok(t, head.Close()) + blockIDDir := filepath.Join(blockDir, id.String()) + meta, err := metadata.ReadFromDir(blockIDDir) + testutil.Ok(t, err) + stats, err := block.GatherIndexHealthStats(logger, filepath.Join(blockIDDir, block.IndexFilename), meta.MinTime, meta.MaxTime) + testutil.Ok(t, err) + thanosMeta := metadata.Thanos{ + Labels: extLset.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + IndexStats: metadata.IndexStats{ + SeriesMaxSize: stats.SeriesMaxSize, + ChunkMaxSize: stats.ChunkMaxSize, + }, + } // Histogram chunks are represented differently in memory and on disk. In order to // have a precise comparison, we need to use the on-disk representation as the expected value // instead of the in-memory one. - diskBlock, err := tsdb.OpenBlock(logger, path.Join(blockDir, id.String()), nil) + diskBlock, err := tsdb.OpenBlock(logger, blockIDDir, nil) testutil.Ok(t, err) series = append(series, storetestutil.ReadSeriesFromBlock(t, diskBlock, extLset, skipChunk)...) - meta, err := metadata.InjectThanos(logger, filepath.Join(blockDir, id.String()), thanosMeta, nil) + meta, err = metadata.InjectThanos(logger, blockIDDir, thanosMeta, nil) testutil.Ok(t, err) - testutil.Ok(t, meta.WriteToDir(logger, filepath.Join(blockDir, id.String()))) - testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String()), metadata.NoneFunc)) + testutil.Ok(t, meta.WriteToDir(logger, blockIDDir)) + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, blockIDDir, metadata.NoneFunc)) } ibkt := objstore.WithNoopInstr(bkt) @@ -1393,6 +1415,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk b 0, WithLogger(logger), WithChunkPool(chunkPool), + WithLazyExpandedPostings(lazyExpandedPostings), ) testutil.Ok(t, err) diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go index 9fb5a175fe8..6c480154298 100644 --- a/pkg/store/lazy_postings.go +++ b/pkg/store/lazy_postings.go @@ -5,7 +5,6 @@ package store import ( "context" - "github.com/thanos-io/thanos/pkg/block/indexheader" "math" "github.com/pkg/errors" @@ -13,6 +12,8 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/index" "golang.org/x/exp/slices" + + "github.com/thanos-io/thanos/pkg/block/indexheader" ) var emptyLazyPostings = &lazyExpandedPostings{postings: nil, matchers: nil} diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go index c75e407ebc3..31e62433326 100644 --- a/pkg/testutil/e2eutil/prometheus.go +++ b/pkg/testutil/e2eutil/prometheus.go @@ -441,15 +441,26 @@ func createBlockWithDelay(ctx context.Context, dir string, series []labels.Label return ulid.ULID{}, errors.Wrap(err, "create block id") } - m, err := metadata.ReadFromDir(path.Join(dir, blockID.String())) + bdir := path.Join(dir, blockID.String()) + m, err := metadata.ReadFromDir(bdir) if err != nil { return ulid.ULID{}, errors.Wrap(err, "open meta file") } + logger := log.NewNopLogger() + stats, err := block.GatherIndexHealthStats(logger, path.Join(bdir, block.IndexFilename), m.MinTime, m.MaxTime) + if err != nil { + return ulid.ULID{}, errors.Wrap(err, "gather index health stats") + } + m.ULID = id m.Compaction.Sources = []ulid.ULID{id} + m.Thanos.IndexStats = metadata.IndexStats{ + SeriesMaxSize: stats.SeriesMaxSize, + ChunkMaxSize: stats.ChunkMaxSize, + } - if err := m.WriteToDir(log.NewNopLogger(), path.Join(dir, blockID.String())); err != nil { + if err := m.WriteToDir(logger, path.Join(dir, blockID.String())); err != nil { return ulid.ULID{}, errors.Wrap(err, "write meta.json file") } diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 4a888b1ff1d..e38642ba36b 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -1039,3 +1039,108 @@ config: testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{`thanos_store_index_cache_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings")))) }) } + +func TestStoreGatewayLazyExpandedPostingsEnabled(t *testing.T) { + t.Parallel() + + e, err := e2e.NewDockerEnvironment("memcached-exp") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + const bucket = "store-gateway-lazy-expanded-postings-test" + m := e2edb.NewMinio(e, "thanos-minio", bucket, e2edb.WithMinioTLS()) + testutil.Ok(t, e2e.StartAndWaitReady(m)) + + memcached := e2ethanos.NewMemcached(e, "1") + testutil.Ok(t, e2e.StartAndWaitReady(memcached)) + + indexCacheConfig := fmt.Sprintf(`type: MEMCACHED +config: + addresses: [%s] + max_async_concurrency: 10 + dns_provider_update_interval: 1s + auto_discovery: false`, memcached.InternalEndpoint("memcached")) + + s1 := e2ethanos.NewStoreGW( + e, + "1", + client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), + }, + "", + indexCacheConfig, + []string{"--store.enable-lazy-expanded-postings=true"}, + ) + testutil.Ok(t, e2e.StartAndWaitReady(s1)) + + q := e2ethanos.NewQuerierBuilder(e, "1", s1.InternalEndpoint("grpc")).Init() + testutil.Ok(t, e2e.StartAndWaitReady(q)) + + dir := filepath.Join(e.SharedDir(), "tmp") + testutil.Ok(t, os.MkdirAll(dir, os.ModePerm)) + + series := []labels.Labels{labels.FromStrings("a", "1", "b", "2")} + extLset := labels.FromStrings("ext1", "value1", "replica", "1") + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + t.Cleanup(cancel) + + now := time.Now() + id, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0, metadata.NoneFunc) + testutil.Ok(t, err) + + l := log.NewLogfmtLogger(os.Stdout) + bkt, err := s3.NewBucketWithConfig(l, + e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()), "test-feed") + testutil.Ok(t, err) + + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id.String()), id.String())) + + // Wait for store to sync blocks. + // thanos_blocks_meta_synced: 1x loadedMeta 0x labelExcludedMeta 0x TooFreshMeta. + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_blocks_meta_synced")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total")) + + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_blocks_loaded")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_drops_total")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total")) + + t.Run("query with cache miss", func(t *testing.T) { + queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, + []model.Metric{ + { + "a": "1", + "b": "2", + "ext1": "value1", + "replica": "1", + }, + }, + ) + + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{`thanos_store_index_cache_requests_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings")))) + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{`thanos_store_index_cache_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings")))) + }) + + t.Run("query with cache hit", func(t *testing.T) { + queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, + []model.Metric{ + { + "a": "1", + "b": "2", + "ext1": "value1", + "replica": "1", + }, + }, + ) + + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{`thanos_store_index_cache_requests_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings")))) + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{`thanos_store_index_cache_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings")))) + }) +}