From fc1a6edf7acc82ec21ebc9fd641ce7afc4fed049 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 12 Dec 2023 08:21:52 -0800 Subject: [PATCH] Add streaming series limit at block series client (#6972) * add series limit that is applied when streaming using block series client Signed-off-by: Ben Ye * changelog Signed-off-by: Ben Ye * add unit tests Signed-off-by: Ben Ye * address comments Signed-off-by: Ben Ye * fix comment Signed-off-by: Ben Ye --------- Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + pkg/store/bucket.go | 48 +++++++++++++------ pkg/store/bucket_test.go | 100 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 134 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 10eb9b8435..2f6a09133a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6925](https://github.com/thanos-io/thanos/pull/6925) Store Gateway: Support float native histogram. - [#6954](https://github.com/thanos-io/thanos/pull/6954) Index Cache: Support tracing for fetch APIs. - [#6943](https://github.com/thanos-io/thanos/pull/6943) Ruler: Added `keep_firing_for` field in alerting rule. +- [#6972](https://github.com/thanos-io/thanos/pull/6972) Store Gateway: Apply series limit when streaming series for series actually matched if lazy postings is enabled. 
### Changed diff --git a/pkg/store/bucket.go index da81fab93a..3ebd6f06a4 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -952,8 +952,10 @@ type blockSeriesClient struct { indexr *bucketIndexReader chunkr *bucketChunkReader loadAggregates []storepb.Aggr - chunksLimiter ChunksLimiter - bytesLimiter BytesLimiter + + seriesLimiter SeriesLimiter + chunksLimiter ChunksLimiter + bytesLimiter BytesLimiter lazyExpandedPostingEnabled bool lazyExpandedPostingsCount prometheus.Counter @@ -986,7 +988,8 @@ func newBlockSeriesClient( logger log.Logger, b *bucketBlock, req *storepb.SeriesRequest, - limiter ChunksLimiter, + seriesLimiter SeriesLimiter, + chunksLimiter ChunksLimiter, bytesLimiter BytesLimiter, blockMatchers []*labels.Matcher, shardMatcher *storepb.ShardMatcher, @@ -1022,7 +1025,8 @@ func newBlockSeriesClient( maxt: req.MaxTime, indexr: b.indexReader(), chunkr: chunkr, - chunksLimiter: limiter, + seriesLimiter: seriesLimiter, + chunksLimiter: chunksLimiter, bytesLimiter: bytesLimiter, skipChunks: req.SkipChunks, seriesFetchDurationSum: seriesFetchDurationSum, @@ -1091,20 +1095,21 @@ func (b *blockSeriesClient) ExpandPostings( } b.lazyPostings = ps - // If lazy expanded posting enabled, it is possible to fetch more series - // so easier to hit the series limit. - if err := seriesLimiter.Reserve(uint64(len(ps.postings))); err != nil { - return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err) - } - - if b.batchSize > len(ps.postings) { - b.batchSize = len(ps.postings) - } if b.lazyPostings.lazyExpanded() { // Assume lazy expansion could cut actual expanded postings length to 50%. b.expandedPostings = make([]storage.SeriesRef, 0, len(b.lazyPostings.postings)/2) b.lazyExpandedPostingsCount.Inc() + } else { + // Apply series limiter eagerly if lazy postings not enabled. 
+ if err := seriesLimiter.Reserve(uint64(len(ps.postings))); err != nil { + return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err) + } } + + if b.batchSize > len(ps.postings) { + b.batchSize = len(ps.postings) + } + b.entries = make([]seriesEntry, 0, b.batchSize) return nil } @@ -1169,6 +1174,7 @@ func (b *blockSeriesClient) nextBatch(tenant string) error { return errors.Wrap(err, "preload series") } + seriesMatched := 0 b.entries = b.entries[:0] OUTER: for i := 0; i < len(postingsBatch); i++ { @@ -1209,6 +1215,7 @@ OUTER: continue } + seriesMatched++ s := seriesEntry{lset: completeLabelset} if b.skipChunks { b.entries = append(b.entries, s) @@ -1238,6 +1245,13 @@ OUTER: b.entries = append(b.entries, s) } + if b.lazyPostings.lazyExpanded() { + // Apply series limit before fetching chunks, for actual series matched. + if err := b.seriesLimiter.Reserve(uint64(seriesMatched)); err != nil { + return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err) + } + } + if !b.skipChunks { if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter, b.tenant); err != nil { return errors.Wrap(err, "load chunks") @@ -1405,8 +1419,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store g, gctx = errgroup.WithContext(ctx) resHints = &hintspb.SeriesResponseHints{} reqBlockMatchers []*labels.Matcher - chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks", tenant)) - seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) + + chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks", tenant)) + seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) queryStatsEnabled = false ) @@ -1464,6 +1479,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store s.logger, blk, req, + 
seriesLimiter, chunksLimiter, bytesLimiter, sortedBlockMatchers, @@ -1764,6 +1780,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq s.logger, b, seriesReq, + seriesLimiter, nil, bytesLimiter, reqSeriesMatchersNoExtLabels, @@ -1967,6 +1984,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR s.logger, b, seriesReq, + seriesLimiter, nil, bytesLimiter, reqSeriesMatchersNoExtLabels, diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 829a842ee5..87659f5450 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2777,6 +2777,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet nil, blk, req, + seriesLimiter, chunksLimiter, NewBytesLimiterFactory(0)(nil), matchers, @@ -3649,3 +3650,102 @@ func TestQueryStatsMerge(t *testing.T) { s.merge(o) testutil.Equals(t, e, s) } + +func TestBucketStoreStreamingSeriesLimit(t *testing.T) { + logger := log.NewNopLogger() + tmpDir := t.TempDir() + bktDir := filepath.Join(tmpDir, "bkt") + auxDir := filepath.Join(tmpDir, "aux") + metaDir := filepath.Join(tmpDir, "meta") + extLset := labels.FromStrings("region", "eu-west") + + testutil.Ok(t, os.MkdirAll(metaDir, os.ModePerm)) + testutil.Ok(t, os.MkdirAll(auxDir, os.ModePerm)) + + bkt, err := filesystem.NewBucket(bktDir) + testutil.Ok(t, err) + t.Cleanup(func() { testutil.Ok(t, bkt.Close()) }) + + headOpts := tsdb.DefaultHeadOptions() + headOpts.ChunkDirRoot = tmpDir + headOpts.ChunkRange = 1000 + h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil) + testutil.Ok(t, err) + t.Cleanup(func() { testutil.Ok(t, h.Close()) }) + + app := h.Appender(context.Background()) + _, err = app.Append(0, labels.FromStrings("a", "1", "z", "1"), 0, 1) + testutil.Ok(t, err) + _, err = app.Append(0, labels.FromStrings("a", "1", "z", "2"), 0, 1) + testutil.Ok(t, err) + _, err = app.Append(0, labels.FromStrings("a", "1", "z", "3"), 0, 1) + testutil.Ok(t, err) + _, err = 
app.Append(0, labels.FromStrings("a", "1", "z", "4"), 0, 1) + testutil.Ok(t, err) + _, err = app.Append(0, labels.FromStrings("a", "1", "z", "5"), 0, 1) + testutil.Ok(t, err) + _, err = app.Append(0, labels.FromStrings("a", "1", "z", "6"), 0, 1) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + id := createBlockFromHead(t, auxDir, h) + + auxBlockDir := filepath.Join(auxDir, id.String()) + _, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{ + Labels: extLset.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + }, nil) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc)) + + chunkPool, err := NewDefaultChunkBytesPool(2e5) + testutil.Ok(t, err) + + insBkt := objstore.WithNoopInstr(bkt) + baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{ + block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime), + }) + testutil.Ok(t, err) + + // Set series limit to 2. Only pass if series limiter applies + // for lazy postings only. 
+ bucketStore, err := NewBucketStore( + objstore.WithNoopInstr(bkt), + metaFetcher, + "", + NewChunksLimiterFactory(10e6), + NewSeriesLimiterFactory(2), + NewBytesLimiterFactory(10e6), + NewGapBasedPartitioner(PartitionerMaxGapSize), + 20, + true, + DefaultPostingOffsetInMemorySampling, + false, + false, + 1*time.Minute, + WithChunkPool(chunkPool), + WithFilterConfig(allowAllFilterConf), + WithLazyExpandedPostings(true), + WithBlockEstimatedMaxSeriesFunc(func(_ metadata.Meta) uint64 { + return 1 + }), + ) + testutil.Ok(t, err) + t.Cleanup(func() { testutil.Ok(t, bucketStore.Close()) }) + + testutil.Ok(t, bucketStore.SyncBlocks(context.Background())) + + req := &storepb.SeriesRequest{ + MinTime: timestamp.FromTime(minTime), + MaxTime: timestamp.FromTime(maxTime), + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, + {Type: storepb.LabelMatcher_RE, Name: "z", Value: "1|2"}, + }, + } + srv := newStoreSeriesServer(context.Background()) + testutil.Ok(t, bucketStore.Series(req, srv)) + testutil.Equals(t, 2, len(srv.SeriesSet)) +}