Skip to content

Commit

Permalink
Creating symbols table on the query stream response
Browse files Browse the repository at this point in the history
  • Loading branch information
alanprot committed Dec 6, 2024
1 parent bdc357c commit fa9d8bf
Show file tree
Hide file tree
Showing 6 changed files with 406 additions and 92 deletions.
1 change: 1 addition & 0 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
return nil, validation.LimitError(dataBytesLimitErr.Error())
}

result.DesymbolizeLabels()
result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...)
}
return result, nil
Expand Down
46 changes: 46 additions & 0 deletions pkg/ingester/client/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package client
import (
"encoding/binary"

writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"

"github.com/cortexproject/cortex/pkg/chunk/encoding"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/prometheus/prometheus/model/labels"
)

// ChunksCount returns the number of chunks in response.
Expand Down Expand Up @@ -45,3 +49,45 @@ func (m *QueryStreamResponse) SamplesCount() (count int) {
}
return
}

func (m *QueryStreamResponse) DesymbolizeLabels() {
if len(m.Symbols) == 0 {
return
}

b := labels.NewScratchBuilder(0)

for i, cs := range m.Chunkseries {
if len(cs.LabelsRefs) > 0 {
m.Chunkseries[i].Labels = cortexpb.FromLabelsToLabelAdapters(desymbolizeLabels(&b, cs.LabelsRefs, m.Symbols))
}
cs.LabelsRefs = cs.LabelsRefs[:0]
b.Reset()
}
m.Symbols = m.Symbols[:0]
}

func (m *QueryStreamResponse) SymbolizeLabels() {
if len(m.Symbols) > 0 {
return
}

st := writev2.NewSymbolTable()
for i, _ := range m.Chunkseries {
if len(m.Chunkseries[i].Labels) > 0 {
m.Chunkseries[i].LabelsRefs = st.SymbolizeLabels(cortexpb.FromLabelAdaptersToLabels(m.Chunkseries[i].Labels), m.Chunkseries[i].LabelsRefs)
}
m.Chunkseries[i].Labels = m.Chunkseries[i].Labels[:0]
}
m.Symbols = st.Symbols()
}

// desymbolizeLabels decodes label references, with given symbols to labels.
func desymbolizeLabels(b *labels.ScratchBuilder, labelRefs []uint32, symbols []string) labels.Labels {
b.Reset()
for i := 0; i < len(labelRefs); i += 2 {
b.Add(symbols[labelRefs[i]], symbols[labelRefs[i+1]])
}
b.Sort()
return b.Labels()
}
26 changes: 26 additions & 0 deletions pkg/ingester/client/custom_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
github_com_cortexproject_cortex_pkg_cortexpb "github.com/cortexproject/cortex/pkg/cortexpb"
"testing"
"time"

Expand All @@ -11,6 +12,31 @@ import (
"github.com/cortexproject/cortex/pkg/util"
)

func TestSYmbolizeLabels(t *testing.T) {
chunkSeries := []TimeSeriesChunk{
{
Labels: []github_com_cortexproject_cortex_pkg_cortexpb.LabelAdapter{
{"N1", "V1"},
{"N1", "V2"},
{"N3", "V3"},
},
Chunks: []Chunk{{Encoding: int32(encoding.PrometheusXorChunk), Data: []byte("data1")}},
},
{
Labels: []github_com_cortexproject_cortex_pkg_cortexpb.LabelAdapter{
{"V1", "N3"},
{"N2", "V3"},
{"Final", "Final2"},
},
Chunks: []Chunk{{Encoding: int32(encoding.PrometheusXorChunk), Data: []byte("data1")}},
},
}

r := QueryStreamResponse{Chunkseries: chunkSeries}
r.SymbolizeLabels()
r.DesymbolizeLabels()
}

func TestSamplesCount(t *testing.T) {
floatChk := util.GenerateChunk(t, time.Second, model.Time(0), 100, encoding.PrometheusXorChunk)
histogramChk := util.GenerateChunk(t, time.Second, model.Time(0), 300, encoding.PrometheusHistogramChunk)
Expand Down
Loading

0 comments on commit fa9d8bf

Please sign in to comment.