Commit a600b29

more test cases

Signed-off-by: Ben Ye <[email protected]>
yeya24 committed Jul 23, 2023
1 parent 0cb44aa commit a600b29
Showing 4 changed files with 164 additions and 24 deletions.
65 changes: 44 additions & 21 deletions pkg/store/bucket_test.go
@@ -1097,20 +1097,27 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in
 	}()
 
 	logger := log.NewNopLogger()
+	ctx := context.Background()
 
-	appendTestData(t, h.Appender(context.Background()), series)
+	appendTestData(t, h.Appender(ctx), series)
 
-	testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDir, "tmp"), os.ModePerm))
-	id := createBlockFromHead(t, filepath.Join(tmpDir, "tmp"), h)
+	dir := filepath.Join(tmpDir, "tmp")
+	testutil.Ok(t, os.MkdirAll(dir, os.ModePerm))
+	id := createBlockFromHead(t, dir, h)
+	bdir := filepath.Join(dir, id.String())
+	meta, err := metadata.ReadFromDir(bdir)
+	testutil.Ok(t, err)
+	stats, err := block.GatherIndexHealthStats(logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
+	testutil.Ok(t, err)
 
 	_, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, "tmp", id.String()), metadata.Thanos{
 		Labels:     labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
 		Downsample: metadata.ThanosDownsample{Resolution: 0},
 		Source:     metadata.TestSource,
+		IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
 	}, nil)
 	testutil.Ok(t, err)
-	testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()), metadata.NoneFunc))
-	testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, "tmp", id.String()), metadata.NoneFunc))
+	testutil.Ok(t, block.Upload(ctx, logger, bkt, bdir, metadata.NoneFunc))
 
 	return id
 }
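
Note: the pattern this hunk introduces — read the block's meta.json, scan the index once with GatherIndexHealthStats, then persist the max series/chunk sizes via InjectThanos — recurs throughout the commit. A minimal sketch of that flow, using only calls that appear in this diff; the annotateBlock helper name, its plain error returns, and the go-kit/log import path are illustrative assumptions, not part of the change:

import (
	"path/filepath"

	"github.com/go-kit/log"

	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/block/metadata"
)

// annotateBlock stamps IndexStats into the meta.json of the block at bdir.
// Hypothetical helper; the calls mirror those used in the hunk above.
func annotateBlock(logger log.Logger, bdir string, extLabels map[string]string) error {
	meta, err := metadata.ReadFromDir(bdir)
	if err != nil {
		return err
	}
	// One pass over the block's index finds the largest series entry and chunk.
	stats, err := block.GatherIndexHealthStats(logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
	if err != nil {
		return err
	}
	// Store the sizes so readers can size their byte-range fetches up front.
	_, err = metadata.InjectThanos(logger, bdir, metadata.Thanos{
		Labels:     extLabels,
		Downsample: metadata.ThanosDownsample{Resolution: 0},
		Source:     metadata.TestSource,
		IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
	}, nil)
	return err
}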
@@ -1272,41 +1279,48 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) {
 func TestBucketSeries(t *testing.T) {
 	tb := testutil.NewTB(t)
 	storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
-		benchBucketSeries(t, chunkenc.ValFloat, false, samplesPerSeries, series, 1)
+		benchBucketSeries(t, chunkenc.ValFloat, false, false, samplesPerSeries, series, 1)
 	})
 }
 
+func TestBucketSeriesLazyExpandedPostings(t *testing.T) {
+	tb := testutil.NewTB(t)
+	storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
+		benchBucketSeries(t, chunkenc.ValFloat, false, true, samplesPerSeries, series, 1)
+	})
+}
+
 func TestBucketHistogramSeries(t *testing.T) {
 	tb := testutil.NewTB(t)
 	storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
-		benchBucketSeries(t, chunkenc.ValHistogram, false, samplesPerSeries, series, 1)
+		benchBucketSeries(t, chunkenc.ValHistogram, false, false, samplesPerSeries, series, 1)
 	})
 }
 
 func TestBucketSkipChunksSeries(t *testing.T) {
 	tb := testutil.NewTB(t)
 	storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
-		benchBucketSeries(t, chunkenc.ValFloat, true, samplesPerSeries, series, 1)
+		benchBucketSeries(t, chunkenc.ValFloat, true, false, samplesPerSeries, series, 1)
 	})
 }
 
 func BenchmarkBucketSeries(b *testing.B) {
 	tb := testutil.NewTB(b)
 	// 10e6 samples = ~1736 days with 15s scrape
 	storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) {
-		benchBucketSeries(t, chunkenc.ValFloat, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1)
+		benchBucketSeries(t, chunkenc.ValFloat, false, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1)
 	})
 }
 
 func BenchmarkBucketSkipChunksSeries(b *testing.B) {
 	tb := testutil.NewTB(b)
 	// 10e6 samples = ~1736 days with 15s scrape
 	storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) {
-		benchBucketSeries(t, chunkenc.ValFloat, true, samplesPerSeries, series, 1/100e6, 1/10e4, 1)
+		benchBucketSeries(t, chunkenc.ValFloat, true, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1)
 	})
 }
 
-func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk bool, samplesPerSeries, totalSeries int, requestedRatios ...float64) {
+func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk, lazyExpandedPostings bool, samplesPerSeries, totalSeries int, requestedRatios ...float64) {
 	const numOfBlocks = 4
 
 	tmpDir := t.TempDir()
@@ -1322,12 +1336,6 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk b
 	)
 
 	extLset := labels.Labels{{Name: "ext1", Value: "1"}}
-	thanosMeta := metadata.Thanos{
-		Labels:     extLset.Map(),
-		Downsample: metadata.ThanosDownsample{Resolution: 0},
-		Source:     metadata.TestSource,
-	}
-
 	blockDir := filepath.Join(tmpDir, "tmp")
 
 	samplesPerSeriesPerBlock := samplesPerSeries / numOfBlocks
@@ -1355,19 +1363,33 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk b
 		})
 		id := createBlockFromHead(t, blockDir, head)
 		testutil.Ok(t, head.Close())
+		blockIDDir := filepath.Join(blockDir, id.String())
+		meta, err := metadata.ReadFromDir(blockIDDir)
+		testutil.Ok(t, err)
+		stats, err := block.GatherIndexHealthStats(logger, filepath.Join(blockIDDir, block.IndexFilename), meta.MinTime, meta.MaxTime)
+		testutil.Ok(t, err)
+		thanosMeta := metadata.Thanos{
+			Labels:     extLset.Map(),
+			Downsample: metadata.ThanosDownsample{Resolution: 0},
+			Source:     metadata.TestSource,
+			IndexStats: metadata.IndexStats{
+				SeriesMaxSize: stats.SeriesMaxSize,
+				ChunkMaxSize:  stats.ChunkMaxSize,
+			},
+		}
 
 		// Histogram chunks are represented differently in memory and on disk. In order to
 		// have a precise comparison, we need to use the on-disk representation as the expected value
 		// instead of the in-memory one.
-		diskBlock, err := tsdb.OpenBlock(logger, path.Join(blockDir, id.String()), nil)
+		diskBlock, err := tsdb.OpenBlock(logger, blockIDDir, nil)
 		testutil.Ok(t, err)
 		series = append(series, storetestutil.ReadSeriesFromBlock(t, diskBlock, extLset, skipChunk)...)
 
-		meta, err := metadata.InjectThanos(logger, filepath.Join(blockDir, id.String()), thanosMeta, nil)
+		meta, err = metadata.InjectThanos(logger, blockIDDir, thanosMeta, nil)
 		testutil.Ok(t, err)
 
-		testutil.Ok(t, meta.WriteToDir(logger, filepath.Join(blockDir, id.String())))
-		testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String()), metadata.NoneFunc))
+		testutil.Ok(t, meta.WriteToDir(logger, blockIDDir))
+		testutil.Ok(t, block.Upload(context.Background(), logger, bkt, blockIDDir, metadata.NoneFunc))
 	}
 
 	ibkt := objstore.WithNoopInstr(bkt)
Expand All @@ -1393,6 +1415,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk b
0,
WithLogger(logger),
WithChunkPool(chunkPool),
WithLazyExpandedPostings(lazyExpandedPostings),
)
testutil.Ok(t, err)

Expand Down
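
Note: every existing caller above gains a `false` for the new lazyExpandedPostings parameter, and only the new TestBucketSeriesLazyExpandedPostings passes `true`, which flows into the bucket store through the WithLazyExpandedPostings option. A hypothetical table-driven variant that exercises both code paths in one test — a sketch of the same wiring, not how the commit actually structures it:

// TestBucketSeriesBothPostingsPaths is an illustrative alternative to the
// two separate tests above; it reuses only calls visible in this diff.
func TestBucketSeriesBothPostingsPaths(t *testing.T) {
	for _, lazy := range []bool{false, true} {
		lazy := lazy // capture loop variable for the subtest closure
		t.Run(fmt.Sprintf("lazyExpandedPostings=%v", lazy), func(t *testing.T) {
			tb := testutil.NewTB(t)
			storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
				benchBucketSeries(t, chunkenc.ValFloat, false, lazy, samplesPerSeries, series, 1)
			})
		})
	}
}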
3 changes: 2 additions & 1 deletion pkg/store/lazy_postings.go
@@ -5,14 +5,15 @@ package store
 
 import (
 	"context"
-	"github.com/thanos-io/thanos/pkg/block/indexheader"
 	"math"
 
 	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/index"
 	"golang.org/x/exp/slices"
+
+	"github.com/thanos-io/thanos/pkg/block/indexheader"
 )
 
 var emptyLazyPostings = &lazyExpandedPostings{postings: nil, matchers: nil}
15 changes: 13 additions & 2 deletions pkg/testutil/e2eutil/prometheus.go
@@ -441,15 +441,26 @@ func createBlockWithDelay(ctx context.Context, dir string, series []labels.Label
 		return ulid.ULID{}, errors.Wrap(err, "create block id")
 	}
 
-	m, err := metadata.ReadFromDir(path.Join(dir, blockID.String()))
+	bdir := path.Join(dir, blockID.String())
+	m, err := metadata.ReadFromDir(bdir)
 	if err != nil {
 		return ulid.ULID{}, errors.Wrap(err, "open meta file")
 	}
 
+	logger := log.NewNopLogger()
+	stats, err := block.GatherIndexHealthStats(logger, path.Join(bdir, block.IndexFilename), m.MinTime, m.MaxTime)
+	if err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "gather index health stats")
+	}
+
 	m.ULID = id
 	m.Compaction.Sources = []ulid.ULID{id}
+	m.Thanos.IndexStats = metadata.IndexStats{
+		SeriesMaxSize: stats.SeriesMaxSize,
+		ChunkMaxSize:  stats.ChunkMaxSize,
+	}
 
-	if err := m.WriteToDir(log.NewNopLogger(), path.Join(dir, blockID.String())); err != nil {
+	if err := m.WriteToDir(logger, path.Join(dir, blockID.String())); err != nil {
 		return ulid.ULID{}, errors.Wrap(err, "write meta.json file")
 	}
 
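
Note: with this change, every block produced through the e2eutil helpers carries IndexStats in its meta.json. A sketch of verifying that on a freshly created block, reusing the CreateBlockWithBlockDelay call from the e2e test below (ctx, dir, series, extLset, and now are assumed from the surrounding test; that the exported wrapper routes through this helper, and testutil.Assert itself, are assumptions from the repo's test conventions, not shown in this diff):

// Create a delayed block and confirm IndexStats landed in meta.json.
id, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0, metadata.NoneFunc)
testutil.Ok(t, err)

m, err := metadata.ReadFromDir(path.Join(dir, id.String()))
testutil.Ok(t, err)

// The helper gathered these from the block's own index, so they should be
// non-zero for any block that contains series.
testutil.Assert(t, m.Thanos.IndexStats.SeriesMaxSize > 0, "expected SeriesMaxSize to be set")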
105 changes: 105 additions & 0 deletions test/e2e/store_gateway_test.go
@@ -1039,3 +1039,108 @@ config:
 		testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{`thanos_store_index_cache_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings"))))
 	})
 }
+
+func TestStoreGatewayLazyExpandedPostingsEnabled(t *testing.T) {
+	t.Parallel()
+
+	e, err := e2e.NewDockerEnvironment("memcached-exp")
+	testutil.Ok(t, err)
+	t.Cleanup(e2ethanos.CleanScenario(t, e))
+
+	const bucket = "store-gateway-lazy-expanded-postings-test"
+	m := e2edb.NewMinio(e, "thanos-minio", bucket, e2edb.WithMinioTLS())
+	testutil.Ok(t, e2e.StartAndWaitReady(m))
+
+	memcached := e2ethanos.NewMemcached(e, "1")
+	testutil.Ok(t, e2e.StartAndWaitReady(memcached))
+
+	indexCacheConfig := fmt.Sprintf(`type: MEMCACHED
+config:
+  addresses: [%s]
+  max_async_concurrency: 10
+  dns_provider_update_interval: 1s
+  auto_discovery: false`, memcached.InternalEndpoint("memcached"))
+
+	s1 := e2ethanos.NewStoreGW(
+		e,
+		"1",
+		client.BucketConfig{
+			Type:   client.S3,
+			Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()),
+		},
+		"",
+		indexCacheConfig,
+		[]string{"--store.enable-lazy-expanded-postings=true"},
+	)
+	testutil.Ok(t, e2e.StartAndWaitReady(s1))
+
+	q := e2ethanos.NewQuerierBuilder(e, "1", s1.InternalEndpoint("grpc")).Init()
+	testutil.Ok(t, e2e.StartAndWaitReady(q))
+
+	dir := filepath.Join(e.SharedDir(), "tmp")
+	testutil.Ok(t, os.MkdirAll(dir, os.ModePerm))
+
+	series := []labels.Labels{labels.FromStrings("a", "1", "b", "2")}
+	extLset := labels.FromStrings("ext1", "value1", "replica", "1")
+
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+	t.Cleanup(cancel)
+
+	now := time.Now()
+	id, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0, metadata.NoneFunc)
+	testutil.Ok(t, err)
+
+	l := log.NewLogfmtLogger(os.Stdout)
+	bkt, err := s3.NewBucketWithConfig(l,
+		e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()), "test-feed")
+	testutil.Ok(t, err)
+
+	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id.String()), id.String()))
+
+	// Wait for store to sync blocks.
+	// thanos_blocks_meta_synced: 1x loadedMeta 0x labelExcludedMeta 0x TooFreshMeta.
+	testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_blocks_meta_synced"))
+	testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total"))
+
+	testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_blocks_loaded"))
+	testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_drops_total"))
+	testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total"))
+
+	t.Run("query with cache miss", func(t *testing.T) {
+		queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery },
+			time.Now, promclient.QueryOptions{
+				Deduplicate: false,
+			},
+			[]model.Metric{
+				{
+					"a":       "1",
+					"b":       "2",
+					"ext1":    "value1",
+					"replica": "1",
+				},
+			},
+		)
+
+		testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{`thanos_store_index_cache_requests_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings"))))
+		testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{`thanos_store_index_cache_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings"))))
+	})
+
+	t.Run("query with cache hit", func(t *testing.T) {
+		queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery },
+			time.Now, promclient.QueryOptions{
+				Deduplicate: false,
+			},
+			[]model.Metric{
+				{
+					"a":       "1",
+					"b":       "2",
+					"ext1":    "value1",
+					"replica": "1",
+				},
+			},
+		)
+
+		testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{`thanos_store_index_cache_requests_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings"))))
+		testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{`thanos_store_index_cache_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings"))))
+	})
+}
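
Note: the two subtests encode the cache lifecycle for lazy expanded postings: the first query misses (requests=1, hits=0) and fills the memcached-backed ExpandedPostings cache; the identical second query is served from it (requests=2, hits=1). A sketch that distills the repeated assertions into a closure over s1 — a hypothetical refactor using only calls already present in the test above:

// expectExpandedPostings asserts the ExpandedPostings cache counters after a query.
expectExpandedPostings := func(requests, hits float64) {
	matcher := matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings")
	testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(requests),
		[]string{"thanos_store_index_cache_requests_total"}, e2emon.WithLabelMatchers(matcher)))
	testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(hits),
		[]string{"thanos_store_index_cache_hits_total"}, e2emon.WithLabelMatchers(matcher)))
}

expectExpandedPostings(1, 0) // after the cache-miss query
expectExpandedPostings(2, 1) // after the cache-hit query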
