Skip to content

Commit

Permalink
Fix buffer freeing in blocksStoreQuerier
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Dec 26, 2024
1 parent 9295ca5 commit 52b3560
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 82 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
golang.org/x/net v0.32.0
golang.org/x/sync v0.10.0
golang.org/x/time v0.8.0
google.golang.org/grpc v1.69.0
google.golang.org/grpc v1.69.2
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1518,8 +1518,8 @@ google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ5
google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ=
google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.69.0 h1:quSiOM1GJPmPH5XtU+BCoVXcDVJJAzNcoyfC2cCjGkI=
google.golang.org/grpc v1.69.0/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/block_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,9 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error
if err != nil {
return translateReceivedError(err)
}
defer msg.FreeBuffer()

estimate := msg.GetStreamingChunksEstimate()
msg.FreeBuffer()
if estimate == nil {
return fmt.Errorf("expected to receive chunks estimate, but got message of type %T", msg.Result)
}
Expand Down
177 changes: 101 additions & 76 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,93 +793,32 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
return gCtx.Err()
}

resp, err := stream.Recv()
var err error
var isEOS bool
var shouldRetry bool
mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, isEOS, shouldRetry, err = q.receiveMessage(
c, stream, queryLimiter, mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched,
)
if errors.Is(err, io.EOF) {
util.CloseAndExhaust[*storepb.SeriesResponse](stream) //nolint:errcheck
break
}
if err != nil {
if shouldRetry(err) {
level.Warn(log).Log("msg", "failed to receive series", "remote", c.RemoteAddress(), "err", err)
return nil
}

return err
}
defer resp.FreeBuffer()

// Response may either contain series, streaming series, warning or hints.
if s := resp.GetSeries(); s != nil {
// Take a safe copy of every label.
for i, l := range s.Labels {
s.Labels[i].Name = strings.Clone(l.Name)
s.Labels[i].Value = strings.Clone(l.Value)
}
mySeries = append(mySeries, s)

// Add series fingerprint to query limiter; will return error if we are over the limit
if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(s.Labels)); err != nil {
return err
}

chunksCount, chunksSize := countChunksAndBytes(s)
q.metrics.chunksTotal.Add(float64(chunksCount))
if err := queryLimiter.AddChunkBytes(chunksSize); err != nil {
return err
}
if err := queryLimiter.AddChunks(chunksCount); err != nil {
return err
}
if err := queryLimiter.AddEstimatedChunks(chunksCount); err != nil {
return err
}
}

if w := resp.GetWarning(); w != "" {
myWarnings.Add(errors.New(w))
}

if h := resp.GetHints(); h != nil {
hints := hintspb.SeriesResponseHints{}
if err := types.UnmarshalAny(h, &hints); err != nil {
return errors.Wrapf(err, "failed to unmarshal series hints from %s", c.RemoteAddress())
}

ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks)
if err != nil {
return errors.Wrapf(err, "failed to parse queried block IDs from received hints")
}

myQueriedBlocks = append(myQueriedBlocks, ids...)
}

if s := resp.GetStats(); s != nil {
indexBytesFetched += s.FetchedIndexBytes
if shouldRetry {
level.Warn(log).Log("msg", "failed to receive series", "remote", c.RemoteAddress(), "err", err)
return nil
}

if ss := resp.GetStreamingSeries(); ss != nil {
myStreamingSeriesLabels = slices.Grow(myStreamingSeriesLabels, len(ss.Series))

for _, s := range ss.Series {
// Add series fingerprint to query limiter; will return error if we are over the limit
l := mimirpb.FromLabelAdaptersToLabelsWithCopy(s.Labels)

if limitErr := queryLimiter.AddSeries(l); limitErr != nil {
return limitErr
}

myStreamingSeriesLabels = append(myStreamingSeriesLabels, l)
if isEOS {
// If we aren't expecting any series from this stream, close it now.
if len(myStreamingSeriesLabels) == 0 {
util.CloseAndExhaust[*storepb.SeriesResponse](stream) //nolint:errcheck
}

if ss.IsEndOfSeriesStream {
// If we aren't expecting any series from this stream, close it now.
if len(myStreamingSeriesLabels) == 0 {
util.CloseAndExhaust[*storepb.SeriesResponse](stream) //nolint:errcheck
}

// We expect "end of stream" to be sent after the hints and the stats have been sent, so we can break out of the loop now.
break
}
// We expect "end of stream" to be sent after the hints and the stats have been sent, so we can break out of the loop now.
break
}
}

Expand Down Expand Up @@ -987,6 +926,92 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
return seriesSets, queriedBlocks, warnings, startStreamingChunks, estimateChunks, nil //nolint:govet // It's OK to return without cancelling reqCtx, see comment above.
}

func (q *blocksStoreQuerier) receiveMessage(c BlocksStoreClient, stream storegatewaypb.StoreGateway_SeriesClient, queryLimiter *limiter.QueryLimiter, mySeries []*storepb.Series, myWarnings annotations.Annotations, myQueriedBlocks []ulid.ULID, myStreamingSeriesLabels []labels.Labels, indexBytesFetched uint64) ([]*storepb.Series, annotations.Annotations, []ulid.ULID, []labels.Labels, uint64, bool, bool, error) {
resp, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err
}

if shouldRetry(err) {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, true, nil
}

return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err
}
defer resp.FreeBuffer()

// Response may either contain series, streaming series, warning or hints.
if s := resp.GetSeries(); s != nil {
// Take a safe copy of every label.
for i, l := range s.Labels {
s.Labels[i].Name = strings.Clone(l.Name)
s.Labels[i].Value = strings.Clone(l.Value)
}
mySeries = append(mySeries, s)

// Add series fingerprint to query limiter; will return error if we are over the limit
if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(s.Labels)); err != nil {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err
}

chunksCount, chunksSize := countChunksAndBytes(s)
q.metrics.chunksTotal.Add(float64(chunksCount))
if err := queryLimiter.AddChunkBytes(chunksSize); err != nil {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err
}
if err := queryLimiter.AddChunks(chunksCount); err != nil {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err
}
if err := queryLimiter.AddEstimatedChunks(chunksCount); err != nil {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err
}
}

if w := resp.GetWarning(); w != "" {
myWarnings.Add(errors.New(w))
}

if h := resp.GetHints(); h != nil {
hints := hintspb.SeriesResponseHints{}
if err := types.UnmarshalAny(h, &hints); err != nil {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, errors.Wrapf(err, "failed to unmarshal series hints from %s", c.RemoteAddress())
}

ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks)
if err != nil {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, errors.Wrapf(err, "failed to parse queried block IDs from received hints")
}

myQueriedBlocks = append(myQueriedBlocks, ids...)
}

if s := resp.GetStats(); s != nil {
indexBytesFetched += s.FetchedIndexBytes
}

if ss := resp.GetStreamingSeries(); ss != nil {
myStreamingSeriesLabels = slices.Grow(myStreamingSeriesLabels, len(ss.Series))

for _, s := range ss.Series {
l := mimirpb.FromLabelAdaptersToLabelsWithCopy(s.Labels)

// Add series fingerprint to query limiter; will return error if we are over the limit
if limitErr := queryLimiter.AddSeries(l); limitErr != nil {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, limitErr
}

myStreamingSeriesLabels = append(myStreamingSeriesLabels, l)
}

if ss.IsEndOfSeriesStream {
return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, true, false, nil
}
}

return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, nil
}

func shouldRetry(err error) bool {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return false
Expand Down
15 changes: 15 additions & 0 deletions vendor/google.golang.org/grpc/experimental/stats/metrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/google.golang.org/grpc/version.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/modules.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 52b3560

Please sign in to comment.