Skip to content

Commit

Permalink
add metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Aug 12, 2023
1 parent ae1ea68 commit bb2e7e7
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 42 deletions.
59 changes: 37 additions & 22 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,26 +117,27 @@ var (
)

type bucketStoreMetrics struct {
blocksLoaded prometheus.Gauge
blockLoads prometheus.Counter
blockLoadFailures prometheus.Counter
lastLoadedBlock prometheus.Gauge
blockDrops prometheus.Counter
blockDropFailures prometheus.Counter
seriesDataTouched *prometheus.HistogramVec
seriesDataFetched *prometheus.HistogramVec
seriesDataSizeTouched *prometheus.HistogramVec
seriesDataSizeFetched *prometheus.HistogramVec
seriesBlocksQueried prometheus.Histogram
seriesGetAllDuration prometheus.Histogram
seriesMergeDuration prometheus.Histogram
resultSeriesCount prometheus.Histogram
chunkSizeBytes prometheus.Histogram
postingsSizeBytes prometheus.Histogram
queriesDropped *prometheus.CounterVec
seriesRefetches prometheus.Counter
chunkRefetches prometheus.Counter
emptyPostingCount prometheus.Counter
blocksLoaded prometheus.Gauge
blockLoads prometheus.Counter
blockLoadFailures prometheus.Counter
lastLoadedBlock prometheus.Gauge
blockDrops prometheus.Counter
blockDropFailures prometheus.Counter
seriesDataTouched *prometheus.HistogramVec
seriesDataFetched *prometheus.HistogramVec
seriesDataSizeTouched *prometheus.HistogramVec
seriesDataSizeFetched *prometheus.HistogramVec
seriesBlocksQueried prometheus.Histogram
seriesGetAllDuration prometheus.Histogram
seriesMergeDuration prometheus.Histogram
resultSeriesCount prometheus.Histogram
chunkSizeBytes prometheus.Histogram
postingsSizeBytes prometheus.Histogram
queriesDropped *prometheus.CounterVec
seriesRefetches prometheus.Counter
chunkRefetches prometheus.Counter
emptyPostingCount prometheus.Counter
lazyExpandedPostingsCount prometheus.Counter

cachedPostingsCompressions *prometheus.CounterVec
cachedPostingsCompressionErrors *prometheus.CounterVec
Expand Down Expand Up @@ -302,6 +303,11 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Help: "Total number of empty postings when fetching block series.",
})

m.lazyExpandedPostingsCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_lazy_expanded_postings_total",
Help: "Total number of lazy expanded postings when fetching block series.",
})

return &m
}

Expand Down Expand Up @@ -1009,6 +1015,7 @@ func (b *blockSeriesClient) ExpandPostings(
matchers sortedMatchers,
seriesLimiter SeriesLimiter,
lazyExpandedPostingEnabled bool,
lazyExpandedPostingsCount prometheus.Counter,
) error {
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, lazyExpandedPostingEnabled)
if err != nil {
Expand All @@ -1033,6 +1040,7 @@ func (b *blockSeriesClient) ExpandPostings(
if b.lazyPostings.lazyExpanded() {
// Assume lazy expansion could cut actual expanded postings length to 50%.
b.expandedPostings = make([]storage.SeriesRef, 0, len(b.lazyPostings.postings)/2)
lazyExpandedPostingsCount.Inc()
}
b.entries = make([]seriesEntry, 0, b.batchSize)
return nil
Expand Down Expand Up @@ -1420,7 +1428,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
"block.resolution": blk.meta.Thanos.Downsample.Resolution,
})

if err := blockClient.ExpandPostings(sortedBlockMatchers, seriesLimiter, s.enabledLazyExpandedPostings); err != nil {
if err := blockClient.ExpandPostings(
sortedBlockMatchers,
seriesLimiter,
s.enabledLazyExpandedPostings,
s.metrics.lazyExpandedPostingsCount,
); err != nil {
span.Finish()
return errors.Wrapf(err, "fetch series for block %s", blk.meta.ULID)
}
Expand Down Expand Up @@ -1684,6 +1697,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
sortedReqSeriesMatchersNoExtLabels,
seriesLimiter,
s.enabledLazyExpandedPostings,
s.metrics.lazyExpandedPostingsCount,
); err != nil {
return err
}
Expand Down Expand Up @@ -1911,6 +1925,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
sortedReqSeriesMatchersNoExtLabels,
seriesLimiter,
s.enabledLazyExpandedPostings,
s.metrics.lazyExpandedPostingsCount,
); err != nil {
return err
}
Expand Down Expand Up @@ -2402,7 +2417,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch
if err != nil {
return nil, errors.Wrap(err, "fetch and expand postings")
}
// If postings still have matchers to be applied lazily, skip caching expanded postings.
// If the postings still have matchers to be applied lazily, skip caching the expanded postings here; they are cached later, after series filtering.
if !ps.lazyExpanded() {
r.storeExpandedPostingsToCache(ms, index.NewListPostings(ps.postings), len(ps.postings))
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2728,6 +2728,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
wg := sync.WaitGroup{}
wg.Add(concurrency)

dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{})
for w := 0; w < concurrency; w++ {
go func() {
defer wg.Done()
Expand Down Expand Up @@ -2769,7 +2770,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
dummyHistogram,
nil,
)
testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter, false))
testutil.Ok(b, blockClient.ExpandPostings(sortedMatchers, seriesLimiter, false, dummyCounter))
defer blockClient.Close()

// Ensure at least 1 series has been returned (as expected).
Expand Down
68 changes: 49 additions & 19 deletions test/e2e/store_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,16 +1060,6 @@ func TestStoreGatewayLazyExpandedPostingsEnabled(t *testing.T) {
m := e2edb.NewMinio(e, "thanos-minio", bucket, e2edb.WithMinioTLS())
testutil.Ok(t, e2e.StartAndWaitReady(m))

memcached := e2ethanos.NewMemcached(e, "1")
testutil.Ok(t, e2e.StartAndWaitReady(memcached))

indexCacheConfig := fmt.Sprintf(`type: MEMCACHED
config:
addresses: [%s]
max_async_concurrency: 10
dns_provider_update_interval: 1s
auto_discovery: false`, memcached.InternalEndpoint("memcached"))

// Create 2 store gateways, one with lazy expanded postings enabled and another one disabled.
s1 := e2ethanos.NewStoreGW(
e,
Expand All @@ -1079,7 +1069,7 @@ config:
Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()),
},
"",
indexCacheConfig,
"",
[]string{"--store.enable-lazy-expanded-postings"},
)
s2 := e2ethanos.NewStoreGW(
Expand All @@ -1090,7 +1080,7 @@ config:
Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()),
},
"",
indexCacheConfig,
"",
nil,
)
testutil.Ok(t, e2e.StartAndWaitReady(s1, s2))
Expand All @@ -1105,9 +1095,9 @@ config:
numSeries := 10000
ss := make([]labels.Labels, 0, 10000)
for i := 0; i < numSeries; i++ {
ss = append(ss, labels.FromStrings("a", strconv.Itoa(i)))
ss = append(ss, labels.FromStrings("a", strconv.Itoa(i), "b", "1"))
}
extLset := labels.FromStrings("replica", "1")
extLset := labels.FromStrings("ext1", "value1", "replica", "1")

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
t.Cleanup(cancel)
Expand Down Expand Up @@ -1137,27 +1127,67 @@ config:
testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_drops_total"))
testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total"))

t.Run("query", func(t *testing.T) {
queryAndAssert(t, ctx, q1.Endpoint("http"), func() string { return `count({replica="1"})` },
t.Run("query with count", func(t *testing.T) {
queryAndAssert(t, ctx, q1.Endpoint("http"), func() string { return `count({b="1"})` },
time.Now, promclient.QueryOptions{
Deduplicate: false,
},
model.Vector{
{
Value: model.SampleValue(numSeries),
Metric: map[model.LabelName]model.LabelValue{},
Value: model.SampleValue(numSeries),
},
},
)

queryAndAssert(t, ctx, q2.Endpoint("http"), func() string { return `count({replica="1"})` },
queryAndAssert(t, ctx, q2.Endpoint("http"), func() string { return `count({b="1"})` },
time.Now, promclient.QueryOptions{
Deduplicate: false,
},
model.Vector{
{
Value: model.SampleValue(numSeries),
Metric: map[model.LabelName]model.LabelValue{},
Value: model.SampleValue(numSeries),
},
},
)
})

// We expect no lazy expanded postings as query `count({b="1"})` won't trigger the optimization.
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_lazy_expanded_postings_total"))
testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_lazy_expanded_postings_total"))

t.Run("query specific series will trigger lazy posting", func(t *testing.T) {
queryAndAssertSeries(t, ctx, q1.Endpoint("http"), func() string { return `{a="1", b="1"}` },
time.Now, promclient.QueryOptions{
Deduplicate: false,
},
[]model.Metric{
{
"a": "1",
"b": "1",
"ext1": "value1",
"replica": "1",
},
},
)

queryAndAssertSeries(t, ctx, q2.Endpoint("http"), func() string { return `{a="1", b="1"}` },
time.Now, promclient.QueryOptions{
Deduplicate: false,
},
[]model.Metric{
{
"a": "1",
"b": "1",
"ext1": "value1",
"replica": "1",
},
},
)
})

// Use greater or equal to handle flakiness.
testutil.Ok(t, s1.WaitSumMetrics(e2emon.GreaterOrEqual(1), "thanos_bucket_store_lazy_expanded_postings_total"))
testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_lazy_expanded_postings_total"))
}

0 comments on commit bb2e7e7

Please sign in to comment.