diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 73610f6e089..6b652aa1106 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -206,6 +206,20 @@ type ingesterQueryResult struct { chunkseriesBatches [][]ingester_client.TimeSeriesChunk timeseriesBatches [][]mimirpb.TimeSeries streamingSeries seriesChunksStream + + // responses keeps every received QueryStreamResponse whose gRPC buffer may still be referenced; addResponse records them and freeBuffers releases them all. + responses []*ingester_client.QueryStreamResponse +} + +func (r *ingesterQueryResult) addResponse(resp *ingester_client.QueryStreamResponse) { + r.responses = append(r.responses, resp) +} + +func (r *ingesterQueryResult) freeBuffers() { + for _, resp := range r.responses { + resp.FreeBuffer() + } + r.responses = nil } // queryIngesterStream queries the ingesters using the gRPC streaming API. @@ -258,17 +272,23 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [ streamingSeriesCount := 0 for { + // XXX: The retained responses' gRPC buffers are freed only on the error paths below. On success they are intentionally kept, + // because the combined response returned from this loop still references those gRPC buffers. resp, err := stream.Recv() if errors.Is(err, io.EOF) { // We will never get an EOF here from an ingester that is streaming chunks, so we don't need to do anything to set up streaming here. return result, nil } else if err != nil { + result.freeBuffers() return ingesterQueryResult{}, err } + result.addResponse(resp) + if len(resp.Timeseries) > 0 { for _, series := range resp.Timeseries { if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil { + result.freeBuffers() return ingesterQueryResult{}, limitErr } } @@ -277,20 +297,24 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [ } else if len(resp.Chunkseries) > 0 { // Enforce the max chunks limits. 
if err := queryLimiter.AddChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil { + result.freeBuffers() return ingesterQueryResult{}, err } if err := queryLimiter.AddEstimatedChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil { + result.freeBuffers() return ingesterQueryResult{}, err } for _, series := range resp.Chunkseries { if err := queryLimiter.AddSeries(series.Labels); err != nil { + result.freeBuffers() return ingesterQueryResult{}, err } } if err := queryLimiter.AddChunkBytes(ingester_client.ChunksSize(resp.Chunkseries)); err != nil { + result.freeBuffers() return ingesterQueryResult{}, err } @@ -301,15 +325,18 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [ for _, s := range resp.StreamingSeries { if err := queryLimiter.AddSeries(s.Labels); err != nil { + result.freeBuffers() return ingesterQueryResult{}, err } // We enforce the chunk count limit here, but enforce the chunk bytes limit while streaming the chunks themselves. if err := queryLimiter.AddChunks(int(s.ChunkCount)); err != nil { + result.freeBuffers() return ingesterQueryResult{}, err } if err := queryLimiter.AddEstimatedChunks(int(s.ChunkCount)); err != nil { + result.freeBuffers() return ingesterQueryResult{}, err }