From 9295ca544e16e9191e9989340355ebe8da4c215d Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 13 Dec 2024 08:51:06 +0100 Subject: [PATCH] Free buffers as soon as possible Signed-off-by: Arve Knudsen --- pkg/distributor/query.go | 201 +++++++++++++++--------------- pkg/ingester/client/mimir_util.go | 23 ---- 2 files changed, 98 insertions(+), 126 deletions(-) diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 75801a4d752..2a2b922db53 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -223,20 +223,6 @@ type ingesterQueryResult struct { chunkseriesBatches [][]ingester_client.TimeSeriesChunk timeseriesBatches [][]mimirpb.TimeSeries streamingSeries seriesChunksStream - - // Retain responses owning referenced gRPC buffers, until they are freed. - responses []*ingester_client.QueryStreamResponse -} - -func (r *ingesterQueryResult) addResponse(resp *ingester_client.QueryStreamResponse) { - r.responses = append(r.responses, resp) -} - -func (r *ingesterQueryResult) freeBuffers() { - for _, resp := range r.responses { - resp.FreeBuffer() - } - r.responses = nil } // queryIngesterStream queries the ingesters using the gRPC streaming API. @@ -246,7 +232,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [ // queryIngester MUST call cancelContext once processing is completed in order to release resources. It's required // by ring.DoMultiUntilQuorumWithoutSuccessfulContextCancellation() to properly release resources. - queryIngester := func(ctx context.Context, ing *ring.InstanceDesc, cancelContext context.CancelCauseFunc) (result ingesterQueryResult, err error) { + queryIngester := func(ctx context.Context, ing *ring.InstanceDesc, cancelContext context.CancelCauseFunc) (ingesterQueryResult, error) { log, ctx := spanlogger.NewWithLogger(ctx, d.log, "Distributor.queryIngesterStream") cleanup := func() { log.Span.Finish() @@ -265,13 +251,13 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [ cleanup() } - - result.freeBuffers() }() log.Span.SetTag("ingester_address", ing.Addr) log.Span.SetTag("ingester_zone", ing.Zone) + var result ingesterQueryResult + client, err := d.ingesterPool.GetClientForInstance(*ing) if err != nil { return result, err @@ -289,77 +275,18 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [ streamingSeriesCount := 0 for { - resp, err := stream.Recv() + var err error + var isEOS bool + streamingSeriesCount, streamingSeriesBatches, isEOS, err = receiveResponse(stream, streamingSeriesCount, streamingSeriesBatches, queryLimiter, &result) if errors.Is(err, io.EOF) { // We will never get an EOF here from an ingester that is streaming chunks, so we don't need to do anything to set up streaming here. return result, nil } else if err != nil { return result, err } - - result.addResponse(resp) - - if len(resp.Timeseries) > 0 { - for _, series := range resp.Timeseries { - if limitErr := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); limitErr != nil { - return result, limitErr - } - } - - result.timeseriesBatches = append(result.timeseriesBatches, resp.Timeseries) - } else if len(resp.Chunkseries) > 0 { - // Enforce the max chunks limits. - if err := queryLimiter.AddChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil { - return result, err - } - - if err := queryLimiter.AddEstimatedChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil { - return result, err - } - - for _, series := range resp.Chunkseries { - if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); err != nil { - return result, err - } - } - - if err := queryLimiter.AddChunkBytes(ingester_client.ChunksSize(resp.Chunkseries)); err != nil { - return result, err - } - - result.chunkseriesBatches = append(result.chunkseriesBatches, resp.Chunkseries) - } else if len(resp.StreamingSeries) > 0 { - labelsBatch := make([]labels.Labels, 0, len(resp.StreamingSeries)) - streamingSeriesCount += len(resp.StreamingSeries) - - for _, s := range resp.StreamingSeries { - l := mimirpb.FromLabelAdaptersToLabels(s.Labels) - // Clone unsafe labels. - l.InternStrings(strings.Clone) - - if err := queryLimiter.AddSeries(l); err != nil { - return result, err - } - - // We enforce the chunk count limit here, but enforce the chunk bytes limit while streaming the chunks themselves. - if err := queryLimiter.AddChunks(int(s.ChunkCount)); err != nil { - return result, err - } - - if err := queryLimiter.AddEstimatedChunks(int(s.ChunkCount)); err != nil { - return result, err - } - - labelsBatch = append(labelsBatch, l) - } - - streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch) - } - - if resp.IsEndOfSeriesStream { + if isEOS { if streamingSeriesCount > 0 { result.streamingSeries.Series = make([]labels.Labels, 0, streamingSeriesCount) - for _, batch := range streamingSeriesBatches { result.streamingSeries.Series = append(result.streamingSeries.Series, batch...) } @@ -372,8 +299,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [ return result, nil } } - - return result, nil } cleanup := func(result ingesterQueryResult) { @@ -410,18 +335,11 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [ for _, batch := range res.chunkseriesBatches { for _, series := range batch { key := mimirpb.FromLabelAdaptersToKeyString(series.Labels) - existing, exists := hashToChunkseries[key] - if !exists { - existing.Labels = make([]mimirpb.LabelAdapter, len(series.Labels)) - // Clone unsafe labels. - for i, l := range series.Labels { - existing.Labels[i].Name = strings.Clone(l.Name) - existing.Labels[i].Value = strings.Clone(l.Value) - } - } + existing := hashToChunkseries[key] + existing.Labels = series.Labels numPotentialChunks := len(existing.Chunks) + len(series.Chunks) - existing.Chunks = ingester_client.AccumulateChunksSafe(existing.Chunks, series.Chunks) + existing.Chunks = ingester_client.AccumulateChunks(existing.Chunks, series.Chunks) deduplicatedChunks += numPotentialChunks - len(existing.Chunks) totalChunks += len(series.Chunks) @@ -433,15 +351,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [ for _, batch := range res.timeseriesBatches { for _, series := range batch { key := mimirpb.FromLabelAdaptersToKeyString(series.Labels) - existing, exists := hashToTimeSeries[key] - if !exists { - existing.Labels = make([]mimirpb.LabelAdapter, len(series.Labels)) - // Clone unsafe labels. - for i, l := range series.Labels { - existing.Labels[i].Name = strings.Clone(l.Name) - existing.Labels[i].Value = strings.Clone(l.Value) - } - } + existing := hashToTimeSeries[key] + existing.Labels = series.Labels if len(existing.Samples) == 0 { existing.Samples = series.Samples } else { @@ -455,8 +366,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [ if res.streamingSeries.StreamReader != nil { res.streamingSeries.StreamReader.StartBuffering() } - - // res.freeBuffers() } // Now turn the accumulated maps into slices. @@ -481,6 +390,92 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [ return resp, nil } +func receiveResponse(stream ingester_client.Ingester_QueryStreamClient, streamingSeriesCount int, streamingSeriesBatches [][]labels.Labels, queryLimiter *limiter.QueryLimiter, result *ingesterQueryResult) (int, [][]labels.Labels, bool, error) { + resp, err := stream.Recv() + if err != nil { + return 0, nil, false, err + } + defer resp.FreeBuffer() + + if len(resp.Timeseries) > 0 { + for _, series := range resp.Timeseries { + if limitErr := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); limitErr != nil { + return 0, nil, false, limitErr + } + } + + for i, ts := range resp.Timeseries { + for j, l := range ts.Labels { + resp.Timeseries[i].Labels[j].Name = strings.Clone(l.Name) + resp.Timeseries[i].Labels[j].Value = strings.Clone(l.Value) + } + for j, e := range ts.Exemplars { + for k, l := range e.Labels { + resp.Timeseries[i].Exemplars[j].Labels[k].Name = strings.Clone(l.Name) + resp.Timeseries[i].Exemplars[j].Labels[k].Value = strings.Clone(l.Value) + } + } + } + result.timeseriesBatches = append(result.timeseriesBatches, resp.Timeseries) + } else if len(resp.Chunkseries) > 0 { + // Enforce the max chunks limits. + if err := queryLimiter.AddChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil { + return 0, nil, false, err + } + + if err := queryLimiter.AddEstimatedChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil { + return 0, nil, false, err + } + + for _, series := range resp.Chunkseries { + if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); err != nil { + return 0, nil, false, err + } + } + + if err := queryLimiter.AddChunkBytes(ingester_client.ChunksSize(resp.Chunkseries)); err != nil { + return 0, nil, false, err + } + + for i, s := range resp.Chunkseries { + for j, l := range s.Labels { + resp.Chunkseries[i].Labels[j].Name = strings.Clone(l.Name) + resp.Chunkseries[i].Labels[j].Value = strings.Clone(l.Value) + } + for j, c := range s.Chunks { + resp.Chunkseries[i].Chunks[j].Data = slices.Clone(c.Data) + } + } + result.chunkseriesBatches = append(result.chunkseriesBatches, resp.Chunkseries) + } else if len(resp.StreamingSeries) > 0 { + labelsBatch := make([]labels.Labels, 0, len(resp.StreamingSeries)) + streamingSeriesCount += len(resp.StreamingSeries) + + for _, s := range resp.StreamingSeries { + l := mimirpb.FromLabelAdaptersToLabelsWithCopy(s.Labels) + + if err := queryLimiter.AddSeries(l); err != nil { + return 0, nil, false, err + } + + // We enforce the chunk count limit here, but enforce the chunk bytes limit while streaming the chunks themselves. + if err := queryLimiter.AddChunks(int(s.ChunkCount)); err != nil { + return 0, nil, false, err + } + + if err := queryLimiter.AddEstimatedChunks(int(s.ChunkCount)); err != nil { + return 0, nil, false, err + } + + labelsBatch = append(labelsBatch, l) + } + + streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch) + } + + return streamingSeriesCount, streamingSeriesBatches, resp.IsEndOfSeriesStream, nil +} + // estimatedIngestersPerSeries estimates the number of ingesters that will have chunks for each streaming series. func (d *Distributor) estimatedIngestersPerSeries(replicationSets []ring.ReplicationSet) int { if d.cfg.IngestStorageConfig.Enabled { diff --git a/pkg/ingester/client/mimir_util.go b/pkg/ingester/client/mimir_util.go index 8c5dbeb1f7a..9dbb536b500 100644 --- a/pkg/ingester/client/mimir_util.go +++ b/pkg/ingester/client/mimir_util.go @@ -78,29 +78,6 @@ func AccumulateChunks(a, b []Chunk) []Chunk { return ret } -// AccumulateChunksSafe builds a slice of chunks, eliminating duplicates. -// This is like AccumulateChunks except that unsafe chunk data is copied. -func AccumulateChunksSafe(a, b []Chunk) []Chunk { - // If a is empty, we can just return b. - // The loop below effectively does the same thing for the opposite scenario (if b is empty, we'll just return a unmodified). - if len(a) == 0 { - return b - } - - ret := a - for j := range b { - if containsChunk(a, b[j]) { - continue - } - - c := b[j] - c.Data = make([]byte, len(b[j].Data)) - copy(c.Data, b[j].Data) - ret = append(ret, c) - } - return ret -} - func containsChunk(a []Chunk, b Chunk) bool { for i := range a { if a[i].Equal(b) {