Add absolute total download time metrics for series and chunks (#6726)
* add metrics for absolute latency of loading series and chunks per block

Signed-off-by: Ben Ye <[email protected]>

* fix lint

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
yeya24 authored Sep 19, 2023
1 parent 18f8035 commit 2bc12a5
Showing 2 changed files with 78 additions and 35 deletions.
108 changes: 75 additions & 33 deletions pkg/store/bucket.go
@@ -147,9 +147,15 @@ type bucketStoreMetrics struct {
cachedPostingsOriginalSizeBytes prometheus.Counter
cachedPostingsCompressedSizeBytes prometheus.Counter

seriesFetchDuration prometheus.Histogram
postingsFetchDuration prometheus.Histogram
chunkFetchDuration prometheus.Histogram
seriesFetchDuration prometheus.Histogram
// Counts time for fetching series across all batches.
seriesFetchDurationSum prometheus.Histogram
postingsFetchDuration prometheus.Histogram
// chunkFetchDuration counts the total time spent loading chunks, but since
// chunks are fetched by multiple goroutines the actual latency is usually much lower.
chunkFetchDuration prometheus.Histogram
// Actual absolute total time for loading chunks.
chunkFetchDurationSum prometheus.Histogram
}

func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
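The comments in the hunk above draw a distinction worth illustrating: chunkFetchDuration sums per-goroutine fetch durations, so under concurrency it can far exceed the elapsed wall-clock time, which is what the new chunkFetchDurationSum captures. A minimal, self-contained sketch of the effect (toy values, not Thanos code):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const workers = 8
	fetch := func() { time.Sleep(50 * time.Millisecond) } // stand-in for one chunk-range fetch

	var (
		mu      sync.Mutex
		perCall time.Duration // analogue of chunkFetchDuration: per-goroutine durations summed
		wg      sync.WaitGroup
	)
	begin := time.Now()
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			start := time.Now()
			fetch()
			mu.Lock()
			perCall += time.Since(start)
			mu.Unlock()
		}()
	}
	wg.Wait()
	wall := time.Since(begin) // analogue of chunkFetchDurationSum: absolute elapsed time

	fmt.Println("summed goroutine time:", perCall) // roughly workers * 50ms
	fmt.Println("wall-clock time:", wall)          // roughly 50ms
}
```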
@@ -288,6 +294,12 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
})

m.seriesFetchDurationSum = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "thanos_bucket_store_series_fetch_duration_sum_seconds",
Help: "The total time it takes to fetch series to respond to a request sent to a store gateway across all series batches. It includes both the time to fetch it from the cache and from storage in case of cache misses.",
Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
})

m.postingsFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "thanos_bucket_store_postings_fetch_duration_seconds",
Help: "The time it takes to fetch postings to respond to a request sent to a store gateway. It includes both the time to fetch it from the cache and from storage in case of cache misses.",
@@ -296,7 +308,13 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {

m.chunkFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "thanos_bucket_store_chunks_fetch_duration_seconds",
Help: "The total time spent fetching chunks within a single request a store gateway.",
Help: "The total time spent fetching chunks within a single request for one block.",
Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
})

m.chunkFetchDurationSum = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "thanos_bucket_store_chunks_fetch_duration_sum_seconds",
Help: "The total absolute time spent fetching chunks within a single request for one block.",
Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
})

@@ -926,11 +944,13 @@ type blockSeriesClient struct {
lazyExpandedPostingSizeBytes prometheus.Counter
lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter

skipChunks bool
shardMatcher *storepb.ShardMatcher
blockMatchers []*labels.Matcher
calculateChunkHash bool
chunkFetchDuration prometheus.Histogram
skipChunks bool
shardMatcher *storepb.ShardMatcher
blockMatchers []*labels.Matcher
calculateChunkHash bool
seriesFetchDurationSum prometheus.Histogram
chunkFetchDuration prometheus.Histogram
chunkFetchDurationSum prometheus.Histogram

// Internal state.
i uint64
@@ -955,7 +975,9 @@ func newBlockSeriesClient(
shardMatcher *storepb.ShardMatcher,
calculateChunkHash bool,
batchSize int,
seriesFetchDurationSum prometheus.Histogram,
chunkFetchDuration prometheus.Histogram,
chunkFetchDurationSum prometheus.Histogram,
extLsetToRemove map[string]struct{},
lazyExpandedPostingEnabled bool,
lazyExpandedPostingsCount prometheus.Counter,
@@ -978,14 +1000,16 @@ func newBlockSeriesClient(
extLset: extLset,
extLsetToRemove: extLsetToRemove,

mint: req.MinTime,
maxt: req.MaxTime,
indexr: b.indexReader(),
chunkr: chunkr,
chunksLimiter: limiter,
bytesLimiter: bytesLimiter,
skipChunks: req.SkipChunks,
chunkFetchDuration: chunkFetchDuration,
mint: req.MinTime,
maxt: req.MaxTime,
indexr: b.indexReader(),
chunkr: chunkr,
chunksLimiter: limiter,
bytesLimiter: bytesLimiter,
skipChunks: req.SkipChunks,
seriesFetchDurationSum: seriesFetchDurationSum,
chunkFetchDuration: chunkFetchDuration,
chunkFetchDurationSum: chunkFetchDurationSum,

lazyExpandedPostingEnabled: lazyExpandedPostingEnabled,
lazyExpandedPostingsCount: lazyExpandedPostingsCount,
@@ -1074,8 +1098,10 @@ func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) {
}

if len(b.entries) == 0 {
b.seriesFetchDurationSum.Observe(b.indexr.stats.SeriesDownloadLatencySum.Seconds())
if b.chunkr != nil {
b.chunkFetchDuration.Observe(b.chunkr.stats.ChunksFetchDurationSum.Seconds())
b.chunkFetchDurationSum.Observe(b.chunkr.stats.ChunksDownloadLatencySum.Seconds())
}
return nil, io.EOF
}
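Worth noting about the placement of these observations: latency accumulates in the readers' stats batch by batch, and Recv flushes the totals into the histograms exactly once per stream, when the batches are exhausted. A reduced sketch of that shape (hypothetical streamClient type, far simpler than the real client):

```go
package example

import (
	"io"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// streamClient mimics the observe-once-at-EOF pattern used by blockSeriesClient.
type streamClient struct {
	batches     [][]string
	downloadSum time.Duration        // accumulated across all batches
	durationSum prometheus.Histogram // flushed once, when the stream ends
}

func (c *streamClient) Recv() ([]string, error) {
	if len(c.batches) == 0 {
		c.durationSum.Observe(c.downloadSum.Seconds()) // one observation per request
		return nil, io.EOF
	}
	start := time.Now()
	batch := c.batches[0] // stand-in for fetching the next batch
	c.batches = c.batches[1:]
	c.downloadSum += time.Since(start)
	return batch, nil
}
```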
@@ -1426,7 +1452,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
shardMatcher,
s.enableChunkHashCalculation,
s.seriesBatchSize,
s.metrics.seriesFetchDurationSum,
s.metrics.chunkFetchDuration,
s.metrics.chunkFetchDurationSum,
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.metrics.lazyExpandedPostingsCount,
@@ -1707,7 +1735,9 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
nil,
true,
SeriesBatchSize,
s.metrics.chunkFetchDuration,
s.metrics.seriesFetchDurationSum,
nil,
nil,
nil,
s.enabledLazyExpandedPostings,
s.metrics.lazyExpandedPostingsCount,
@@ -1908,7 +1938,9 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
nil,
true,
SeriesBatchSize,
s.metrics.chunkFetchDuration,
s.metrics.seriesFetchDurationSum,
nil,
nil,
nil,
s.enabledLazyExpandedPostings,
s.metrics.lazyExpandedPostingsCount,
@@ -3012,7 +3044,10 @@ func (it *bigEndianPostings) length() int {

func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter) error {
timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration)
defer timer.ObserveDuration()
defer func() {
d := timer.ObserveDuration()
r.stats.SeriesDownloadLatencySum += d
}()

// Load series from cache, overwriting the list of ids to preload
// with the missing ones.
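This refactor leans on the fact that prometheus.Timer.ObserveDuration records the elapsed time into the histogram and also returns it as a time.Duration, so one measurement feeds both the histogram and the per-request stats. A standalone illustration of the pattern (metric and type names here are mine, not from Thanos):

```go
package example

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var fetchHist = prometheus.NewHistogram(prometheus.HistogramOpts{
	Name: "example_fetch_duration_seconds", // hypothetical metric name
})

type requestStats struct{ downloadLatencySum time.Duration }

func preload(s *requestStats) {
	timer := prometheus.NewTimer(fetchHist)
	defer func() {
		d := timer.ObserveDuration() // observes into fetchHist and returns the duration
		s.downloadLatencySum += d    // the same measurement accumulates into the stats
	}()

	time.Sleep(5 * time.Millisecond) // stand-in for the actual series fetch
}
```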
@@ -3330,7 +3365,10 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
r.loadingChunks = true
r.loadingChunksMtx.Unlock()

begin := time.Now()
defer func() {
r.stats.ChunksDownloadLatencySum += time.Since(begin)

r.loadingChunksMtx.Lock()
r.loadingChunks = false
r.loadingChunksMtx.Unlock()
@@ -3559,19 +3597,21 @@ type queryStats struct {
cachedPostingsDecompressionErrors int
CachedPostingsDecompressionTimeSum time.Duration

seriesTouched int
SeriesTouchedSizeSum units.Base2Bytes
seriesFetched int
SeriesFetchedSizeSum units.Base2Bytes
seriesFetchCount int
SeriesFetchDurationSum time.Duration

chunksTouched int
ChunksTouchedSizeSum units.Base2Bytes
chunksFetched int
ChunksFetchedSizeSum units.Base2Bytes
chunksFetchCount int
ChunksFetchDurationSum time.Duration
seriesTouched int
SeriesTouchedSizeSum units.Base2Bytes
seriesFetched int
SeriesFetchedSizeSum units.Base2Bytes
seriesFetchCount int
SeriesFetchDurationSum time.Duration
SeriesDownloadLatencySum time.Duration

chunksTouched int
ChunksTouchedSizeSum units.Base2Bytes
chunksFetched int
ChunksFetchedSizeSum units.Base2Bytes
chunksFetchCount int
ChunksFetchDurationSum time.Duration
ChunksDownloadLatencySum time.Duration

GetAllDuration time.Duration
mergedSeriesCount int
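The stats struct now tracks two parallel timings for both series and chunks. A short gloss on the chunk pair (field names from the diff; the comments are interpretation, not Thanos source):

```go
package example

import "time"

// chunkTimings glosses the two chunk-related durations queryStats now carries.
type chunkTimings struct {
	// Sum of individual fetch-call durations; with concurrent fetches this
	// can exceed the elapsed wall-clock time. Observed by chunkFetchDuration.
	ChunksFetchDurationSum time.Duration
	// Wall-clock time of the whole load call, measured in a defer once the
	// fetches are done. Observed by the new chunkFetchDurationSum histogram.
	ChunksDownloadLatencySum time.Duration
}
```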
@@ -3607,13 +3647,15 @@ func (s queryStats) merge(o *queryStats) *queryStats {
s.SeriesFetchedSizeSum += o.SeriesFetchedSizeSum
s.seriesFetchCount += o.seriesFetchCount
s.SeriesFetchDurationSum += o.SeriesFetchDurationSum
s.SeriesDownloadLatencySum += o.SeriesDownloadLatencySum

s.chunksTouched += o.chunksTouched
s.ChunksTouchedSizeSum += o.ChunksTouchedSizeSum
s.chunksFetched += o.chunksFetched
s.ChunksFetchedSizeSum += o.ChunksFetchedSizeSum
s.chunksFetchCount += o.chunksFetchCount
s.ChunksFetchDurationSum += o.ChunksFetchDurationSum
s.ChunksDownloadLatencySum += o.ChunksDownloadLatencySum

s.GetAllDuration += o.GetAllDuration
s.mergedSeriesCount += o.mergedSeriesCount
5 changes: 3 additions & 2 deletions pkg/store/bucket_test.go
@@ -23,6 +23,7 @@ import (
"time"

"github.com/cespare/xxhash"
"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
@@ -47,8 +48,6 @@ import (
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
@@ -2767,6 +2766,8 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
false,
SeriesBatchSize,
dummyHistogram,
dummyHistogram,
dummyHistogram,
nil,
false,
dummyCounter,
