Skip to content

Commit

Permalink
update proto
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Dec 3, 2024
1 parent 853d38a commit f49e90e
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 245 deletions.
6 changes: 3 additions & 3 deletions pkg/querier/codec/protobuf_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ func getMatrixSampleStreams(data *v1.QueryData) *[]tripperware.SampleStream {
}

histogramsLen := len(sampleStream.Histograms)
var histograms []*tripperware.SampleHistogramPair
var histograms []tripperware.SampleHistogramPair
if histogramsLen > 0 {
histograms = make([]*tripperware.SampleHistogramPair, histogramsLen)
histograms = make([]tripperware.SampleHistogramPair, histogramsLen)
for j := 0; j < histogramsLen; j++ {
bucketsLen := len(sampleStream.Histograms[j].H.NegativeBuckets) + len(sampleStream.Histograms[j].H.PositiveBuckets)
if sampleStream.Histograms[j].H.ZeroCount > 0 {
Expand All @@ -137,7 +137,7 @@ func getMatrixSampleStreams(data *v1.QueryData) *[]tripperware.SampleStream {
buckets := make([]*tripperware.HistogramBucket, bucketsLen)
it := sampleStream.Histograms[j].H.AllBucketIterator()
getBuckets(buckets, it)
histograms[j] = &tripperware.SampleHistogramPair{
histograms[j] = tripperware.SampleHistogramPair{
TimestampMs: sampleStream.Histograms[j].T,
Histogram: tripperware.SampleHistogram{
Count: sampleStream.Histograms[j].H.Count,
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/codec/protobuf_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
Labels: []cortexpb.LabelAdapter{
{Name: "__name__", Value: "foo"},
},
Histograms: []*tripperware.SampleHistogramPair{
Histograms: []tripperware.SampleHistogramPair{
{
TimestampMs: 1000,
Histogram: tripperware.SampleHistogram{
Expand Down
34 changes: 2 additions & 32 deletions pkg/querier/tripperware/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,18 +372,8 @@ func mergeSampleStreams(output map[string]SampleStream, sampleStreams []SampleSt
stream.Histograms = sliceHistograms(stream.Histograms, existingEndTs)
}
}
// Same for above.
if len(existing.RawHistograms) > 0 && len(stream.RawHistograms) > 0 {
existingEndTs := existing.RawHistograms[len(existing.RawHistograms)-1].GetTimestampMs()
if existingEndTs == stream.RawHistograms[0].GetTimestampMs() {
stream.RawHistograms = stream.RawHistograms[1:]
} else if existingEndTs > stream.RawHistograms[0].GetTimestampMs() {
stream.RawHistograms = sliceRawHistograms(stream.RawHistograms, existingEndTs)
}
}
existing.Samples = append(existing.Samples, stream.Samples...)
existing.Histograms = append(existing.Histograms, stream.Histograms...)
existing.RawHistograms = append(existing.RawHistograms, stream.RawHistograms...)

output[metric] = existing
}
Expand All @@ -410,31 +400,11 @@ func sliceSamples(samples []cortexpb.Sample, minTs int64) []cortexpb.Sample {
return samples[searchResult:]
}

// sliceHistogram assumes given histogram are sorted by timestamp in ascending order and
// return a sub slice whose first element's is the smallest timestamp that is strictly
// bigger than the given minTs. Empty slice is returned if minTs is bigger than all the
// timestamps in histogram.
func sliceHistograms(histograms []*SampleHistogramPair, minTs int64) []*SampleHistogramPair {
if len(histograms) <= 0 || minTs < histograms[0].GetTimestampMs() {
return histograms
}

if len(histograms) > 0 && minTs > histograms[len(histograms)-1].GetTimestampMs() {
return histograms[len(histograms):]
}

searchResult := sort.Search(len(histograms), func(i int) bool {
return histograms[i].GetTimestampMs() > minTs
})

return histograms[searchResult:]
}

// sliceRawHistograms assumes given histogram are sorted by timestamp in ascending order and
// sliceHistograms assumes given histogram are sorted by timestamp in ascending order and
// return a sub slice whose first element's is the smallest timestamp that is strictly
// bigger than the given minTs. Empty slice is returned if minTs is bigger than all the
// timestamps in histogram.
func sliceRawHistograms(histograms []*cortexpb.Histogram, minTs int64) []*cortexpb.Histogram {
func sliceHistograms(histograms []SampleHistogramPair, minTs int64) []SampleHistogramPair {
if len(histograms) <= 0 || minTs < histograms[0].GetTimestampMs() {
return histograms
}
Expand Down
60 changes: 30 additions & 30 deletions pkg/querier/tripperware/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ func TestMergeSampleStreams(t *testing.T) {
sampleStreams: []SampleStream{
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []*SampleHistogramPair{
Histograms: []SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
},
},
},
expectedOutput: map[string]SampleStream{
ingester_client.LabelsToKeyString(lbls): {
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []*SampleHistogramPair{
Histograms: []SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
},
},
Expand All @@ -122,7 +122,7 @@ func TestMergeSampleStreams(t *testing.T) {
Samples: []cortexpb.Sample{
{Value: 0, TimestampMs: 0},
},
Histograms: []*SampleHistogramPair{
Histograms: []SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
},
},
Expand All @@ -133,7 +133,7 @@ func TestMergeSampleStreams(t *testing.T) {
Samples: []cortexpb.Sample{
{Value: 0, TimestampMs: 0},
},
Histograms: []*SampleHistogramPair{
Histograms: []SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
},
},
Expand Down Expand Up @@ -176,15 +176,15 @@ func TestMergeSampleStreams(t *testing.T) {
sampleStreams: []SampleStream{
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []*SampleHistogramPair{
Histograms: []SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
},
},
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []*SampleHistogramPair{
Histograms: []SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 1},
{Histogram: testHistogram1, TimestampMs: 4},
Expand All @@ -194,7 +194,7 @@ func TestMergeSampleStreams(t *testing.T) {
expectedOutput: map[string]SampleStream{
ingester_client.LabelsToKeyString(lbls): {
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []*SampleHistogramPair{
Histograms: []SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
Expand All @@ -213,7 +213,7 @@ func TestMergeSampleStreams(t *testing.T) {
{Value: 2, TimestampMs: 2},
{Value: 3, TimestampMs: 3},
},
Histograms: []*SampleHistogramPair{
Histograms: []SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
Expand All @@ -226,7 +226,7 @@ func TestMergeSampleStreams(t *testing.T) {
{Value: 1, TimestampMs: 1},
{Value: 4, TimestampMs: 4},
},
Histograms: []*SampleHistogramPair{
Histograms: []SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 1},
{Histogram: testHistogram1, TimestampMs: 4},
Expand All @@ -242,7 +242,7 @@ func TestMergeSampleStreams(t *testing.T) {
{Value: 3, TimestampMs: 3},
{Value: 4, TimestampMs: 4},
},
Histograms: []*SampleHistogramPair{
Histograms: []SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
Expand Down Expand Up @@ -310,30 +310,30 @@ func TestMergeSampleStreams(t *testing.T) {
sampleStreams: []SampleStream{
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []*SampleHistogramPair{
Histograms: []SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
},
},
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []*SampleHistogramPair{
Histograms: []SampleHistogramPair{
{Histogram: testHistogram2, TimestampMs: 1},
{Histogram: testHistogram2, TimestampMs: 4},
},
},
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls1),
Histograms: []*SampleHistogramPair{
Histograms: []SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 1},
{Histogram: testHistogram1, TimestampMs: 4},
},
},
{
Labels: cortexpb.FromLabelsToLabelAdapters(lbls1),
Histograms: []*SampleHistogramPair{
Histograms: []SampleHistogramPair{
{Histogram: testHistogram2, TimestampMs: 2},
{Histogram: testHistogram2, TimestampMs: 3},
},
Expand All @@ -342,7 +342,7 @@ func TestMergeSampleStreams(t *testing.T) {
expectedOutput: map[string]SampleStream{
ingester_client.LabelsToKeyString(lbls): {
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
Histograms: []*SampleHistogramPair{
Histograms: []SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 2},
{Histogram: testHistogram1, TimestampMs: 3},
Expand All @@ -351,7 +351,7 @@ func TestMergeSampleStreams(t *testing.T) {
},
ingester_client.LabelsToKeyString(lbls1): {
Labels: cortexpb.FromLabelsToLabelAdapters(lbls1),
Histograms: []*SampleHistogramPair{
Histograms: []SampleHistogramPair{
{Histogram: testHistogram1, TimestampMs: 0},
{Histogram: testHistogram1, TimestampMs: 1},
{Histogram: testHistogram1, TimestampMs: 4},
Expand Down Expand Up @@ -453,21 +453,21 @@ func TestSliceHistograms(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
name string
histograms []*SampleHistogramPair
histograms []SampleHistogramPair
minTs int64
expectedHistograms []*SampleHistogramPair
expectedHistograms []SampleHistogramPair
}{
{name: "empty histograms"},
{
name: "minTs smaller than first histogram's timestamp",
histograms: []*SampleHistogramPair{
histograms: []SampleHistogramPair{
{
TimestampMs: 1,
Histogram: testHistogram1,
},
},
minTs: 0,
expectedHistograms: []*SampleHistogramPair{
expectedHistograms: []SampleHistogramPair{
{
TimestampMs: 1,
Histogram: testHistogram1,
Expand All @@ -476,7 +476,7 @@ func TestSliceHistograms(t *testing.T) {
},
{
name: "input histograms are not sorted, return all histograms",
histograms: []*SampleHistogramPair{
histograms: []SampleHistogramPair{
{
TimestampMs: 3,
Histogram: testHistogram1,
Expand All @@ -487,7 +487,7 @@ func TestSliceHistograms(t *testing.T) {
},
},
minTs: 2,
expectedHistograms: []*SampleHistogramPair{
expectedHistograms: []SampleHistogramPair{
{
TimestampMs: 3,
Histogram: testHistogram1,
Expand All @@ -500,7 +500,7 @@ func TestSliceHistograms(t *testing.T) {
},
{
name: "minTs greater than the last histogram's timestamp",
histograms: []*SampleHistogramPair{
histograms: []SampleHistogramPair{
{
TimestampMs: 1,
Histogram: testHistogram1,
Expand All @@ -511,11 +511,11 @@ func TestSliceHistograms(t *testing.T) {
},
},
minTs: 3,
expectedHistograms: []*SampleHistogramPair{},
expectedHistograms: []SampleHistogramPair{},
},
{
name: "input histograms not sorted, minTs greater than the last histogram's timestamp",
histograms: []*SampleHistogramPair{
histograms: []SampleHistogramPair{
{
TimestampMs: 0,
Histogram: testHistogram1,
Expand All @@ -530,11 +530,11 @@ func TestSliceHistograms(t *testing.T) {
},
},
minTs: 2,
expectedHistograms: []*SampleHistogramPair{},
expectedHistograms: []SampleHistogramPair{},
},
{
name: "input histograms are sorted",
histograms: []*SampleHistogramPair{
histograms: []SampleHistogramPair{
{
TimestampMs: 2,
Histogram: testHistogram1,
Expand All @@ -549,7 +549,7 @@ func TestSliceHistograms(t *testing.T) {
},
},
minTs: 1,
expectedHistograms: []*SampleHistogramPair{
expectedHistograms: []SampleHistogramPair{
{
TimestampMs: 2,
Histogram: testHistogram1,
Expand All @@ -566,7 +566,7 @@ func TestSliceHistograms(t *testing.T) {
},
{
name: "input histograms are sorted, get sliced histograms",
histograms: []*SampleHistogramPair{
histograms: []SampleHistogramPair{
{
TimestampMs: 1,
Histogram: testHistogram1,
Expand All @@ -581,7 +581,7 @@ func TestSliceHistograms(t *testing.T) {
},
},
minTs: 2,
expectedHistograms: []*SampleHistogramPair{
expectedHistograms: []SampleHistogramPair{
{
TimestampMs: 3,
Histogram: testHistogram1,
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/tripperware/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func decodeSampleStream(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
for iter.ReadArray() {
h := SampleHistogramPair{}
UnmarshalSampleHistogramPairJSON(unsafe.Pointer(&h), iter)
ss.Histograms = append(ss.Histograms, &h)
ss.Histograms = append(ss.Histograms, h)
}
default:
iter.ReportError("unmarshal SampleStream", fmt.Sprint("unexpected key:", field))
Expand Down Expand Up @@ -323,7 +323,7 @@ func encodeSampleStream(ptr unsafe.Pointer, stream *jsoniter.Stream) {
if i > 0 {
stream.WriteMore()
}
MarshalSampleHistogramPairJSON(unsafe.Pointer(h), stream)
MarshalSampleHistogramPairJSON(unsafe.Pointer(&h), stream)
}
stream.WriteArrayEnd()
}
Expand Down
Loading

0 comments on commit f49e90e

Please sign in to comment.