From bb2e7e75606fb0133be946db5aa18806ae6aae5d Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sat, 12 Aug 2023 10:15:05 -0700 Subject: [PATCH] add metrics Signed-off-by: Ben Ye --- pkg/store/bucket.go | 59 ++++++++++++++++++----------- pkg/store/bucket_test.go | 3 +- test/e2e/store_gateway_test.go | 68 ++++++++++++++++++++++++---------- 3 files changed, 88 insertions(+), 42 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index f9e559c7471..4f817f0c059 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -117,26 +117,27 @@ var ( ) type bucketStoreMetrics struct { - blocksLoaded prometheus.Gauge - blockLoads prometheus.Counter - blockLoadFailures prometheus.Counter - lastLoadedBlock prometheus.Gauge - blockDrops prometheus.Counter - blockDropFailures prometheus.Counter - seriesDataTouched *prometheus.HistogramVec - seriesDataFetched *prometheus.HistogramVec - seriesDataSizeTouched *prometheus.HistogramVec - seriesDataSizeFetched *prometheus.HistogramVec - seriesBlocksQueried prometheus.Histogram - seriesGetAllDuration prometheus.Histogram - seriesMergeDuration prometheus.Histogram - resultSeriesCount prometheus.Histogram - chunkSizeBytes prometheus.Histogram - postingsSizeBytes prometheus.Histogram - queriesDropped *prometheus.CounterVec - seriesRefetches prometheus.Counter - chunkRefetches prometheus.Counter - emptyPostingCount prometheus.Counter + blocksLoaded prometheus.Gauge + blockLoads prometheus.Counter + blockLoadFailures prometheus.Counter + lastLoadedBlock prometheus.Gauge + blockDrops prometheus.Counter + blockDropFailures prometheus.Counter + seriesDataTouched *prometheus.HistogramVec + seriesDataFetched *prometheus.HistogramVec + seriesDataSizeTouched *prometheus.HistogramVec + seriesDataSizeFetched *prometheus.HistogramVec + seriesBlocksQueried prometheus.Histogram + seriesGetAllDuration prometheus.Histogram + seriesMergeDuration prometheus.Histogram + resultSeriesCount prometheus.Histogram + chunkSizeBytes prometheus.Histogram + postingsSizeBytes prometheus.Histogram + queriesDropped *prometheus.CounterVec + seriesRefetches prometheus.Counter + chunkRefetches prometheus.Counter + emptyPostingCount prometheus.Counter + lazyExpandedPostingsCount prometheus.Counter cachedPostingsCompressions *prometheus.CounterVec cachedPostingsCompressionErrors *prometheus.CounterVec @@ -302,6 +303,11 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Help: "Total number of empty postings when fetching block series.", }) + m.lazyExpandedPostingsCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_lazy_expanded_postings_total", + Help: "Total number of lazy expanded postings when fetching block series.", + }) + return &m } @@ -1009,6 +1015,7 @@ func (b *blockSeriesClient) ExpandPostings( matchers sortedMatchers, seriesLimiter SeriesLimiter, lazyExpandedPostingEnabled bool, + lazyExpandedPostingsCount prometheus.Counter, ) error { ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, lazyExpandedPostingEnabled) if err != nil { @@ -1033,6 +1040,7 @@ func (b *blockSeriesClient) ExpandPostings( if b.lazyPostings.lazyExpanded() { // Assume lazy expansion could cut actual expanded postings length to 50%. 
b.expandedPostings = make([]storage.SeriesRef, 0, len(b.lazyPostings.postings)/2) + lazyExpandedPostingsCount.Inc() } b.entries = make([]seriesEntry, 0, b.batchSize) return nil @@ -1420,7 +1428,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store "block.resolution": blk.meta.Thanos.Downsample.Resolution, }) - if err := blockClient.ExpandPostings(sortedBlockMatchers, seriesLimiter, s.enabledLazyExpandedPostings); err != nil { + if err := blockClient.ExpandPostings( + sortedBlockMatchers, + seriesLimiter, + s.enabledLazyExpandedPostings, + s.metrics.lazyExpandedPostingsCount, + ); err != nil { span.Finish() return errors.Wrapf(err, "fetch series for block %s", blk.meta.ULID) } @@ -1684,6 +1697,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq sortedReqSeriesMatchersNoExtLabels, seriesLimiter, s.enabledLazyExpandedPostings, + s.metrics.lazyExpandedPostingsCount, ); err != nil { return err } @@ -1911,6 +1925,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR sortedReqSeriesMatchersNoExtLabels, seriesLimiter, s.enabledLazyExpandedPostings, + s.metrics.lazyExpandedPostingsCount, ); err != nil { return err } @@ -2402,7 +2417,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch if err != nil { return nil, errors.Wrap(err, "fetch and expand postings") } - // If postings still have matchers to be applied lazily, skip caching expanded postings. + // If postings still have matchers to be applied lazily, cache expanded postings after filtering series so skip here. if !ps.lazyExpanded() { r.storeExpandedPostingsToCache(ms, index.NewListPostings(ps.postings), len(ps.postings)) } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 0a475c55939..6ab0154f7ce 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2728,6 +2728,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet wg := sync.WaitGroup{} wg.Add(concurrency) + dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) for w := 0; w < concurrency; w++ { go func() { defer wg.Done() @@ -2769,7 +2770,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet dummyHistogram, nil, ) - testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter, false)) + testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter, false, dummyCounter)) defer blockClient.Close() // Ensure at least 1 series has been returned (as expected). diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 8c5d66624c5..9d0a6aa89db 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -1060,16 +1060,6 @@ func TestStoreGatewayLazyExpandedPostingsEnabled(t *testing.T) { m := e2edb.NewMinio(e, "thanos-minio", bucket, e2edb.WithMinioTLS()) testutil.Ok(t, e2e.StartAndWaitReady(m)) - memcached := e2ethanos.NewMemcached(e, "1") - testutil.Ok(t, e2e.StartAndWaitReady(memcached)) - - indexCacheConfig := fmt.Sprintf(`type: MEMCACHED -config: - addresses: [%s] - max_async_concurrency: 10 - dns_provider_update_interval: 1s - auto_discovery: false`, memcached.InternalEndpoint("memcached")) - // Create 2 store gateways, one with lazy expanded postings enabled and another one disabled. 
s1 := e2ethanos.NewStoreGW( e, @@ -1079,7 +1069,7 @@ config: Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), }, "", - indexCacheConfig, + "", []string{"--store.enable-lazy-expanded-postings"}, ) s2 := e2ethanos.NewStoreGW( @@ -1090,7 +1080,7 @@ config: Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), }, "", - indexCacheConfig, + "", nil, ) testutil.Ok(t, e2e.StartAndWaitReady(s1, s2)) @@ -1105,9 +1095,9 @@ config: numSeries := 10000 ss := make([]labels.Labels, 0, 10000) for i := 0; i < numSeries; i++ { - ss = append(ss, labels.FromStrings("a", strconv.Itoa(i))) + ss = append(ss, labels.FromStrings("a", strconv.Itoa(i), "b", "1")) } - extLset := labels.FromStrings("replica", "1") + extLset := labels.FromStrings("ext1", "value1", "replica", "1") ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) t.Cleanup(cancel) @@ -1137,27 +1127,67 @@ config: testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_drops_total")) testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total")) - t.Run("query", func(t *testing.T) { - queryAndAssert(t, ctx, q1.Endpoint("http"), func() string { return `count({replica="1"})` }, + t.Run("query with count", func(t *testing.T) { + queryAndAssert(t, ctx, q1.Endpoint("http"), func() string { return `count({b="1"})` }, time.Now, promclient.QueryOptions{ Deduplicate: false, }, model.Vector{ { - Value: model.SampleValue(numSeries), + Metric: map[model.LabelName]model.LabelValue{}, + Value: model.SampleValue(numSeries), }, }, ) - queryAndAssert(t, ctx, q2.Endpoint("http"), func() string { return `count({replica="1"})` }, + queryAndAssert(t, ctx, q2.Endpoint("http"), func() string { return `count({b="1"})` }, time.Now, promclient.QueryOptions{ Deduplicate: false, }, model.Vector{ { - Value: model.SampleValue(numSeries), + Metric: map[model.LabelName]model.LabelValue{}, + Value: model.SampleValue(numSeries), + }, + }, + ) + }) + + // We expect no lazy expanded postings as query `count({b="1"})` won't trigger the optimization. + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_lazy_expanded_postings_total")) + testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_lazy_expanded_postings_total")) + + t.Run("query specific series will trigger lazy posting", func(t *testing.T) { + queryAndAssertSeries(t, ctx, q1.Endpoint("http"), func() string { return `{a="1", b="1"}` }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, + []model.Metric{ + { + "a": "1", + "b": "1", + "ext1": "value1", + "replica": "1", + }, + }, + ) + + queryAndAssertSeries(t, ctx, q2.Endpoint("http"), func() string { return `{a="1", b="1"}` }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, + []model.Metric{ + { + "a": "1", + "b": "1", + "ext1": "value1", + "replica": "1", }, }, ) }) + + // Use greater or equal to handle flakiness. + testutil.Ok(t, s1.WaitSumMetrics(e2emon.GreaterOrEqual(1), "thanos_bucket_store_lazy_expanded_postings_total")) + testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_lazy_expanded_postings_total")) }
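
Note (not part of the patch): below is a minimal, self-contained sketch of the counter pattern this change introduces — register the counter once via promauto and pass it down so the call site increments it only when postings were lazily expanded. The names expandPostings and lazyExpanded are simplified stand-ins for blockSeriesClient.ExpandPostings and lazyPostings.lazyExpanded(); only the metric name and help text are taken from the patch.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// expandPostings is a hypothetical stand-in for blockSeriesClient.ExpandPostings:
// the caller hands it the counter, and it increments only on the lazy-expansion path.
func expandPostings(lazyExpanded bool, lazyExpandedPostingsCount prometheus.Counter) {
	if lazyExpanded {
		lazyExpandedPostingsCount.Inc()
	}
}

func main() {
	reg := prometheus.NewRegistry()
	// Same name and help text as the metric added in pkg/store/bucket.go.
	lazyExpandedPostingsCount := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "thanos_bucket_store_lazy_expanded_postings_total",
		Help: "Total number of lazy expanded postings when fetching block series.",
	})

	expandPostings(true, lazyExpandedPostingsCount)  // lazy path: counted
	expandPostings(false, lazyExpandedPostingsCount) // eager path: not counted

	// Gather and print the counter value (1, since only one call was lazy).
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Printf("%s %v\n", mf.GetName(), mf.GetMetric()[0].GetCounter().GetValue())
	}
}

Passing the counter as a parameter, rather than reaching into s.metrics inside the block series client, is what lets the bucket_test.go benchmark above substitute a throwaway prometheus.NewCounter (dummyCounter) without wiring up a registry.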