diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a4bdaa7779..53e24cc3ebf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ * [ENHANCEMENT] Store-gateway: add `-blocks-storage.bucket-store.index-header.lazy-loading-concurrency-queue-timeout`. When set, loads of index-headers at the store-gateway's index-header lazy load gate will not wait longer than that to execute. If a load reaches the wait timeout, then the querier will retry the blocks on a different store-gateway. If all store-gateways are unavailable, then the query will fail with `err-mimir-store-consistency-check-failed`. #8138 * [ENHANCEMENT] Ingester: Optimize querying with regexp matchers. #8106 * [ENHANCEMENT] Distributor: Introduce `-distributor.max-request-pool-buffer-size` to allow configuring the maximum size of the request pool buffers. #8082 +* [ENHANCEMENT] Store-gateway: improve performance when streaming chunks to queriers is enabled (`-querier.prefer-streaming-chunks-from-store-gateways=true`) and the query selects fewer than `-blocks-storage.bucket-store.batch-series-size` series (defaults to 5000 series). #8039 * [ENHANCEMENT] Ingester: active series are now updated along with owned series. They decrease when series change ownership between ingesters. This helps provide a more accurate total of active series when ingesters are added. This is only enabled when `-ingester.track-ingester-owned-series` or `-ingester.use-ingester-owned-series-for-limits` are enabled. #8084 * [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567 * [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520 diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index 102448cb046..c5e1118df26 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -606,10 +606,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor defer done() var ( - // If we are streaming the series labels and chunks separately, we don't need to fetch the postings - // twice. So we use these slices to re-use them. Each reuse[i] corresponds to a single block. 
- reuse []*reusedPostingsAndMatchers - resHints = &hintspb.SeriesResponseHints{} + streamingIterators *streamingSeriesIterators + resHints = &hintspb.SeriesResponseHints{} ) for _, b := range blocks { resHints.AddQueriedBlock(b.meta.ULID) @@ -633,7 +631,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) ) - seriesSet, reuse, err = s.streamingSeriesForBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats) + seriesSet, streamingIterators, err = s.createIteratorForChunksStreamingLabelsPhase(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats) if err != nil { return err } @@ -661,15 +659,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor start := time.Now() if req.StreamingChunksBatchSize > 0 { - var seriesChunkIt iterator[seriesChunksSet] - seriesChunkIt, err = s.streamingChunksSetForBlocks(ctx, req, blocks, indexReaders, readers, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, reuse) - if err != nil { - return err - } + seriesChunkIt := s.createIteratorForChunksStreamingChunksPhase(ctx, readers, stats, chunksLimiter, seriesLimiter, streamingIterators) err = s.sendStreamingChunks(req, srv, seriesChunkIt, stats, streamingSeriesCount) } else { var seriesSet storepb.SeriesSet - seriesSet, err = s.nonStreamingSeriesSetForBlocks(ctx, req, blocks, indexReaders, readers, shardSelector, matchers, chunksLimiter, seriesLimiter, stats) + seriesSet, err = s.createIteratorForNonChunksStreamingRequest(ctx, req, blocks, indexReaders, readers, shardSelector, matchers, chunksLimiter, seriesLimiter, stats) if err != nil { return err } @@ -1027,8 +1021,8 @@ func chunksSize(chks []storepb.AggrChunk) (size int) { return size } -// nonStreamingSeriesSetForBlocks is used when the streaming feature is not enabled. -func (s *BucketStore) nonStreamingSeriesSetForBlocks( +// createIteratorForNonChunksStreamingRequest is used when the streaming feature is not enabled. +func (s *BucketStore) createIteratorForNonChunksStreamingRequest( ctx context.Context, req *storepb.SeriesRequest, blocks []*bucketBlock, @@ -1036,15 +1030,15 @@ func (s *BucketStore) nonStreamingSeriesSetForBlocks( chunkReaders *bucketChunkReaders, shardSelector *sharding.ShardSelector, matchers []*labels.Matcher, - chunksLimiter ChunksLimiter, // Rate limiter for loading chunks. - seriesLimiter SeriesLimiter, // Rate limiter for loading series. + chunksLimiter ChunksLimiter, + seriesLimiter SeriesLimiter, stats *safeQueryStats, ) (storepb.SeriesSet, error) { strategy := defaultStrategy if req.SkipChunks { strategy = noChunkRefs } - it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, nil, strategy) + it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, strategy, nil) if err != nil { return nil, err } @@ -1059,56 +1053,45 @@ func (s *BucketStore) nonStreamingSeriesSetForBlocks( return set, nil } -// streamingSeriesForBlocks is used when streaming feature is enabled. +// createIteratorForChunksStreamingLabelsPhase is used when streaming feature is enabled. // It returns a series set that only contains the series labels without any chunks information. 
-// The returned postings (series ref) and matches should be re-used when getting chunks to save on computation. -func (s *BucketStore) streamingSeriesForBlocks( +// The streamingSeriesIterators should be re-used when getting chunks to save on computation. +func (s *BucketStore) createIteratorForChunksStreamingLabelsPhase( ctx context.Context, req *storepb.SeriesRequest, blocks []*bucketBlock, indexReaders map[ulid.ULID]*bucketIndexReader, shardSelector *sharding.ShardSelector, matchers []*labels.Matcher, - chunksLimiter ChunksLimiter, // Rate limiter for loading chunks. - seriesLimiter SeriesLimiter, // Rate limiter for loading series. + chunksLimiter ChunksLimiter, + seriesLimiter SeriesLimiter, stats *safeQueryStats, -) (storepb.SeriesSet, []*reusedPostingsAndMatchers, error) { - var ( - reuse = make([]*reusedPostingsAndMatchers, len(blocks)) - strategy = noChunkRefs | overlapMintMaxt - ) - for i := range reuse { - reuse[i] = &reusedPostingsAndMatchers{} - } - it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, reuse, strategy) +) (storepb.SeriesSet, *streamingSeriesIterators, error) { + streamingIterators := newStreamingSeriesIterators() + it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, overlapMintMaxt, streamingIterators) if err != nil { return nil, nil, err } - return newSeriesSetWithoutChunks(ctx, it, stats), reuse, nil + + return newSeriesSetWithoutChunks(ctx, it, stats), streamingIterators, nil } -// streamingChunksSetForBlocks is used when streaming feature is enabled. -// It returns an iterator to go over the chunks for the series returned in the streamingSeriesForBlocks call. -// It is recommended to pass the reusePostings and reusePendingMatches returned by the streamingSeriesForBlocks call. -func (s *BucketStore) streamingChunksSetForBlocks( +// createIteratorForChunksStreamingChunksPhase is used when streaming feature is enabled. +// It returns an iterator to go over the chunks for the series returned in the createIteratorForChunksStreamingLabelsPhase call. +// It is required to pass the iterators returned by the createIteratorForChunksStreamingLabelsPhase call for reuse. +func (s *BucketStore) createIteratorForChunksStreamingChunksPhase( ctx context.Context, - req *storepb.SeriesRequest, - blocks []*bucketBlock, - indexReaders map[ulid.ULID]*bucketIndexReader, chunkReaders *bucketChunkReaders, - shardSelector *sharding.ShardSelector, - matchers []*labels.Matcher, - chunksLimiter ChunksLimiter, // Rate limiter for loading chunks. - seriesLimiter SeriesLimiter, // Rate limiter for loading series. stats *safeQueryStats, - reuse []*reusedPostingsAndMatchers, // Should come from streamingSeriesForBlocks. 
-) (iterator[seriesChunksSet], error) { - it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, reuse, defaultStrategy) - if err != nil { - return nil, err - } + chunksLimiter ChunksLimiter, + seriesLimiter SeriesLimiter, + iterators *streamingSeriesIterators, +) iterator[seriesChunksSet] { + preparedIterators := iterators.prepareForChunksStreamingPhase() + it := s.getSeriesIteratorFromPerBlockIterators(preparedIterators, chunksLimiter, seriesLimiter) scsi := newChunksPreloadingIterator(ctx, s.logger, s.userID, *chunkReaders, it, s.maxSeriesPerBatch, stats) - return scsi, nil + + return scsi } func (s *BucketStore) getSeriesIteratorFromBlocks( @@ -1121,8 +1104,8 @@ func (s *BucketStore) getSeriesIteratorFromBlocks( chunksLimiter ChunksLimiter, // Rate limiter for loading chunks. seriesLimiter SeriesLimiter, // Rate limiter for loading series. stats *safeQueryStats, - reuse []*reusedPostingsAndMatchers, // Used if not empty. If not empty, len(reuse) must be len(blocks). strategy seriesIteratorStrategy, + streamingIterators *streamingSeriesIterators, ) (iterator[seriesChunkRefsSet], error) { var ( mtx = sync.Mutex{} @@ -1131,9 +1114,8 @@ func (s *BucketStore) getSeriesIteratorFromBlocks( begin = time.Now() blocksQueriedByBlockMeta = make(map[blockQueriedMeta]int) ) - for i, b := range blocks { + for _, b := range blocks { b := b - i := i // Keep track of queried blocks. indexr := indexReaders[b.meta.ULID] @@ -1144,10 +1126,6 @@ func (s *BucketStore) getSeriesIteratorFromBlocks( if shardSelector != nil { blockSeriesHashCache = s.seriesHashCache.GetBlockCache(b.meta.ULID.String()) } - var r *reusedPostingsAndMatchers - if len(reuse) > 0 { - r = reuse[i] - } g.Go(func() error { part, err := openBlockSeriesChunkRefsSetsIterator( ctx, @@ -1162,8 +1140,8 @@ func (s *BucketStore) getSeriesIteratorFromBlocks( strategy, req.MinTime, req.MaxTime, stats, - r, s.logger, + streamingIterators, ) if err != nil { return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) @@ -1192,13 +1170,17 @@ func (s *BucketStore) getSeriesIteratorFromBlocks( stats.streamingSeriesExpandPostingsDuration += time.Since(begin) }) - mergedIterator := mergedSeriesChunkRefsSetIterators(s.maxSeriesPerBatch, batches...) + return s.getSeriesIteratorFromPerBlockIterators(batches, chunksLimiter, seriesLimiter), nil +} + +func (s *BucketStore) getSeriesIteratorFromPerBlockIterators(perBlockIterators []iterator[seriesChunkRefsSet], chunksLimiter ChunksLimiter, seriesLimiter SeriesLimiter) iterator[seriesChunkRefsSet] { + mergedIterator := mergedSeriesChunkRefsSetIterators(s.maxSeriesPerBatch, perBlockIterators...) // Apply limits after the merging, so that if the same series is part of multiple blocks it just gets // counted once towards the limit. 
mergedIterator = newLimitingSeriesChunkRefsSetIterator(mergedIterator, chunksLimiter, seriesLimiter) - return mergedIterator, nil + return mergedIterator } func (s *BucketStore) recordSeriesCallResult(safeStats *safeQueryStats) { @@ -1467,8 +1449,8 @@ func blockLabelNames(ctx context.Context, indexr *bucketIndexReader, matchers [] noChunkRefs, minTime, maxTime, stats, - nil, logger, + nil, ) if err != nil { return nil, errors.Wrap(err, "fetch series") diff --git a/pkg/storegateway/bucket_chunk_reader_test.go b/pkg/storegateway/bucket_chunk_reader_test.go index 03bce0fc253..ba96344881c 100644 --- a/pkg/storegateway/bucket_chunk_reader_test.go +++ b/pkg/storegateway/bucket_chunk_reader_test.go @@ -44,8 +44,8 @@ func TestBucketChunkReader_refetchChunks(t *testing.T) { block.meta.MinTime, block.meta.MaxTime, newSafeQueryStats(), - nil, log.NewNopLogger(), + nil, ) require.NoError(t, err) diff --git a/pkg/storegateway/series_refs.go b/pkg/storegateway/series_refs.go index df23681acb5..060263ea0d0 100644 --- a/pkg/storegateway/series_refs.go +++ b/pkg/storegateway/series_refs.go @@ -139,6 +139,13 @@ func (b seriesChunkRefsSet) release() { seriesChunkRefsSetPool.Put(&reuse) } +// makeUnreleasable returns a new seriesChunkRefsSet that cannot be released on a subsequent call to release. +// +// This is useful for scenarios where a set will be used multiple times and so it is not safe for consumers to release it. +func (b seriesChunkRefsSet) makeUnreleasable() seriesChunkRefsSet { + return seriesChunkRefsSet{b.series, false} +} + // seriesChunkRefs holds a series with a list of chunk references. type seriesChunkRefs struct { lset labels.Labels @@ -690,47 +697,58 @@ func openBlockSeriesChunkRefsSetsIterator( ctx context.Context, batchSize int, tenantID string, - indexr *bucketIndexReader, // Index reader for block. + indexr *bucketIndexReader, indexCache indexcache.IndexCache, blockMeta *block.Meta, - matchers []*labels.Matcher, // Series matchers. - shard *sharding.ShardSelector, // Shard selector. + matchers []*labels.Matcher, + shard *sharding.ShardSelector, seriesHasher seriesHasher, strategy seriesIteratorStrategy, - minTime, maxTime int64, // Series must have data in this time range to be returned (ignored if skipChunks=true). + minTime, maxTime int64, stats *safeQueryStats, - reuse *reusedPostingsAndMatchers, // If this is not nil, these posting and matchers are used as it is without fetching new ones. 
logger log.Logger, + streamingIterators *streamingSeriesIterators, ) (iterator[seriesChunkRefsSet], error) { if batchSize <= 0 { return nil, errors.New("set size must be a positive number") } - var ( - ps []storage.SeriesRef - pendingMatchers []*labels.Matcher - fetchPostings = true - ) - if reuse != nil { - fetchPostings = !reuse.isSet() - ps = reuse.ps - pendingMatchers = reuse.matchers + ps, pendingMatchers, err := indexr.ExpandedPostings(ctx, matchers, stats) + if err != nil { + return nil, errors.Wrap(err, "expanded matching postings") } - if fetchPostings { - var err error - ps, pendingMatchers, err = indexr.ExpandedPostings(ctx, matchers, stats) - if err != nil { - return nil, errors.Wrap(err, "expanded matching postings") - } - if reuse != nil { - reuse.set(ps, pendingMatchers) - } + + iteratorFactory := func(strategy seriesIteratorStrategy, psi *postingsSetsIterator) iterator[seriesChunkRefsSet] { + return openBlockSeriesChunkRefsSetsIteratorFromPostings(ctx, tenantID, indexr, indexCache, blockMeta, shard, seriesHasher, strategy, minTime, maxTime, stats, psi, pendingMatchers, logger) } + if streamingIterators == nil { + psi := newPostingsSetsIterator(ps, batchSize) + return iteratorFactory(strategy, psi), nil + } + + return streamingIterators.wrapIterator(strategy, ps, batchSize, iteratorFactory), nil +} + +func openBlockSeriesChunkRefsSetsIteratorFromPostings( + ctx context.Context, + tenantID string, + indexr *bucketIndexReader, + indexCache indexcache.IndexCache, + blockMeta *block.Meta, + shard *sharding.ShardSelector, + seriesHasher seriesHasher, + strategy seriesIteratorStrategy, + minTime, maxTime int64, + stats *safeQueryStats, + postingsSetsIterator *postingsSetsIterator, + pendingMatchers []*labels.Matcher, + logger log.Logger, +) iterator[seriesChunkRefsSet] { var it iterator[seriesChunkRefsSet] it = newLoadingSeriesChunkRefsSetIterator( ctx, - newPostingsSetsIterator(ps, batchSize), + postingsSetsIterator, indexr, indexCache, stats, @@ -743,36 +761,12 @@ func openBlockSeriesChunkRefsSetsIterator( tenantID, logger, ) + if len(pendingMatchers) > 0 { it = newFilteringSeriesChunkRefsSetIterator(pendingMatchers, it, stats) } - return it, nil -} - -// reusedPostings is used to share the postings and matches across function calls for re-use -// in case of streaming series. We have it as a separate struct so that we can give a safe way -// to use it by making a copy where required. You can use it to put items only once. -type reusedPostingsAndMatchers struct { - ps []storage.SeriesRef - matchers []*labels.Matcher - filled bool -} - -func (p *reusedPostingsAndMatchers) set(ps []storage.SeriesRef, matchers []*labels.Matcher) { - if p.filled { - // We already have something here. - return - } - // Postings list can be modified later, so we make a copy here. - p.ps = make([]storage.SeriesRef, len(ps)) - copy(p.ps, ps) - p.matchers = matchers - p.filled = true -} - -func (p *reusedPostingsAndMatchers) isSet() bool { - return p.filled + return it } // seriesIteratorStrategy defines the strategy to use when loading the series and their chunk refs. 
@@ -812,6 +806,14 @@ func (s seriesIteratorStrategy) isNoChunkRefsAndOverlapMintMaxt() bool { return s.isNoChunkRefs() && s.isOverlapMintMaxt() } +func (s seriesIteratorStrategy) withNoChunkRefs() seriesIteratorStrategy { + return s | noChunkRefs +} + +func (s seriesIteratorStrategy) withChunkRefs() seriesIteratorStrategy { + return s & ^noChunkRefs +} + func newLoadingSeriesChunkRefsSetIterator( ctx context.Context, postingsSetIterator *postingsSetsIterator, diff --git a/pkg/storegateway/series_refs_streaming.go b/pkg/storegateway/series_refs_streaming.go new file mode 100644 index 00000000000..b5c5d8bd3bb --- /dev/null +++ b/pkg/storegateway/series_refs_streaming.go @@ -0,0 +1,145 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package storegateway + +import ( + "sync" + + "github.com/prometheus/prometheus/storage" +) + +type seriesChunkRefsIteratorFactory func(strategy seriesIteratorStrategy, psi *postingsSetsIterator) iterator[seriesChunkRefsSet] + +// chunksStreamingCachingSeriesChunkRefsSetIterator is an iterator used while streaming chunks from store-gateways to queriers. +// +// It wraps another iterator that does the actual work. If that iterator is expected to produce only a single batch, +// this iterator caches that batch for the chunks streaming phase, to avoid repeating work done during the series label sending phase. +type chunksStreamingCachingSeriesChunkRefsSetIterator struct { + strategy seriesIteratorStrategy + iteratorFactory seriesChunkRefsIteratorFactory + postings []storage.SeriesRef + batchSize int + + it iterator[seriesChunkRefsSet] + + expectSingleBatch bool + inChunksStreamingPhaseForSingleBatch bool + haveCachedBatch bool // It's possible that we expected a single batch to be returned, but the batch was filtered out by the inner iterator. + + currentBatchIndex int // -1 after beginning chunks streaming phase, 0 when on first and only batch, 1 after first and only batch + cachedBatch seriesChunkRefsSet +} + +func newChunksStreamingCachingSeriesChunkRefsSetIterator(strategy seriesIteratorStrategy, postings []storage.SeriesRef, batchSize int, iteratorFactory seriesChunkRefsIteratorFactory) *chunksStreamingCachingSeriesChunkRefsSetIterator { + expectSingleBatch := len(postings) <= batchSize + var initialStrategy seriesIteratorStrategy + var psi *postingsSetsIterator + + if expectSingleBatch { + initialStrategy = strategy.withChunkRefs() + + // No need to make a copy: we're only going to use the postings once. + psi = newPostingsSetsIterator(postings, batchSize) + } else { + // We'll load chunk refs during the chunks streaming phase. + initialStrategy = strategy.withNoChunkRefs() + + copiedPostings := make([]storage.SeriesRef, len(postings)) + copy(copiedPostings, postings) + psi = newPostingsSetsIterator(copiedPostings, batchSize) + } + + return &chunksStreamingCachingSeriesChunkRefsSetIterator{ + strategy: strategy, + postings: postings, + batchSize: batchSize, + iteratorFactory: iteratorFactory, + it: iteratorFactory(initialStrategy, psi), + expectSingleBatch: expectSingleBatch, + } +} + +func (i *chunksStreamingCachingSeriesChunkRefsSetIterator) Next() bool { + if i.inChunksStreamingPhaseForSingleBatch && i.haveCachedBatch { + i.currentBatchIndex++ + return i.currentBatchIndex < 1 + } + + return i.it.Next() +} + +func (i *chunksStreamingCachingSeriesChunkRefsSetIterator) At() seriesChunkRefsSet { + if i.inChunksStreamingPhaseForSingleBatch { + if i.currentBatchIndex == 0 && i.haveCachedBatch { + // Called Next() once. Return the cached batch. 
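+			// Reusing the cached batch avoids loading the series and their chunk refs from the block a second time.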
+ // If the original batch was releasable or unreleasable, retain that state here. + return i.cachedBatch + } + + // Haven't called Next() yet, or called Next() multiple times and we've advanced past the only batch. + // At() should never be called in either case, so just return nothing if it is. + return seriesChunkRefsSet{} + } + + set := i.it.At() + + if i.expectSingleBatch { + i.cachedBatch = set + i.haveCachedBatch = true + + // Don't allow releasing this batch - we'll need it again later, so releasing it is not safe. + return set.makeUnreleasable() + } + + return set +} + +func (i *chunksStreamingCachingSeriesChunkRefsSetIterator) Err() error { + return i.it.Err() +} + +func (i *chunksStreamingCachingSeriesChunkRefsSetIterator) PrepareForChunksStreamingPhase() { + if i.expectSingleBatch { + i.inChunksStreamingPhaseForSingleBatch = true + i.currentBatchIndex = -1 + } else { + // No need to make a copy of postings here like we do in newChunksStreamingCachingSeriesChunkRefsSetIterator: + // we're not expecting to use them again after this, so we don't care if they're modified. + psi := newPostingsSetsIterator(i.postings, i.batchSize) + i.it = i.iteratorFactory(i.strategy.withChunkRefs(), psi) + } +} + +// streamingSeriesIterators represents a collection of iterators that will be used to handle a +// Series() request that uses chunks streaming. +type streamingSeriesIterators struct { + iterators []*chunksStreamingCachingSeriesChunkRefsSetIterator + mtx *sync.RWMutex +} + +func newStreamingSeriesIterators() *streamingSeriesIterators { + return &streamingSeriesIterators{ + mtx: &sync.RWMutex{}, + } +} + +func (i *streamingSeriesIterators) wrapIterator(strategy seriesIteratorStrategy, ps []storage.SeriesRef, batchSize int, iteratorFactory seriesChunkRefsIteratorFactory) iterator[seriesChunkRefsSet] { + it := newChunksStreamingCachingSeriesChunkRefsSetIterator(strategy, ps, batchSize, iteratorFactory) + + i.mtx.Lock() + i.iterators = append(i.iterators, it) + i.mtx.Unlock() + + return it +} + +func (i *streamingSeriesIterators) prepareForChunksStreamingPhase() []iterator[seriesChunkRefsSet] { + prepared := make([]iterator[seriesChunkRefsSet], 0, len(i.iterators)) + + for _, it := range i.iterators { + it.PrepareForChunksStreamingPhase() + prepared = append(prepared, it) + } + + return prepared +} diff --git a/pkg/storegateway/series_refs_streaming_test.go b/pkg/storegateway/series_refs_streaming_test.go new file mode 100644 index 00000000000..51e8f6bfe80 --- /dev/null +++ b/pkg/storegateway/series_refs_streaming_test.go @@ -0,0 +1,345 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package storegateway + +import ( + "errors" + "fmt" + "testing" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" +) + +func TestChunksStreamingCachingSeriesChunkRefsSetIterator_ExpectingSingleBatch_HappyPath(t *testing.T) { + postings := []storage.SeriesRef{1, 2, 3} + batchSize := 4 + factoryCalls := 0 + var factoryStrategy seriesIteratorStrategy + + firstBatchSeries := []seriesChunkRefs{ + {lset: labels.FromStrings("series", "1")}, + {lset: labels.FromStrings("series", "2")}, + {lset: labels.FromStrings("series", "3")}, + } + + iteratorFactory := func(strategy seriesIteratorStrategy, _ *postingsSetsIterator) iterator[seriesChunkRefsSet] { + factoryCalls++ + factoryStrategy = strategy + + return newSliceSeriesChunkRefsSetIterator( + nil, + seriesChunkRefsSet{ + series: firstBatchSeries, + releasable: true, + }, + ) + } + + it 
:= newChunksStreamingCachingSeriesChunkRefsSetIterator(defaultStrategy, postings, batchSize, iteratorFactory) + + // Inner iterator should be created with chunk refs enabled. + require.Equal(t, 1, factoryCalls) + require.Equal(t, defaultStrategy.withChunkRefs(), factoryStrategy) + require.NoError(t, it.Err()) + + // During label sending phase, the single batch should be returned and not be releasable. + batches := readAllSeriesChunkRefsSet(it) + require.NoError(t, it.Err()) + + unreleasableBatch := seriesChunkRefsSet{ + series: firstBatchSeries, + releasable: false, + } + require.Equal(t, []seriesChunkRefsSet{unreleasableBatch}, batches) + + // Prepare for chunks streaming phase. Inner iterator should not be recreated. + it.PrepareForChunksStreamingPhase() + require.Equal(t, 1, factoryCalls) + require.NoError(t, it.Err()) + + // During chunks streaming phase, the single batch should be returned but be releasable this time. + batches = readAllSeriesChunkRefsSet(it) + require.NoError(t, it.Err()) + + releasableBatch := seriesChunkRefsSet{ + series: firstBatchSeries, + releasable: true, + } + require.Equal(t, []seriesChunkRefsSet{releasableBatch}, batches) +} + +func TestChunksStreamingCachingSeriesChunkRefsSetIterator_ExpectingSingleBatch_InnerIteratorReturnsUnreleasableSet(t *testing.T) { + postings := []storage.SeriesRef{1, 2, 3} + batchSize := 4 + factoryCalls := 0 + + unreleasableBatch := seriesChunkRefsSet{ + series: []seriesChunkRefs{ + {lset: labels.FromStrings("series", "1")}, + {lset: labels.FromStrings("series", "2")}, + {lset: labels.FromStrings("series", "3")}, + }, + releasable: false, + } + + iteratorFactory := func(_ seriesIteratorStrategy, _ *postingsSetsIterator) iterator[seriesChunkRefsSet] { + factoryCalls++ + return newSliceSeriesChunkRefsSetIterator(nil, unreleasableBatch) + } + + it := newChunksStreamingCachingSeriesChunkRefsSetIterator(defaultStrategy, postings, batchSize, iteratorFactory) + + // During label sending phase, the single batch should be returned and not be releasable. + batches := readAllSeriesChunkRefsSet(it) + require.NoError(t, it.Err()) + require.Equal(t, []seriesChunkRefsSet{unreleasableBatch}, batches) + + // Prepare for chunks streaming phase. Inner iterator should not be recreated. + it.PrepareForChunksStreamingPhase() + require.Equal(t, 1, factoryCalls) + require.NoError(t, it.Err()) + + // During chunks streaming phase, the single batch should be returned and still not be releasable. + batches = readAllSeriesChunkRefsSet(it) + require.NoError(t, it.Err()) + require.Equal(t, []seriesChunkRefsSet{unreleasableBatch}, batches) +} + +func TestChunksStreamingCachingSeriesChunkRefsSetIterator_ExpectingSingleBatch_AllBatchesFilteredOut(t *testing.T) { + postings := []storage.SeriesRef{1, 2, 3} + batchSize := 4 + factoryCalls := 0 + var factoryStrategy seriesIteratorStrategy + + iteratorFactory := func(strategy seriesIteratorStrategy, _ *postingsSetsIterator) iterator[seriesChunkRefsSet] { + factoryCalls++ + factoryStrategy = strategy + + return newSliceSeriesChunkRefsSetIterator( + nil, + // No batches. + ) + } + + it := newChunksStreamingCachingSeriesChunkRefsSetIterator(defaultStrategy, postings, batchSize, iteratorFactory) + + // Inner iterator should be created with chunk refs enabled. + require.Equal(t, 1, factoryCalls) + require.Equal(t, defaultStrategy.withChunkRefs(), factoryStrategy) + require.NoError(t, it.Err()) + + // Label sending phase. 
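+	// The inner iterator produces no batches, so nothing should be returned here and nothing cached for the chunks streaming phase.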
+ batches := readAllSeriesChunkRefsSet(it) + require.NoError(t, it.Err()) + require.Empty(t, batches) + + // Prepare for chunks streaming phase. Inner iterator should not be recreated. + it.PrepareForChunksStreamingPhase() + require.Equal(t, 1, factoryCalls) + require.NoError(t, it.Err()) + + // Chunks streaming phase. + batches = readAllSeriesChunkRefsSet(it) + require.NoError(t, it.Err()) + require.Empty(t, batches) +} + +func TestChunksStreamingCachingSeriesChunkRefsSetIterator_ExpectingSingleBatch_IteratorReturnsError(t *testing.T) { + postings := []storage.SeriesRef{1, 2, 3} + batchSize := 4 + factoryCalls := 0 + iteratorError := errors.New("something went wrong") + + iteratorFactory := func(_ seriesIteratorStrategy, _ *postingsSetsIterator) iterator[seriesChunkRefsSet] { + factoryCalls++ + + return newSliceSeriesChunkRefsSetIterator( + iteratorError, + seriesChunkRefsSet{ + series: []seriesChunkRefs{ + {lset: labels.FromStrings("series", "1")}, + {lset: labels.FromStrings("series", "2")}, + {lset: labels.FromStrings("series", "3")}, + }, + releasable: true, + }, + ) + } + + it := newChunksStreamingCachingSeriesChunkRefsSetIterator(defaultStrategy, postings, batchSize, iteratorFactory) + + // During label sending phase, the error should be returned. + _ = readAllSeriesChunkRefsSet(it) + require.Equal(t, iteratorError, it.Err()) + + // Prepare for chunks streaming phase. Inner iterator should not be recreated. + it.PrepareForChunksStreamingPhase() + require.Equal(t, 1, factoryCalls) + + // During chunks streaming phase, the error should be returned. + _ = readAllSeriesChunkRefsSet(it) + require.Equal(t, iteratorError, it.Err()) +} + +func TestChunksStreamingCachingSeriesChunkRefsSetIterator_ExpectingMultipleBatches_HappyPath(t *testing.T) { + postings := []storage.SeriesRef{1, 2, 3, 4, 5, 6} + batchSize := 3 + factoryCalls := 0 + var factoryStrategy seriesIteratorStrategy + + firstBatchWithNoChunkRefs := seriesChunkRefsSet{ + series: []seriesChunkRefs{ + {lset: labels.FromStrings("series", "1", "have_chunk_refs", "no")}, + {lset: labels.FromStrings("series", "2", "have_chunk_refs", "no")}, + {lset: labels.FromStrings("series", "3", "have_chunk_refs", "no")}, + }, + releasable: true, + } + + secondBatchWithNoChunkRefs := seriesChunkRefsSet{ + series: []seriesChunkRefs{ + {lset: labels.FromStrings("series", "4", "have_chunk_refs", "no")}, + {lset: labels.FromStrings("series", "5", "have_chunk_refs", "no")}, + {lset: labels.FromStrings("series", "6", "have_chunk_refs", "no")}, + }, + releasable: true, + } + + firstBatchWithChunkRefs := seriesChunkRefsSet{ + series: []seriesChunkRefs{ + {lset: labels.FromStrings("series", "1", "have_chunk_refs", "yes")}, + {lset: labels.FromStrings("series", "2", "have_chunk_refs", "yes")}, + {lset: labels.FromStrings("series", "3", "have_chunk_refs", "yes")}, + }, + releasable: true, + } + + secondBatchWithChunkRefs := seriesChunkRefsSet{ + series: []seriesChunkRefs{ + {lset: labels.FromStrings("series", "4", "have_chunk_refs", "yes")}, + {lset: labels.FromStrings("series", "5", "have_chunk_refs", "yes")}, + {lset: labels.FromStrings("series", "6", "have_chunk_refs", "yes")}, + }, + releasable: true, + } + + iteratorFactory := func(strategy seriesIteratorStrategy, psi *postingsSetsIterator) iterator[seriesChunkRefsSet] { + factoryCalls++ + factoryStrategy = strategy + + require.Equal(t, 0, psi.nextBatchPostingsOffset, "postings set iterator should be at beginning when creating iterator") + + if factoryCalls == 1 { + // Simulate the underlying iterator 
advancing the postings set iterator to the end, to ensure we get a fresh iterator next time. + for psi.Next() { + // Nothing to do, we just want to advance. + } + + return newSliceSeriesChunkRefsSetIterator(nil, firstBatchWithNoChunkRefs, secondBatchWithNoChunkRefs) + } + + return newSliceSeriesChunkRefsSetIterator(nil, firstBatchWithChunkRefs, secondBatchWithChunkRefs) + } + + it := newChunksStreamingCachingSeriesChunkRefsSetIterator(defaultStrategy, postings, batchSize, iteratorFactory) + + // Inner iterator should be created with chunk refs disabled. + require.Equal(t, 1, factoryCalls) + require.Equal(t, defaultStrategy.withNoChunkRefs(), factoryStrategy) + require.NoError(t, it.Err()) + + // During label sending phase, the batches should be returned as-is. + batches := readAllSeriesChunkRefsSet(it) + require.NoError(t, it.Err()) + require.Equal(t, []seriesChunkRefsSet{firstBatchWithNoChunkRefs, secondBatchWithNoChunkRefs}, batches) + + // Prepare for chunks streaming phase. Inner iterator should be recreated with chunk refs enabled. + it.PrepareForChunksStreamingPhase() + require.Equal(t, 2, factoryCalls) + require.Equal(t, defaultStrategy.withChunkRefs(), factoryStrategy) + require.NoError(t, it.Err()) + + // During chunks streaming phase, the batches should be returned as-is from the new iterator. + batches = readAllSeriesChunkRefsSet(it) + require.NoError(t, it.Err()) + require.Equal(t, []seriesChunkRefsSet{firstBatchWithChunkRefs, secondBatchWithChunkRefs}, batches) +} + +func TestChunksStreamingCachingSeriesChunkRefsSetIterator_ExpectingMultipleBatches_AllBatchesFilteredOut(t *testing.T) { + postings := []storage.SeriesRef{1, 2, 3, 4, 5, 6} + batchSize := 3 + factoryCalls := 0 + var factoryStrategy seriesIteratorStrategy + + iteratorFactory := func(strategy seriesIteratorStrategy, _ *postingsSetsIterator) iterator[seriesChunkRefsSet] { + factoryCalls++ + factoryStrategy = strategy + + return newSliceSeriesChunkRefsSetIterator( + nil, + // No batches. + ) + } + + it := newChunksStreamingCachingSeriesChunkRefsSetIterator(defaultStrategy, postings, batchSize, iteratorFactory) + + // Inner iterator should be created with chunk refs disabled. + require.Equal(t, 1, factoryCalls) + require.Equal(t, defaultStrategy.withNoChunkRefs(), factoryStrategy) + require.NoError(t, it.Err()) + + // Label sending phase. + batches := readAllSeriesChunkRefsSet(it) + require.NoError(t, it.Err()) + require.Empty(t, batches) + + // Prepare for chunks streaming phase. Inner iterator should be recreated with chunk refs enabled. + it.PrepareForChunksStreamingPhase() + require.Equal(t, 2, factoryCalls) + require.Equal(t, defaultStrategy.withChunkRefs(), factoryStrategy) + require.NoError(t, it.Err()) + + // Chunks streaming phase. 
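+	// The recreated inner iterator also produces no batches, so this phase should return nothing as well.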
+ batches = readAllSeriesChunkRefsSet(it) + require.NoError(t, it.Err()) + require.Empty(t, batches) +} + +func TestChunksStreamingCachingSeriesChunkRefsSetIterator_ExpectingMultipleBatches_IteratorReturnsError(t *testing.T) { + postings := []storage.SeriesRef{1, 2, 3, 4, 5, 6} + batchSize := 3 + factoryCalls := 0 + + iteratorFactory := func(_ seriesIteratorStrategy, _ *postingsSetsIterator) iterator[seriesChunkRefsSet] { + factoryCalls++ + + return newSliceSeriesChunkRefsSetIterator( + fmt.Errorf("error #%v", factoryCalls), + seriesChunkRefsSet{ + series: []seriesChunkRefs{ + {lset: labels.FromStrings("series", "1")}, + {lset: labels.FromStrings("series", "2")}, + {lset: labels.FromStrings("series", "3")}, + }, + releasable: true, + }, + ) + } + + it := newChunksStreamingCachingSeriesChunkRefsSetIterator(defaultStrategy, postings, batchSize, iteratorFactory) + require.Equal(t, 1, factoryCalls) + + // During label sending phase, the error from the original iterator should be returned. + _ = readAllSeriesChunkRefsSet(it) + require.EqualError(t, it.Err(), "error #1") + + // Prepare for chunks streaming phase. Inner iterator should be recreated. + it.PrepareForChunksStreamingPhase() + require.Equal(t, 2, factoryCalls) + + // During chunks streaming phase, the error from the new iterator should be returned. + _ = readAllSeriesChunkRefsSet(it) + require.EqualError(t, it.Err(), "error #2") +} diff --git a/pkg/storegateway/series_refs_test.go b/pkg/storegateway/series_refs_test.go index ee5070e6cfd..dad02bd2a2c 100644 --- a/pkg/storegateway/series_refs_test.go +++ b/pkg/storegateway/series_refs_test.go @@ -1256,7 +1256,7 @@ func TestLoadingSeriesChunkRefsSetIterator(t *testing.T) { return []seriesChunkRefsSet{set} }(), }, - "works with many series in many batches batch": { + "works with many series in many batches": { blockFactory: largerTestBlockFactory, minT: 0, maxT: math.MaxInt64, @@ -1466,7 +1466,7 @@ func assertSeriesChunkRefsSetsEqual(t testing.TB, blockID ulid.ULID, blockDir st assert.True(t, uint64(prevChunkRef)+prevChunkLen <= uint64(promChunk.Ref), "estimated length shouldn't extend into the next chunk [%d, %d, %d]", i, j, k) assert.True(t, actualChunk.length <= uint32(tsdb.EstimatedMaxChunkSize), - "chunks can be larger than 16KB, but the estimted length should be capped to 16KB to limit the impact of bugs in estimations [%d, %d, %d]", i, j, k) + "chunks can be larger than 16KB, but the estimated length should be capped to 16KB to limit the impact of bugs in estimations [%d, %d, %d]", i, j, k) prevChunkRef, prevChunkLen = promChunk.Ref, uint64(actualChunk.length) } @@ -1703,8 +1703,8 @@ func TestOpenBlockSeriesChunkRefsSetsIterator(t *testing.T) { minT, maxT, newSafeQueryStats(), - nil, log.NewNopLogger(), + nil, ) require.NoError(t, err) @@ -1804,8 +1804,8 @@ func TestOpenBlockSeriesChunkRefsSetsIterator_pendingMatchers(t *testing.T) { block.meta.MinTime, block.meta.MaxTime, newSafeQueryStats(), - nil, log.NewNopLogger(), + nil, ) require.NoError(t, err) allSets := readAllSeriesChunkRefsSet(iterator) @@ -1868,8 +1868,8 @@ func BenchmarkOpenBlockSeriesChunkRefsSetsIterator(b *testing.B) { block.meta.MinTime, block.meta.MaxTime, newSafeQueryStats(), - nil, log.NewNopLogger(), + nil, ) require.NoError(b, err) @@ -2197,8 +2197,8 @@ func TestOpenBlockSeriesChunkRefsSetsIterator_SeriesCaching(t *testing.T) { b.meta.MinTime, b.meta.MaxTime, statsColdCache, - nil, log.NewNopLogger(), + nil, ) require.NoError(t, err) @@ -2213,7 +2213,7 @@ func 
TestOpenBlockSeriesChunkRefsSetsIterator_SeriesCaching(t *testing.T) { cached: testCase.cachedSeriesHashesWithWarmCache, } - statsWarnCache := newSafeQueryStats() + statsWarmCache := newSafeQueryStats() ss, err = openBlockSeriesChunkRefsSetsIterator( context.Background(), batchSize, @@ -2227,15 +2227,15 @@ func TestOpenBlockSeriesChunkRefsSetsIterator_SeriesCaching(t *testing.T) { noChunkRefs, b.meta.MinTime, b.meta.MaxTime, - statsWarnCache, - nil, + statsWarmCache, log.NewNopLogger(), + nil, ) require.NoError(t, err) lset = extractLabelsFromSeriesChunkRefsSets(readAllSeriesChunkRefsSet(ss)) require.NoError(t, ss.Err()) assert.Equal(t, testCase.expectedLabelSets, lset) - assert.Equal(t, testCase.expectedSeriesReadFromBlockWithWarmCache, statsWarnCache.export().seriesFetched) + assert.Equal(t, testCase.expectedSeriesReadFromBlockWithWarmCache, statsWarmCache.export().seriesFetched) }) } }) @@ -2291,7 +2291,7 @@ func TestPostingsSetsIterator(t *testing.T) { "empty postings": { postings: []storage.SeriesRef{}, batchSize: 2, - expectedBatches: [][]storage.SeriesRef{}, + expectedBatches: nil, }, } @@ -2305,51 +2305,11 @@ func TestPostingsSetsIterator(t *testing.T) { actualBatches = append(actualBatches, iterator.At()) } - assert.ElementsMatch(t, testCase.expectedBatches, actualBatches) + require.Equal(t, testCase.expectedBatches, actualBatches) }) } } -func TestReusedPostingsAndMatchers(t *testing.T) { - postingsList := [][]storage.SeriesRef{ - nil, - {}, - {1, 2, 3}, - } - matchersList := [][]*labels.Matcher{ - nil, - {}, - {labels.MustNewMatcher(labels.MatchEqual, "a", "b")}, - } - - for _, firstPostings := range postingsList { - for _, firstMatchers := range matchersList { - for _, secondPostings := range postingsList { - for _, secondMatchers := range matchersList { - r := reusedPostingsAndMatchers{} - require.False(t, r.isSet()) - - verify := func() { - r.set(firstPostings, firstMatchers) - require.True(t, r.isSet()) - if firstPostings == nil { - require.Equal(t, []storage.SeriesRef{}, r.ps) - } else { - require.Equal(t, firstPostings, r.ps) - } - require.Equal(t, firstMatchers, r.matchers) - } - verify() - - // This should not overwrite the first set. - r.set(secondPostings, secondMatchers) - verify() - } - } - } - } -} - type mockSeriesHasher struct { cached map[storage.SeriesRef]uint64 hashes map[string]uint64 @@ -2680,3 +2640,10 @@ type mockIndexCacheEntry struct { func (c mockIndexCache) FetchSeriesForPostings(context.Context, string, ulid.ULID, *sharding.ShardSelector, indexcache.PostingsKey) ([]byte, bool) { return c.fetchSeriesForPostingsResponse.contents, c.fetchSeriesForPostingsResponse.cached } + +func TestSeriesIteratorStrategy(t *testing.T) { + require.False(t, defaultStrategy.isNoChunkRefs()) + require.True(t, defaultStrategy.withNoChunkRefs().isNoChunkRefs()) + require.False(t, defaultStrategy.withNoChunkRefs().withChunkRefs().isNoChunkRefs()) + require.False(t, defaultStrategy.withChunkRefs().isNoChunkRefs()) +}
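
For context, the following is a small, self-contained Go sketch (not part of the patch; identifiers such as cachingIterator, batch and build are hypothetical) of the optimisation this change introduces: when a query selects few enough series that a single batch covers them, the batch produced while sending series labels is cached and replayed while streaming chunks, instead of re-reading the block index. It deliberately ignores iterator strategies, releasability and per-block merging.

package main

import "fmt"

// batch stands in for a seriesChunkRefsSet: one batch of series produced for a block.
type batch struct {
	series []string
}

// cachingIterator mimics the idea behind chunksStreamingCachingSeriesChunkRefsSetIterator:
// if all postings fit into a single batch, the batch built during the labels phase is cached
// and replayed during the chunks streaming phase rather than rebuilt.
type cachingIterator struct {
	build  func() []batch // stands in for the real per-block iterator factory
	cached *batch
	first  []batch
}

func newCachingIterator(postings, batchSize int, build func() []batch) *cachingIterator {
	it := &cachingIterator{build: build, first: build()}
	if postings <= batchSize && len(it.first) == 1 {
		b := it.first[0]
		it.cached = &b
	}
	return it
}

// labelsPhase returns the batches for the series-labels pass.
func (c *cachingIterator) labelsPhase() []batch {
	return c.first
}

// chunksPhase returns the batches for the chunks pass, reusing the cached batch when possible.
func (c *cachingIterator) chunksPhase() []batch {
	if c.cached != nil {
		return []batch{*c.cached} // no second expensive pass over the block index
	}
	return c.build()
}

func main() {
	builds := 0
	build := func() []batch {
		builds++
		return []batch{{series: []string{"series_1", "series_2"}}}
	}

	it := newCachingIterator(2, 5000, build)
	fmt.Println("labels phase:", it.labelsPhase())
	fmt.Println("chunks phase:", it.chunksPhase())
	fmt.Println("expensive builds:", builds) // prints 1: the single batch was reused
}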