Skip to content

Commit

Permalink
Free store-gateway query buffers
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Dec 13, 2024
1 parent c122735 commit 16ec1e6
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pkg/mimirpb/compat_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func FromLabelAdaptersOverwriteLabels(_ *labels.ScratchBuilder, ls []LabelAdapte

// FromLabelAdaptersToLabelsWithCopy converts []LabelAdapter to labels.Labels.
// Do NOT use unsafe to convert between data types because this function may
// get in input labels whose data structure is reused.
// get input labels whose data structure is reused.
func FromLabelAdaptersToLabelsWithCopy(input []LabelAdapter) labels.Labels {
return CopyLabels(FromLabelAdaptersToLabels(input))
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/querier/block_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"
"io"
"slices"
"sort"

"github.com/go-kit/log"
Expand Down Expand Up @@ -241,6 +242,7 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error
if err != nil {
return translateReceivedError(err)
}
defer msg.FreeBuffer()

estimate := msg.GetStreamingChunksEstimate()
if estimate == nil {
Expand All @@ -260,15 +262,18 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error

batch := msg.GetStreamingChunks()
if batch == nil {
msg.FreeBuffer()
return fmt.Errorf("expected to receive streaming chunks, but got message of type %T", msg.Result)
}

if len(batch.Series) == 0 {
msg.FreeBuffer()
continue
}

totalSeries += len(batch.Series)
if totalSeries > s.expectedSeriesCount {
msg.FreeBuffer()
return fmt.Errorf("expected to receive only %v series, but received at least %v series", s.expectedSeriesCount, totalSeries)
}

Expand All @@ -282,15 +287,30 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error
}
totalChunks += numChunks
if err := s.queryLimiter.AddChunks(numChunks); err != nil {
msg.FreeBuffer()
return err
}
if err := s.queryLimiter.AddChunkBytes(chunkBytes); err != nil {
msg.FreeBuffer()
return err
}

s.stats.AddFetchedChunks(uint64(numChunks))
s.stats.AddFetchedChunkBytes(uint64(chunkBytes))

// Memory safe copy.
safeSeries := make([]*storepb.StreamingChunks, 0, len(batch.Series))
for _, s := range batch.Series {
safe := *s
safe.Chunks = slices.Clone(s.Chunks)
for i, c := range safe.Chunks {
safe.Chunks[i].Raw.Data = slices.Clone(c.Raw.Data)
}
safeSeries = append(safeSeries, &safe)
}
batch.Series = safeSeries
msg.FreeBuffer()

if err := s.sendBatch(batch); err != nil {
return err
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,9 +807,15 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor

return err
}
defer resp.FreeBuffer()

// Response may either contain series, streaming series, warning or hints.
if s := resp.GetSeries(); s != nil {
// Take a safe copy of every label.
for i, l := range s.Labels {
s.Labels[i].Name = strings.Clone(l.Name)
s.Labels[i].Value = strings.Clone(l.Value)
}
mySeries = append(mySeries, s)

// Add series fingerprint to query limiter; will return error if we are over the limit
Expand Down Expand Up @@ -857,7 +863,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor

for _, s := range ss.Series {
// Add series fingerprint to query limiter; will return error if we are over the limit
l := mimirpb.FromLabelAdaptersToLabels(s.Labels)
l := mimirpb.FromLabelAdaptersToLabelsWithCopy(s.Labels)

if limitErr := queryLimiter.AddSeries(l); limitErr != nil {
return limitErr
Expand Down
6 changes: 5 additions & 1 deletion pkg/storegateway/series_refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -1268,6 +1269,7 @@ func fetchCachedSeriesForPostings(ctx context.Context, userID string, indexCache
logSeriesForPostingsCacheEvent(ctx, logger, userID, blockID, shard, itemID, "msg", "can't decode series cache", "err", err)
return seriesChunkRefsSet{}, false
}
defer entry.FreeBuffer()

if !bytes.Equal(itemID.encodedPostings, entry.DiffEncodedPostings) {
logSeriesForPostingsCacheEvent(ctx, logger, userID, blockID, shard, itemID, "msg", "cached series postings doesn't match, possible collision", "cached_trimmed_postings", previewDiffEncodedPostings(entry.DiffEncodedPostings))
Expand All @@ -1278,8 +1280,10 @@ func fetchCachedSeriesForPostings(ctx context.Context, userID string, indexCache
// after Next() will be called again.
res := newSeriesChunkRefsSet(len(entry.Series), true)
for _, lset := range entry.Series {
ls := mimirpb.FromLabelAdaptersToLabels(lset.Labels)
ls.InternStrings(strings.Clone)
res.series = append(res.series, seriesChunkRefs{
lset: mimirpb.FromLabelAdaptersToLabels(lset.Labels),
lset: ls,
})
}
return res, true
Expand Down

0 comments on commit 16ec1e6

Please sign in to comment.