Skip to content

Commit

Permalink
Distributor.queryIngesterStream: Free gRPC buffers upon error
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Oct 28, 2024
1 parent 11337c6 commit 59c3ea9
Showing 1 changed file with 27 additions and 0 deletions.
27 changes: 27 additions & 0 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,20 @@ type ingesterQueryResult struct {
chunkseriesBatches [][]ingester_client.TimeSeriesChunk
timeseriesBatches [][]mimirpb.TimeSeries
streamingSeries seriesChunksStream

// Retain responses owning referenced gRPC buffers, until they are freed.
responses []*ingester_client.QueryStreamResponse
}

// addResponse records resp so its gRPC buffers can later be released via
// freeBuffers. The result keeps a reference because the combined query
// response may alias memory owned by these responses.
func (r *ingesterQueryResult) addResponse(resp *ingester_client.QueryStreamResponse) {
	r.responses = append(r.responses, resp)
}

// freeBuffers releases the gRPC buffers held by every recorded response and
// drops the references, so the buffers can be reused. Safe to call on a
// zero-value result (nil slice); calling it again is a no-op.
func (r *ingesterQueryResult) freeBuffers() {
	for i := range r.responses {
		r.responses[i].FreeBuffer()
	}
	r.responses = nil
}

// queryIngesterStream queries the ingesters using the gRPC streaming API.
Expand Down Expand Up @@ -258,17 +272,23 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
streamingSeriesCount := 0

for {
// XXX: Note that while we free responses' gRPC buffers on error, we don't do the same in case of success,
// as the combined response retains references to gRPC buffers.
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
// We will never get an EOF here from an ingester that is streaming chunks, so we don't need to do anything to set up streaming here.
return result, nil
} else if err != nil {
result.freeBuffers()
return ingesterQueryResult{}, err
}

result.addResponse(resp)

if len(resp.Timeseries) > 0 {
for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
result.freeBuffers()
return ingesterQueryResult{}, limitErr
}
}
Expand All @@ -277,20 +297,24 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
} else if len(resp.Chunkseries) > 0 {
// Enforce the max chunks limits.
if err := queryLimiter.AddChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
result.freeBuffers()
return ingesterQueryResult{}, err
}

if err := queryLimiter.AddEstimatedChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
result.freeBuffers()
return ingesterQueryResult{}, err
}

for _, series := range resp.Chunkseries {
if err := queryLimiter.AddSeries(series.Labels); err != nil {
result.freeBuffers()
return ingesterQueryResult{}, err
}
}

if err := queryLimiter.AddChunkBytes(ingester_client.ChunksSize(resp.Chunkseries)); err != nil {
result.freeBuffers()
return ingesterQueryResult{}, err
}

Expand All @@ -301,15 +325,18 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [

for _, s := range resp.StreamingSeries {
if err := queryLimiter.AddSeries(s.Labels); err != nil {
result.freeBuffers()
return ingesterQueryResult{}, err
}

// We enforce the chunk count limit here, but enforce the chunk bytes limit while streaming the chunks themselves.
if err := queryLimiter.AddChunks(int(s.ChunkCount)); err != nil {
result.freeBuffers()
return ingesterQueryResult{}, err
}

if err := queryLimiter.AddEstimatedChunks(int(s.ChunkCount)); err != nil {
result.freeBuffers()
return ingesterQueryResult{}, err
}

Expand Down

0 comments on commit 59c3ea9

Please sign in to comment.