Skip to content

Commit

Permalink
distributor: let queryIngesterStream work in LabelAdapters
Browse files Browse the repository at this point in the history
Convert to Labels after deduplication.
  • Loading branch information
bboreham committed Apr 8, 2024
1 parent 8838902 commit f1cc93e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 35 deletions.
35 changes: 19 additions & 16 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
// Why retain the batches rather than iteratively build a single slice?
// If we iteratively build a single slice, we'll spend a lot of time copying elements as the slice grows beyond its capacity.
// So instead, we build the slice in one go once we know how many series we have.
var streamingSeriesBatches [][]labels.Labels
var streamingSeriesBatches [][][]mimirpb.LabelAdapter
streamingSeriesCount := 0

for {
Expand Down Expand Up @@ -300,7 +300,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [

result.chunkseriesBatches = append(result.chunkseriesBatches, resp.Chunkseries)
} else if len(resp.StreamingSeries) > 0 {
labelsBatch := make([]labels.Labels, 0, len(resp.StreamingSeries))
labelsBatch := make([][]mimirpb.LabelAdapter, 0, len(resp.StreamingSeries))
streamingSeriesCount += len(resp.StreamingSeries)

for _, s := range resp.StreamingSeries {
Expand All @@ -317,15 +317,15 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
return ingesterQueryResult{}, err
}

labelsBatch = append(labelsBatch, mimirpb.FromLabelAdaptersToLabels(s.Labels))
labelsBatch = append(labelsBatch, s.Labels)
}

streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch)
}

if resp.IsEndOfSeriesStream {
if streamingSeriesCount > 0 {
result.streamingSeries.Series = make([]labels.Labels, 0, streamingSeriesCount)
result.streamingSeries.Series = make([][]mimirpb.LabelAdapter, 0, streamingSeriesCount)

for _, batch := range streamingSeriesBatches {
result.streamingSeries.Series = append(result.streamingSeries.Series, batch...)
Expand Down Expand Up @@ -499,7 +499,7 @@ func sameSamples(a, b []mimirpb.Sample) bool {

type seriesChunksStream struct {
StreamReader *ingester_client.SeriesChunksStreamReader
Series []labels.Labels
Series [][]mimirpb.LabelAdapter
}

func mergeSeriesChunkStreams(results []ingesterQueryResult, estimatedIngestersPerSeries int) []ingester_client.StreamingSeries {
Expand All @@ -508,10 +508,13 @@ func mergeSeriesChunkStreams(results []ingesterQueryResult, estimatedIngestersPe
return nil
}

builder := labels.NewBuilder(labels.EmptyLabels())
var allSeries []ingester_client.StreamingSeries

for tree.Next() {
nextIngester, nextSeriesFromIngester, nextSeriesIndex := tree.Winner()
nextIngester, lbls, nextSeriesIndex := tree.Winner()
mimirpb.FromLabelAdaptersToBuilder(lbls, builder)
nextSeriesFromIngester := builder.Labels()
lastSeriesIndex := len(allSeries) - 1

if len(allSeries) == 0 || !labels.Equal(allSeries[lastSeriesIndex].Labels, nextSeriesFromIngester) {
Expand Down Expand Up @@ -579,25 +582,25 @@ type seriesChunkStreamsTree struct {
}

type seriesChunkStreamsTreeNode struct {
index int // This is the loser for all nodes except the 0th, where it is the winner.
value labels.Labels // Value copied from the loser node, or winner for node 0.
ingester seriesChunksStream // Only populated for leaf nodes.
nextSeriesIndex uint64 // Only populated for leaf nodes.
index int // This is the loser for all nodes except the 0th, where it is the winner.
value []mimirpb.LabelAdapter // Value copied from the loser node, or winner for node 0.
ingester seriesChunksStream // Only populated for leaf nodes.
nextSeriesIndex uint64 // Only populated for leaf nodes.
}

func (t *seriesChunkStreamsTree) moveNext(index int) bool {
n := &t.nodes[index]
n.nextSeriesIndex++
if int(n.nextSeriesIndex) > len(n.ingester.Series) {
n.value = labels.EmptyLabels()
n.value = nil
n.index = -1
return false
}
n.value = n.ingester.Series[n.nextSeriesIndex-1]
return true
}

func (t *seriesChunkStreamsTree) Winner() (seriesChunksStream, labels.Labels, uint64) {
func (t *seriesChunkStreamsTree) Winner() (seriesChunksStream, []mimirpb.LabelAdapter, uint64) {
n := t.nodes[t.nodes[0].index]
return n.ingester, n.value, n.nextSeriesIndex - 1
}
Expand Down Expand Up @@ -662,16 +665,16 @@ func (t *seriesChunkStreamsTree) playGame(a, b int) (loser, winner int) {
return a, b
}

func (t *seriesChunkStreamsTree) less(a, b labels.Labels) bool {
if a.IsEmpty() {
func (t *seriesChunkStreamsTree) less(a, b []mimirpb.LabelAdapter) bool {
if a == nil {
return false
}

if b.IsEmpty() {
if b == nil {
return true
}

return labels.Compare(a, b) < 0
return mimirpb.CompareLabelAdapters(a, b) < 0
}

func parent(i int) int { return i / 2 }
38 changes: 19 additions & 19 deletions pkg/distributor/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"fmt"
"math"
"math/rand"
"sort"
"slices"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -685,13 +685,13 @@ func TestMergingAndSortingSeries(t *testing.T) {
},
"single ingester, no series": {
results: []ingesterQueryResult{
{streamingSeries: seriesChunksStream{StreamReader: ingester1, Series: []labels.Labels{}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester1, Series: [][]mimirpb.LabelAdapter{}}},
},
expected: []ingester_client.StreamingSeries{},
},
"single ingester, single series": {
results: []ingesterQueryResult{
{streamingSeries: seriesChunksStream{StreamReader: ingester1, Series: []labels.Labels{labels.FromStrings("some-label", "some-value")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester1, Series: [][]mimirpb.LabelAdapter{labelAdapters("some-label", "some-value")}}},
},
expected: []ingester_client.StreamingSeries{
{
Expand All @@ -704,9 +704,9 @@ func TestMergingAndSortingSeries(t *testing.T) {
},
"multiple ingesters, each with single series": {
results: []ingesterQueryResult{
{streamingSeries: seriesChunksStream{StreamReader: ingester1, Series: []labels.Labels{labels.FromStrings("some-label", "some-value")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester2, Series: []labels.Labels{labels.FromStrings("some-label", "some-value")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester3, Series: []labels.Labels{labels.FromStrings("some-label", "some-value")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester1, Series: [][]mimirpb.LabelAdapter{labelAdapters("some-label", "some-value")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester2, Series: [][]mimirpb.LabelAdapter{labelAdapters("some-label", "some-value")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester3, Series: [][]mimirpb.LabelAdapter{labelAdapters("some-label", "some-value")}}},
},
expected: []ingester_client.StreamingSeries{
{
Expand All @@ -721,9 +721,9 @@ func TestMergingAndSortingSeries(t *testing.T) {
},
"multiple ingesters, each with different series": {
results: []ingesterQueryResult{
{streamingSeries: seriesChunksStream{StreamReader: ingester1, Series: []labels.Labels{labels.FromStrings("some-label", "value-a")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester2, Series: []labels.Labels{labels.FromStrings("some-label", "value-b")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester3, Series: []labels.Labels{labels.FromStrings("some-label", "value-c")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester1, Series: [][]mimirpb.LabelAdapter{labelAdapters("some-label", "value-a")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester2, Series: [][]mimirpb.LabelAdapter{labelAdapters("some-label", "value-b")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester3, Series: [][]mimirpb.LabelAdapter{labelAdapters("some-label", "value-c")}}},
},
expected: []ingester_client.StreamingSeries{
{
Expand All @@ -748,9 +748,9 @@ func TestMergingAndSortingSeries(t *testing.T) {
},
"multiple ingesters, each with different series, with earliest ingesters having last series": {
results: []ingesterQueryResult{
{streamingSeries: seriesChunksStream{StreamReader: ingester3, Series: []labels.Labels{labels.FromStrings("some-label", "value-c")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester2, Series: []labels.Labels{labels.FromStrings("some-label", "value-b")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester1, Series: []labels.Labels{labels.FromStrings("some-label", "value-a")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester3, Series: [][]mimirpb.LabelAdapter{labelAdapters("some-label", "value-c")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester2, Series: [][]mimirpb.LabelAdapter{labelAdapters("some-label", "value-b")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester1, Series: [][]mimirpb.LabelAdapter{labelAdapters("some-label", "value-a")}}},
},
expected: []ingester_client.StreamingSeries{
{
Expand All @@ -775,9 +775,9 @@ func TestMergingAndSortingSeries(t *testing.T) {
},
"multiple ingesters, each with multiple series": {
results: []ingesterQueryResult{
{streamingSeries: seriesChunksStream{StreamReader: ingester1, Series: []labels.Labels{labels.FromStrings("label-a", "value-a"), labels.FromStrings("label-b", "value-a")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester2, Series: []labels.Labels{labels.FromStrings("label-a", "value-b"), labels.FromStrings("label-b", "value-a")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester3, Series: []labels.Labels{labels.FromStrings("label-a", "value-c"), labels.FromStrings("label-b", "value-a")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester1, Series: [][]mimirpb.LabelAdapter{labelAdapters("label-a", "value-a"), labelAdapters("label-b", "value-a")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester2, Series: [][]mimirpb.LabelAdapter{labelAdapters("label-a", "value-b"), labelAdapters("label-b", "value-a")}}},
{streamingSeries: seriesChunksStream{StreamReader: ingester3, Series: [][]mimirpb.LabelAdapter{labelAdapters("label-a", "value-c"), labelAdapters("label-b", "value-a")}}},
},
expected: []ingester_client.StreamingSeries{
{
Expand Down Expand Up @@ -820,7 +820,7 @@ func TestMergingAndSortingSeries(t *testing.T) {
actualSeries := actual[i]
expectedSeries := testCase.expected[i]

require.Equal(t, expectedSeries.Labels, actualSeries.Labels)
require.True(t, labels.Equal(expectedSeries.Labels, actualSeries.Labels))

// We don't care about the order.
require.ElementsMatch(t, expectedSeries.Sources, actualSeries.Sources, "series %v", actualSeries.Labels.String())
Expand All @@ -847,10 +847,10 @@ func BenchmarkMergingAndSortingSeries(b *testing.B) {

func generateSeriesSets(ingestersPerZone int, zones int, seriesPerIngester int) []ingesterQueryResult {
seriesPerZone := ingestersPerZone * seriesPerIngester
zoneSeries := make([]labels.Labels, seriesPerZone)
zoneSeries := make([][]mimirpb.LabelAdapter, seriesPerZone)

for seriesIdx := 0; seriesIdx < seriesPerZone; seriesIdx++ {
zoneSeries[seriesIdx] = labels.FromStrings("the-label", strconv.Itoa(seriesIdx))
zoneSeries[seriesIdx] = labelAdapters("the-label", strconv.Itoa(seriesIdx))
}

results := make([]ingesterQueryResult, 0, zones*ingestersPerZone)
Expand All @@ -861,7 +861,7 @@ func generateSeriesSets(ingestersPerZone int, zones int, seriesPerIngester int)
for ingester := 1; ingester <= ingestersPerZone; ingester++ {
streamReader := &ingester_client.SeriesChunksStreamReader{}
series := zoneSeries[(ingester-1)*seriesPerIngester : ingester*seriesPerIngester]
sort.Sort(byLabels(series))
slices.SortFunc(series, mimirpb.CompareLabelAdapters)

results = append(results, ingesterQueryResult{streamingSeries: seriesChunksStream{StreamReader: streamReader, Series: series}})
}
Expand Down

0 comments on commit f1cc93e

Please sign in to comment.