Free buffers as soon as possible
Signed-off-by: Arve Knudsen <[email protected]>
aknuds1 committed Dec 13, 2024
1 parent 645003d commit 7f962d6
Showing 2 changed files with 98 additions and 124 deletions.
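The heart of the change, before the per-file diff: instead of an ingesterQueryResult retaining every QueryStreamResponse until a final freeBuffers() call, each response's pooled gRPC buffer is now freed as soon as the response has been processed, and any labels, exemplars, or chunk bytes that outlive it are cloned first. A minimal sketch of that pattern, using hypothetical stand-in types rather than the real ingester_client API:

package sketch

import "strings"

// response and queryStream are hypothetical stand-ins for
// ingester_client.QueryStreamResponse and Ingester_QueryStreamClient.
type response struct {
	names []string // in the real code, this data aliases a pooled receive buffer
}

// FreeBuffer would return the response's receive buffer to a pool.
func (r *response) FreeBuffer() {}

type queryStream interface {
	Recv() (*response, error)
}

// drain shows the shape of the change: copy out what must be kept, then free
// the buffer immediately, instead of retaining every response until the whole
// query has completed.
func drain(s queryStream) ([]string, error) {
	var kept []string
	for {
		resp, err := s.Recv()
		if err != nil {
			return kept, err // includes io.EOF; kept data is already safe
		}
		for _, n := range resp.names {
			// Anything kept past FreeBuffer must not alias the buffer.
			kept = append(kept, strings.Clone(n))
		}
		resp.FreeBuffer()
	}
}

The cost is one copy per retained string or chunk; the benefit is that pooled buffers are recycled per response rather than held for the lifetime of the query.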
199 changes: 98 additions & 101 deletions pkg/distributor/query.go
@@ -223,20 +223,6 @@ type ingesterQueryResult struct {
 	chunkseriesBatches [][]ingester_client.TimeSeriesChunk
 	timeseriesBatches  [][]mimirpb.TimeSeries
 	streamingSeries    seriesChunksStream
-
-	// Retain responses owning referenced gRPC buffers, until they are freed.
-	responses []*ingester_client.QueryStreamResponse
 }
-
-func (r *ingesterQueryResult) addResponse(resp *ingester_client.QueryStreamResponse) {
-	r.responses = append(r.responses, resp)
-}
-
-func (r *ingesterQueryResult) freeBuffers() {
-	for _, resp := range r.responses {
-		resp.FreeBuffer()
-	}
-	r.responses = nil
-}
 
 // queryIngesterStream queries the ingesters using the gRPC streaming API.
@@ -246,7 +232,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 
 	// queryIngester MUST call cancelContext once processing is completed in order to release resources. It's required
 	// by ring.DoMultiUntilQuorumWithoutSuccessfulContextCancellation() to properly release resources.
-	queryIngester := func(ctx context.Context, ing *ring.InstanceDesc, cancelContext context.CancelCauseFunc) (result ingesterQueryResult, err error) {
+	queryIngester := func(ctx context.Context, ing *ring.InstanceDesc, cancelContext context.CancelCauseFunc) (ingesterQueryResult, error) {
 		log, ctx := spanlogger.NewWithLogger(ctx, d.log, "Distributor.queryIngesterStream")
 		cleanup := func() {
 			log.Span.Finish()
@@ -265,13 +251,13 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 
 				cleanup()
 			}
-
-			result.freeBuffers()
 		}()
 
 		log.Span.SetTag("ingester_address", ing.Addr)
 		log.Span.SetTag("ingester_zone", ing.Zone)
 
+		var result ingesterQueryResult
+
 		client, err := d.ingesterPool.GetClientForInstance(*ing)
 		if err != nil {
 			return result, err
@@ -289,77 +275,18 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 		streamingSeriesCount := 0
 
 		for {
-			resp, err := stream.Recv()
+			var err error
+			var isEOS bool
+			streamingSeriesCount, streamingSeriesBatches, isEOS, err = receiveResponse(stream, streamingSeriesCount, streamingSeriesBatches, queryLimiter, &result)
 			if errors.Is(err, io.EOF) {
 				// We will never get an EOF here from an ingester that is streaming chunks, so we don't need to do anything to set up streaming here.
 				return result, nil
 			} else if err != nil {
 				return result, err
 			}
 
-			result.addResponse(resp)
-
-			if len(resp.Timeseries) > 0 {
-				for _, series := range resp.Timeseries {
-					if limitErr := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); limitErr != nil {
-						return result, limitErr
-					}
-				}
-
-				result.timeseriesBatches = append(result.timeseriesBatches, resp.Timeseries)
-			} else if len(resp.Chunkseries) > 0 {
-				// Enforce the max chunks limits.
-				if err := queryLimiter.AddChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
-					return result, err
-				}
-
-				if err := queryLimiter.AddEstimatedChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
-					return result, err
-				}
-
-				for _, series := range resp.Chunkseries {
-					if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); err != nil {
-						return result, err
-					}
-				}
-
-				if err := queryLimiter.AddChunkBytes(ingester_client.ChunksSize(resp.Chunkseries)); err != nil {
-					return result, err
-				}
-
-				result.chunkseriesBatches = append(result.chunkseriesBatches, resp.Chunkseries)
-			} else if len(resp.StreamingSeries) > 0 {
-				labelsBatch := make([]labels.Labels, 0, len(resp.StreamingSeries))
-				streamingSeriesCount += len(resp.StreamingSeries)
-
-				for _, s := range resp.StreamingSeries {
-					l := mimirpb.FromLabelAdaptersToLabels(s.Labels)
-					// Clone unsafe labels.
-					l.InternStrings(strings.Clone)
-
-					if err := queryLimiter.AddSeries(l); err != nil {
-						return result, err
-					}
-
-					// We enforce the chunk count limit here, but enforce the chunk bytes limit while streaming the chunks themselves.
-					if err := queryLimiter.AddChunks(int(s.ChunkCount)); err != nil {
-						return result, err
-					}
-
-					if err := queryLimiter.AddEstimatedChunks(int(s.ChunkCount)); err != nil {
-						return result, err
-					}
-
-					labelsBatch = append(labelsBatch, l)
-				}
-
-				streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch)
-			}
-
-			if resp.IsEndOfSeriesStream {
+			if isEOS {
 				if streamingSeriesCount > 0 {
 					result.streamingSeries.Series = make([]labels.Labels, 0, streamingSeriesCount)
 
 					for _, batch := range streamingSeriesBatches {
 						result.streamingSeries.Series = append(result.streamingSeries.Series, batch...)
 					}
@@ -410,18 +337,11 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 		for _, batch := range res.chunkseriesBatches {
 			for _, series := range batch {
 				key := mimirpb.FromLabelAdaptersToKeyString(series.Labels)
-				existing, exists := hashToChunkseries[key]
-				if !exists {
-					existing.Labels = make([]mimirpb.LabelAdapter, len(series.Labels))
-					// Clone unsafe labels.
-					for i, l := range series.Labels {
-						existing.Labels[i].Name = strings.Clone(l.Name)
-						existing.Labels[i].Value = strings.Clone(l.Value)
-					}
-				}
+				existing := hashToChunkseries[key]
+				existing.Labels = series.Labels
 
 				numPotentialChunks := len(existing.Chunks) + len(series.Chunks)
-				existing.Chunks = ingester_client.AccumulateChunksSafe(existing.Chunks, series.Chunks)
+				existing.Chunks = ingester_client.AccumulateChunks(existing.Chunks, series.Chunks)
 
 				deduplicatedChunks += numPotentialChunks - len(existing.Chunks)
 				totalChunks += len(series.Chunks)
@@ -433,15 +353,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 		for _, batch := range res.timeseriesBatches {
 			for _, series := range batch {
 				key := mimirpb.FromLabelAdaptersToKeyString(series.Labels)
-				existing, exists := hashToTimeSeries[key]
-				if !exists {
-					existing.Labels = make([]mimirpb.LabelAdapter, len(series.Labels))
-					// Clone unsafe labels.
-					for i, l := range series.Labels {
-						existing.Labels[i].Name = strings.Clone(l.Name)
-						existing.Labels[i].Value = strings.Clone(l.Value)
-					}
-				}
+				existing := hashToTimeSeries[key]
+				existing.Labels = series.Labels
 				if len(existing.Samples) == 0 {
 					existing.Samples = series.Samples
 				} else {
@@ -455,8 +368,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 		if res.streamingSeries.StreamReader != nil {
 			res.streamingSeries.StreamReader.StartBuffering()
 		}
-
-		// res.freeBuffers()
 	}
 
 	// Now turn the accumulated maps into slices.
@@ -481,6 +392,92 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 	return resp, nil
 }

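+// receiveResponse reads a single response from the ingester's query stream and
+// accumulates it into result, enforcing the query limits as it goes. The
+// response's pooled gRPC buffer is freed before returning, so every label,
+// exemplar, and chunk that is kept is cloned first. It returns the updated
+// streaming-series count and label batches, plus whether this response marked
+// the end of the series stream.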
+func receiveResponse(stream ingester_client.Ingester_QueryStreamClient, streamingSeriesCount int, streamingSeriesBatches [][]labels.Labels, queryLimiter *limiter.QueryLimiter, result *ingesterQueryResult) (int, [][]labels.Labels, bool, error) {
+	resp, err := stream.Recv()
+	if err != nil {
+		return 0, nil, false, err
+	}
+	defer resp.FreeBuffer()
+
+	if len(resp.Timeseries) > 0 {
+		for _, series := range resp.Timeseries {
+			if limitErr := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); limitErr != nil {
+				return 0, nil, false, limitErr
+			}
+		}
+
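+		// Clone label and exemplar strings out of the pooled buffer, which is
+		// freed when this function returns.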
+		for i, ts := range resp.Timeseries {
+			for j, l := range ts.Labels {
+				resp.Timeseries[i].Labels[j].Name = strings.Clone(l.Name)
+				resp.Timeseries[i].Labels[j].Value = strings.Clone(l.Value)
+			}
+			for j, e := range ts.Exemplars {
+				for k, l := range e.Labels {
+					resp.Timeseries[i].Exemplars[j].Labels[k].Name = strings.Clone(l.Name)
+					resp.Timeseries[i].Exemplars[j].Labels[k].Value = strings.Clone(l.Value)
+				}
+			}
+		}
+		result.timeseriesBatches = append(result.timeseriesBatches, resp.Timeseries)
+	} else if len(resp.Chunkseries) > 0 {
+		// Enforce the max chunks limits.
+		if err := queryLimiter.AddChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
+			return 0, nil, false, err
+		}
+
+		if err := queryLimiter.AddEstimatedChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
+			return 0, nil, false, err
+		}
+
+		for _, series := range resp.Chunkseries {
+			if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); err != nil {
+				return 0, nil, false, err
+			}
+		}
+
+		if err := queryLimiter.AddChunkBytes(ingester_client.ChunksSize(resp.Chunkseries)); err != nil {
+			return 0, nil, false, err
+		}
+
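+		// Detach the kept data from the pooled buffer: clone label strings and
+		// chunk bytes before the response is freed.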
+		for i, s := range resp.Chunkseries {
+			for j, l := range s.Labels {
+				resp.Chunkseries[i].Labels[j].Name = strings.Clone(l.Name)
+				resp.Chunkseries[i].Labels[j].Value = strings.Clone(l.Value)
+			}
+			for j, c := range s.Chunks {
+				resp.Chunkseries[i].Chunks[j].Data = slices.Clone(c.Data)
+			}
+		}
+		result.chunkseriesBatches = append(result.chunkseriesBatches, resp.Chunkseries)
+	} else if len(resp.StreamingSeries) > 0 {
+		labelsBatch := make([]labels.Labels, 0, len(resp.StreamingSeries))
+		streamingSeriesCount += len(resp.StreamingSeries)
+
+		for _, s := range resp.StreamingSeries {
+			l := mimirpb.FromLabelAdaptersToLabelsWithCopy(s.Labels)
+
+			if err := queryLimiter.AddSeries(l); err != nil {
+				return 0, nil, false, err
+			}
+
+			// We enforce the chunk count limit here, but enforce the chunk bytes limit while streaming the chunks themselves.
+			if err := queryLimiter.AddChunks(int(s.ChunkCount)); err != nil {
+				return 0, nil, false, err
+			}
+
+			if err := queryLimiter.AddEstimatedChunks(int(s.ChunkCount)); err != nil {
+				return 0, nil, false, err
+			}
+
+			labelsBatch = append(labelsBatch, l)
+		}
+
+		streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch)
+	}
+
+	return streamingSeriesCount, streamingSeriesBatches, resp.IsEndOfSeriesStream, nil
+}
+
 // estimatedIngestersPerSeries estimates the number of ingesters that will have chunks for each streaming series.
 func (d *Distributor) estimatedIngestersPerSeries(replicationSets []ring.ReplicationSet) int {
 	if d.cfg.IngestStorageConfig.Enabled {
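Why the eager cloning in receiveResponse matters: once FreeBuffer returns a buffer to the pool, any string or byte slice still pointing into it can be overwritten when the buffer is reused. A toy, self-contained illustration of that aliasing hazard (not Mimir's actual pooling code):

package main

import (
	"fmt"
	"slices"
)

func main() {
	// buf plays the role of a pooled gRPC receive buffer.
	buf := []byte("chunk-a")

	aliased := buf[:7]              // still points into buf
	cloned := slices.Clone(buf[:7]) // independent copy

	// Simulate the pool handing buf out again for the next response.
	copy(buf, "chunk-b")

	fmt.Println(string(aliased)) // "chunk-b": the retained view was silently corrupted
	fmt.Println(string(cloned))  // "chunk-a": safe to keep
}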
23 changes: 0 additions & 23 deletions pkg/ingester/client/mimir_util.go
@@ -78,29 +78,6 @@ func AccumulateChunks(a, b []Chunk) []Chunk {
 	return ret
 }
 
-// AccumulateChunksSafe builds a slice of chunks, eliminating duplicates.
-// This is like AccumulateChunks except that unsafe chunk data is copied.
-func AccumulateChunksSafe(a, b []Chunk) []Chunk {
-	// If a is empty, we can just return b.
-	// The loop below effectively does the same thing for the opposite scenario (if b is empty, we'll just return a unmodified).
-	if len(a) == 0 {
-		return b
-	}
-
-	ret := a
-	for j := range b {
-		if containsChunk(a, b[j]) {
-			continue
-		}
-
-		c := b[j]
-		c.Data = make([]byte, len(b[j].Data))
-		copy(c.Data, b[j].Data)
-		ret = append(ret, c)
-	}
-	return ret
-}
-
 func containsChunk(a []Chunk, b Chunk) bool {
 	for i := range a {
 		if a[i].Equal(b) {
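Since receiveResponse now clones chunk bytes at receive time, the defensive copy that distinguished AccumulateChunksSafe is redundant, and the merge step above switches back to plain AccumulateChunks. A simplified sketch of what that surviving accumulate-and-dedup step does, with a toy chunk type standing in for ingester_client.Chunk:

package sketch

import "bytes"

// chunk is a toy stand-in for ingester_client.Chunk.
type chunk struct {
	Data []byte
}

func (c chunk) Equal(o chunk) bool {
	return bytes.Equal(c.Data, o.Data)
}

// accumulateChunks mirrors the shape of the surviving AccumulateChunks:
// append the chunks of b that are not already present in a. No defensive
// copy of Data is needed, because chunk bytes are now cloned when each
// response is received.
func accumulateChunks(a, b []chunk) []chunk {
	if len(a) == 0 {
		return b
	}

	ret := a
	for j := range b {
		if !containsChunk(a, b[j]) {
			ret = append(ret, b[j])
		}
	}
	return ret
}

func containsChunk(a []chunk, b chunk) bool {
	for i := range a {
		if a[i].Equal(b) {
			return true
		}
	}
	return false
}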
