diff --git a/integration/e2e/util.go b/integration/e2e/util.go index 6e6c1cec4fe..51a3cfcaa06 100644 --- a/integration/e2e/util.go +++ b/integration/e2e/util.go @@ -25,7 +25,6 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/runutil" - "github.com/cortexproject/cortex/pkg/cortexpbv2" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" ) @@ -151,7 +150,7 @@ func GenerateSeries(name string, ts time.Time, additionalLabels ...prompb.Label) return } -func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []cortexpbv2.TimeSeries) { +func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries) { tsMillis := TimeToMilliseconds(ts) st := writev2.NewSymbolTable() @@ -164,20 +163,20 @@ func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogr var ( h *histogram.Histogram fh *histogram.FloatHistogram - ph cortexpbv2.Histogram + ph writev2.Histogram ) if floatHistogram { fh = tsdbutil.GenerateTestFloatHistogram(int(i)) - ph = cortexpbv2.FloatHistogramToHistogramProto(tsMillis, fh) + ph = writev2.FromFloatHistogram(tsMillis, fh) } else { h = tsdbutil.GenerateTestHistogram(int(i)) - ph = cortexpbv2.HistogramToHistogramProto(tsMillis, h) + ph = writev2.FromIntHistogram(tsMillis, h) } // Generate the series - series = append(series, cortexpbv2.TimeSeries{ + series = append(series, writev2.TimeSeries{ LabelsRefs: st.SymbolizeLabels(lbs, nil), - Histograms: []cortexpbv2.Histogram{ph}, + Histograms: []writev2.Histogram{ph}, }) symbols = st.Symbols() @@ -224,7 +223,7 @@ func GenerateHistogramSeries(name string, ts time.Time, i uint32, floatHistogram return } -func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []cortexpbv2.TimeSeries, vector model.Vector) { +func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries, vector model.Vector) { tsMillis := TimeToMilliseconds(ts) value := rand.Float64() @@ -237,14 +236,14 @@ func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Labe Value: label.Value, }) } - series = append(series, cortexpbv2.TimeSeries{ + series = append(series, writev2.TimeSeries{ // Generate the series LabelsRefs: st.SymbolizeLabels(lbs, nil), - Samples: []cortexpbv2.Sample{ + Samples: []writev2.Sample{ {Value: value, Timestamp: tsMillis}, }, - Metadata: cortexpbv2.Metadata{ - Type: cortexpbv2.METRIC_TYPE_GAUGE, + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_GAUGE, }, }) symbols = st.Symbols() diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index d4126d786a0..286656bfa5f 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" "github.com/prometheus/prometheus/prompb" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote" "go.opentelemetry.io/collector/pdata/pcommon" @@ -30,7 +31,6 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" yaml "gopkg.in/yaml.v3" - "github.com/cortexproject/cortex/pkg/cortexpbv2" "github.com/cortexproject/cortex/pkg/ruler" "github.com/cortexproject/cortex/pkg/util/backoff" ) @@ -114,9 +114,9 @@ func NewPromQueryClient(address string) (*Client, error) { } // PushV2 the input timeseries to the remote endpoint -func (c *Client) PushV2(symbols []string, timeseries []cortexpbv2.TimeSeries) (*http.Response, error) { +func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) { // Create write request - data, err := proto.Marshal(&cortexpbv2.WriteRequest{Symbols: symbols, Timeseries: timeseries}) + data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries}) if err != nil { return nil, err } diff --git a/integration/remote_write_v2_test.go b/integration/remote_write_v2_test.go index a44577d6ef4..775ccb98bb3 100644 --- a/integration/remote_write_v2_test.go +++ b/integration/remote_write_v2_test.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -19,7 +20,6 @@ import ( "github.com/cortexproject/cortex/integration/e2e" e2edb "github.com/cortexproject/cortex/integration/e2e/db" "github.com/cortexproject/cortex/integration/e2ecortex" - "github.com/cortexproject/cortex/pkg/cortexpbv2" "github.com/cortexproject/cortex/pkg/storage/tsdb" ) @@ -176,24 +176,22 @@ func TestExemplar(t *testing.T) { now := time.Now() tsMillis := e2e.TimeToMilliseconds(now) - req := &cortexpbv2.WriteRequest{ - Symbols: []string{"", "__name__", "test_metric", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}, - Timeseries: []cortexpbv2.TimeSeries{ - { - LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels - Metadata: cortexpbv2.Metadata{ - Type: cortexpbv2.METRIC_TYPE_COUNTER, // writeV2RequestSeries1Metadata.Type. - - HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help. - UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit. - }, - Samples: []cortexpbv2.Sample{{Value: 1, Timestamp: tsMillis}}, - Exemplars: []cortexpbv2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: tsMillis}}, + symbols := []string{"", "__name__", "test_metric", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"} + timeseries := []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_COUNTER, // writeV2RequestSeries1Metadata.Type. + + HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help. + UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit. }, + Samples: []writev2.Sample{{Value: 1, Timestamp: tsMillis}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: tsMillis}}, }, } - res, err := c.PushV2(req.Symbols, req.Timeseries) + res, err := c.PushV2(symbols, timeseries) require.NoError(t, err) require.Equal(t, 200, res.StatusCode) testPushHeader(t, res.Header, "1", "0", "1") diff --git a/pkg/cortexpb/slicesPool.go b/pkg/cortexpb/slicesPool.go index e28d51d4f23..eaace237ce3 100644 --- a/pkg/cortexpb/slicesPool.go +++ b/pkg/cortexpb/slicesPool.go @@ -13,7 +13,7 @@ type byteSlicePools struct { pools []sync.Pool } -func newSlicePool(pools int) *byteSlicePools { +func NewSlicePool(pools int) *byteSlicePools { sp := byteSlicePools{} sp.init(pools) return &sp @@ -32,7 +32,7 @@ func (sp *byteSlicePools) init(pools int) { } } -func (sp *byteSlicePools) getSlice(size int) *[]byte { +func (sp *byteSlicePools) GetSlice(size int) *[]byte { index := int(math.Ceil(math.Log2(float64(size)))) - minPoolSizePower if index >= len(sp.pools) { @@ -50,7 +50,7 @@ func (sp *byteSlicePools) getSlice(size int) *[]byte { return s } -func (sp *byteSlicePools) reuseSlice(s *[]byte) { +func (sp *byteSlicePools) ReuseSlice(s *[]byte) { index := int(math.Floor(math.Log2(float64(cap(*s))))) - minPoolSizePower if index >= len(sp.pools) || index < 0 { diff --git a/pkg/cortexpb/slicesPool_test.go b/pkg/cortexpb/slicesPool_test.go index 9bc56cdec3f..dd35beb33a0 100644 --- a/pkg/cortexpb/slicesPool_test.go +++ b/pkg/cortexpb/slicesPool_test.go @@ -9,22 +9,22 @@ import ( ) func TestFuzzyByteSlicePools(t *testing.T) { - sut := newSlicePool(20) + sut := NewSlicePool(20) maxByteSize := int(math.Pow(2, 20+minPoolSizePower-1)) for i := 0; i < 1000; i++ { size := rand.Int() % maxByteSize - s := sut.getSlice(size) + s := sut.GetSlice(size) assert.Equal(t, len(*s), size) - sut.reuseSlice(s) + sut.ReuseSlice(s) } } func TestReturnSliceSmallerThanMin(t *testing.T) { - sut := newSlicePool(20) + sut := NewSlicePool(20) size := 3 buff := make([]byte, 0, size) - sut.reuseSlice(&buff) - buff2 := sut.getSlice(size * 2) + sut.ReuseSlice(&buff) + buff2 := sut.GetSlice(size * 2) assert.Equal(t, len(*buff2), size*2) } diff --git a/pkg/cortexpb/timeseries.go b/pkg/cortexpb/timeseries.go index db7354ffe45..b880739ae2b 100644 --- a/pkg/cortexpb/timeseries.go +++ b/pkg/cortexpb/timeseries.go @@ -47,7 +47,7 @@ var ( } }, } - bytePool = newSlicePool(20) + bytePool = NewSlicePool(20) ) // PreallocConfig configures how structures will be preallocated to optimise @@ -86,7 +86,7 @@ func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error { func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) { size := p.Size() - p.data = bytePool.getSlice(size) + p.data = bytePool.GetSlice(size) dAtA = *p.data n, err := p.MarshalToSizedBuffer(dAtA[:size]) if err != nil { @@ -97,7 +97,7 @@ func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) { func ReuseWriteRequest(req *PreallocWriteRequest) { if req.data != nil { - bytePool.reuseSlice(req.data) + bytePool.ReuseSlice(req.data) req.data = nil } req.Source = 0 diff --git a/pkg/cortexpbv2/compatv2.go b/pkg/cortexpbv2/compatv2.go index 1c02ccc4567..13d42d000fa 100644 --- a/pkg/cortexpbv2/compatv2.go +++ b/pkg/cortexpbv2/compatv2.go @@ -22,13 +22,14 @@ func ToWriteRequestV2(lbls []labels.Labels, samples []Sample, histograms []Histo symbols := st.Symbols() req := &WriteRequest{ - Symbols: symbols, - Source: source, + Timeseries: PreallocTimeseriesV2SliceFromPool(), + Symbols: symbols, + Source: source, } i := 0 for i < len(samples) || i < len(histograms) || i < len(metadata) { - ts := TimeSeries{} + ts := TimeseriesV2FromPool() ts.LabelsRefs = labelRefs[i] if i < len(samples) { ts.Samples = append(ts.Samples, samples[i]) @@ -40,7 +41,7 @@ func ToWriteRequestV2(lbls []labels.Labels, samples []Sample, histograms []Histo ts.Metadata = metadata[i] } i++ - req.Timeseries = append(req.Timeseries, ts) + req.Timeseries = append(req.Timeseries, PreallocTimeseriesV2{TimeSeries: ts}) } return req diff --git a/pkg/cortexpbv2/cortexv2.pb.go b/pkg/cortexpbv2/cortexv2.pb.go index 2bd5f4fc2ee..456977d1709 100644 --- a/pkg/cortexpbv2/cortexv2.pb.go +++ b/pkg/cortexpbv2/cortexv2.pb.go @@ -118,7 +118,7 @@ func (Histogram_ResetHint) EnumDescriptor() ([]byte, []int) { type WriteRequest struct { Source WriteRequest_SourceEnum `protobuf:"varint,3,opt,name=Source,proto3,enum=cortexpbv2.WriteRequest_SourceEnum" json:"Source,omitempty"` Symbols []string `protobuf:"bytes,4,rep,name=symbols,proto3" json:"symbols,omitempty"` - Timeseries []TimeSeries `protobuf:"bytes,5,rep,name=timeseries,proto3" json:"timeseries"` + Timeseries []PreallocTimeseriesV2 `protobuf:"bytes,5,rep,name=timeseries,proto3,customtype=PreallocTimeseriesV2" json:"timeseries"` SkipLabelNameValidation bool `protobuf:"varint,1000,opt,name=skip_label_name_validation,json=skipLabelNameValidation,proto3" json:"skip_label_name_validation,omitempty"` } @@ -168,13 +168,6 @@ func (m *WriteRequest) GetSymbols() []string { return nil } -func (m *WriteRequest) GetTimeseries() []TimeSeries { - if m != nil { - return m.Timeseries - } - return nil -} - func (m *WriteRequest) GetSkipLabelNameValidation() bool { if m != nil { return m.SkipLabelNameValidation @@ -902,75 +895,76 @@ func init() { func init() { proto.RegisterFile("cortexv2.proto", fileDescriptor_affad2b75b7d03df) } var fileDescriptor_affad2b75b7d03df = []byte{ - // 1077 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x56, 0xcd, 0x6e, 0x23, 0x45, - 0x17, 0xed, 0x72, 0x3b, 0xfe, 0xb9, 0x89, 0x3d, 0x9d, 0xfa, 0x32, 0x49, 0x4f, 0x3e, 0xe8, 0x18, - 0x8f, 0x00, 0x0b, 0xa4, 0x20, 0x79, 0x24, 0x84, 0x34, 0x11, 0x22, 0xc9, 0x74, 0x62, 0xa3, 0x89, - 0x13, 0x95, 0x3b, 0x83, 0xc2, 0xa6, 0xd5, 0xb1, 0xcb, 0x76, 0x6b, 0xfa, 0x8f, 0xae, 0x72, 0x34, - 0x61, 0xc5, 0x92, 0x25, 0x8f, 0xc0, 0x92, 0x97, 0x60, 0x3f, 0xcb, 0x6c, 0x90, 0x66, 0x03, 0x22, - 0xce, 0x66, 0x96, 0xf3, 0x08, 0xa8, 0xab, 0x7f, 0x93, 0x80, 0x60, 0x57, 0xf7, 0xdc, 0x73, 0xaa, - 0x4f, 0x9d, 0xaa, 0x6b, 0x19, 0x9a, 0x23, 0x3f, 0xe4, 0xf4, 0xd5, 0x45, 0x77, 0x3b, 0x08, 0x7d, - 0xee, 0x63, 0x88, 0xeb, 0xe0, 0xfc, 0xa2, 0xbb, 0xb9, 0x36, 0xf5, 0xa7, 0xbe, 0x80, 0x3f, 0x8b, - 0x56, 0x31, 0xa3, 0xfd, 0x63, 0x09, 0x56, 0xbe, 0x09, 0x6d, 0x4e, 0x09, 0xfd, 0x6e, 0x4e, 0x19, - 0xc7, 0x4f, 0xa1, 0x32, 0xf4, 0xe7, 0xe1, 0x88, 0xaa, 0x72, 0x0b, 0x75, 0x9a, 0xdd, 0xc7, 0xdb, - 0xf9, 0x1e, 0xdb, 0x45, 0xe6, 0x76, 0x4c, 0xd3, 0xbd, 0xb9, 0x4b, 0x12, 0x09, 0x56, 0xa1, 0xca, - 0x2e, 0xdd, 0x73, 0xdf, 0x61, 0x6a, 0xb9, 0x25, 0x77, 0xea, 0x24, 0x2d, 0xf1, 0x0e, 0x00, 0xb7, - 0x5d, 0xca, 0x68, 0x68, 0x53, 0xa6, 0x2e, 0xb5, 0xe4, 0xce, 0x72, 0x77, 0xbd, 0xb8, 0xb5, 0x61, - 0xbb, 0x74, 0x28, 0xba, 0x7b, 0xe5, 0xd7, 0x7f, 0x6c, 0x49, 0xa4, 0xc0, 0xc7, 0x3b, 0xb0, 0xc9, - 0x5e, 0xda, 0x81, 0xe9, 0x58, 0xe7, 0xd4, 0x31, 0x3d, 0xcb, 0xa5, 0xe6, 0x85, 0xe5, 0xd8, 0x63, - 0x8b, 0xdb, 0xbe, 0xa7, 0xbe, 0xad, 0xb6, 0x50, 0xa7, 0x46, 0x36, 0x22, 0xca, 0xf3, 0x88, 0x31, - 0xb0, 0x5c, 0xfa, 0x22, 0xeb, 0xb7, 0xb7, 0x00, 0x72, 0xaf, 0xb8, 0x0a, 0xf2, 0xee, 0x49, 0x5f, - 0x91, 0x70, 0x0d, 0xca, 0xe4, 0xf4, 0xb9, 0xae, 0xa0, 0xaf, 0xcb, 0x35, 0xa4, 0xc8, 0xed, 0x29, - 0x34, 0x92, 0xf3, 0xb1, 0xc0, 0xf7, 0x98, 0x38, 0xcd, 0xd0, 0x72, 0x03, 0x87, 0x32, 0x15, 0xb5, - 0x50, 0x47, 0x26, 0x69, 0x89, 0x35, 0x80, 0x9e, 0xcd, 0xb8, 0x3f, 0x0d, 0x2d, 0x97, 0xa9, 0x25, - 0xd1, 0x2c, 0x20, 0xf8, 0x3d, 0xa8, 0xeb, 0xaf, 0xa8, 0x1b, 0x38, 0x56, 0xc8, 0x44, 0x8e, 0x32, - 0xc9, 0x81, 0xf6, 0xaf, 0x25, 0x80, 0xfc, 0xb8, 0x78, 0x0b, 0x96, 0xc5, 0xb9, 0x98, 0x19, 0xd2, - 0x49, 0xf4, 0x29, 0xb9, 0xd3, 0x20, 0x10, 0x43, 0x84, 0x4e, 0x18, 0xee, 0x42, 0x95, 0x25, 0x3e, - 0x4a, 0x22, 0x38, 0x5c, 0x0c, 0x2e, 0xf6, 0x94, 0x84, 0x96, 0x12, 0xf1, 0x53, 0x80, 0x59, 0xee, - 0x50, 0x16, 0xb2, 0x87, 0x45, 0x59, 0xe6, 0x36, 0x8d, 0x3b, 0xa7, 0xe3, 0x2f, 0xa0, 0x4e, 0x33, - 0xfb, 0x65, 0xa1, 0x5d, 0x2b, 0x6a, 0xd3, 0xa3, 0x24, 0xd2, 0x9c, 0x8c, 0x3f, 0x87, 0x9a, 0x4b, - 0xb9, 0x35, 0xb6, 0xb8, 0xa5, 0x2e, 0xb5, 0xd0, 0x5d, 0xe1, 0x51, 0xd2, 0x4b, 0x84, 0x19, 0x17, - 0x7f, 0x0a, 0xab, 0xa3, 0x90, 0x5a, 0x9c, 0x8e, 0x4d, 0x71, 0xed, 0xdc, 0x72, 0x03, 0xb5, 0x22, - 0x82, 0x53, 0x92, 0x86, 0x91, 0xe2, 0x6d, 0x13, 0x6a, 0xa9, 0x83, 0x7f, 0x0f, 0x6f, 0x0d, 0x96, - 0x2e, 0x2c, 0x67, 0x4e, 0xc5, 0x2d, 0x21, 0x12, 0x17, 0xd1, 0x05, 0xe5, 0xdf, 0x49, 0x2e, 0x28, - 0x03, 0xda, 0x3b, 0x50, 0x89, 0x53, 0xcd, 0xd5, 0xe8, 0x1f, 0xd5, 0xa5, 0xbb, 0xea, 0xdf, 0x4a, - 0x50, 0x4b, 0x0f, 0x8a, 0x9f, 0x40, 0x99, 0x5f, 0x06, 0xb1, 0xbe, 0xd9, 0xdd, 0xfa, 0xbb, 0x30, - 0xa2, 0x45, 0x68, 0x8f, 0x8c, 0xcb, 0x80, 0x12, 0x41, 0xc6, 0x8f, 0xa0, 0x36, 0xa3, 0x4e, 0x10, - 0x1d, 0x49, 0x98, 0x6b, 0x90, 0x6a, 0x54, 0x13, 0x3a, 0x89, 0x5a, 0x73, 0xcf, 0xe6, 0xa2, 0x55, - 0x8e, 0x5b, 0x51, 0x4d, 0xe8, 0xa4, 0xfd, 0x3b, 0x02, 0xc8, 0xb7, 0xc2, 0xff, 0x87, 0x8d, 0x23, - 0xdd, 0x20, 0xfd, 0x7d, 0xd3, 0x38, 0x3b, 0xd1, 0xcd, 0xd3, 0xc1, 0xf0, 0x44, 0xdf, 0xef, 0x1f, - 0xf4, 0xf5, 0x67, 0x8a, 0x84, 0x37, 0xe0, 0x7f, 0xc5, 0xe6, 0xfe, 0xf1, 0xe9, 0xc0, 0xd0, 0x89, - 0x82, 0xf0, 0x43, 0x58, 0x2d, 0x36, 0x0e, 0x77, 0x4f, 0x0f, 0x75, 0xa5, 0x84, 0x1f, 0xc1, 0xc3, - 0x22, 0xdc, 0xeb, 0x0f, 0x8d, 0xe3, 0x43, 0xb2, 0x7b, 0xa4, 0xc8, 0x58, 0x83, 0xcd, 0x7b, 0x8a, - 0xbc, 0x5f, 0xbe, 0xfb, 0xa9, 0xe1, 0xe9, 0xd1, 0xd1, 0x2e, 0x39, 0x53, 0x96, 0xf0, 0x1a, 0x28, - 0xc5, 0x46, 0x7f, 0x70, 0x70, 0xac, 0x54, 0xb0, 0x0a, 0x6b, 0xb7, 0xe8, 0xc6, 0xae, 0xa1, 0x0f, - 0x75, 0x43, 0xa9, 0xb6, 0x7f, 0xae, 0x40, 0x3d, 0x7b, 0xb5, 0xf8, 0x7d, 0xa8, 0x8f, 0xfc, 0xb9, - 0xc7, 0x4d, 0xdb, 0xe3, 0x22, 0xdd, 0x72, 0x4f, 0x22, 0x35, 0x01, 0xf5, 0x3d, 0x8e, 0x3f, 0x80, - 0xe5, 0xb8, 0x3d, 0x71, 0x7c, 0x8b, 0xc7, 0x97, 0xdf, 0x93, 0x08, 0x08, 0xf0, 0x20, 0xc2, 0xb0, - 0x02, 0x32, 0x9b, 0xbb, 0x22, 0x60, 0x44, 0xa2, 0x25, 0x5e, 0x87, 0x0a, 0x1b, 0xcd, 0xa8, 0x6b, - 0x89, 0x68, 0x57, 0x49, 0x52, 0xe1, 0x0f, 0xa1, 0xf9, 0x3d, 0x0d, 0x7d, 0x93, 0xcf, 0x42, 0xca, - 0x66, 0xbe, 0x33, 0x16, 0x6f, 0x1b, 0x91, 0x46, 0x84, 0x1a, 0x29, 0x88, 0x3f, 0x4a, 0x68, 0xb9, - 0xaf, 0x8a, 0xf0, 0x85, 0xc8, 0x4a, 0x84, 0xef, 0xa7, 0xde, 0x3e, 0x01, 0xa5, 0xc0, 0x8b, 0x0d, - 0x56, 0x85, 0x41, 0x44, 0x9a, 0x19, 0x33, 0x36, 0xb9, 0x0f, 0x4d, 0x8f, 0x4e, 0x2d, 0x6e, 0x5f, - 0x50, 0x93, 0x05, 0x96, 0xc7, 0xd4, 0xda, 0xfd, 0xdf, 0xce, 0xbd, 0xf9, 0xe8, 0x25, 0xe5, 0xc3, - 0xc0, 0xf2, 0x92, 0xc1, 0x6a, 0xa4, 0x9a, 0x08, 0x63, 0xf8, 0x63, 0x78, 0x90, 0x6d, 0x32, 0xa6, - 0x0e, 0xb7, 0x98, 0x5a, 0x6f, 0xc9, 0x1d, 0x4c, 0xb2, 0xbd, 0x9f, 0x09, 0xf4, 0x16, 0x51, 0xb8, - 0x63, 0x2a, 0xb4, 0xe4, 0x0e, 0xca, 0x89, 0xc2, 0x1a, 0x8b, 0x6c, 0x05, 0x3e, 0xb3, 0x0b, 0xb6, - 0x96, 0xff, 0x8b, 0xad, 0x54, 0x93, 0xd9, 0xca, 0x36, 0x49, 0x6c, 0xad, 0xc4, 0xb6, 0x52, 0x38, - 0xb7, 0x95, 0x11, 0x13, 0x5b, 0x8d, 0xd8, 0x56, 0x0a, 0x27, 0xb6, 0xbe, 0x04, 0x08, 0x29, 0xa3, - 0xdc, 0x9c, 0x45, 0xe9, 0x37, 0xef, 0xcf, 0x5c, 0xf6, 0x7e, 0xb6, 0x49, 0xc4, 0xeb, 0xd9, 0x1e, - 0x27, 0xf5, 0x30, 0x5d, 0xde, 0x1e, 0xec, 0x07, 0x77, 0x06, 0x1b, 0x3f, 0x86, 0xc6, 0x68, 0xce, - 0xb8, 0xef, 0x9a, 0xe2, 0x67, 0x80, 0xa9, 0x8a, 0x30, 0xb1, 0x12, 0x83, 0x2f, 0x04, 0xd6, 0x1e, - 0x43, 0x3d, 0xdb, 0x1a, 0x6f, 0xc2, 0x3a, 0x89, 0x5e, 0xaf, 0xd9, 0xeb, 0x0f, 0x8c, 0x3b, 0x23, - 0x88, 0xa1, 0x59, 0xe8, 0x9d, 0xe9, 0x43, 0x05, 0xe1, 0x55, 0x68, 0x14, 0xb0, 0xc1, 0xb1, 0x52, - 0x8a, 0xa6, 0xa4, 0x00, 0xc5, 0xf3, 0x28, 0xef, 0x55, 0x61, 0x49, 0x04, 0xb1, 0xb7, 0x02, 0x90, - 0xbf, 0xa5, 0xf6, 0x0e, 0x40, 0x1e, 0x7a, 0xf4, 0x9c, 0xfd, 0xc9, 0x84, 0xd1, 0x78, 0x3e, 0x56, - 0x49, 0x52, 0x45, 0xb8, 0x43, 0xbd, 0x29, 0x9f, 0x89, 0xb1, 0x68, 0x90, 0xa4, 0xda, 0xfb, 0xea, - 0xea, 0x5a, 0x93, 0xde, 0x5c, 0x6b, 0xd2, 0xbb, 0x6b, 0x0d, 0xfd, 0xb0, 0xd0, 0xd0, 0x2f, 0x0b, - 0x0d, 0xbd, 0x5e, 0x68, 0xe8, 0x6a, 0xa1, 0xa1, 0x3f, 0x17, 0x1a, 0x7a, 0xbb, 0xd0, 0xa4, 0x77, - 0x0b, 0x0d, 0xfd, 0x74, 0xa3, 0x49, 0x57, 0x37, 0x9a, 0xf4, 0xe6, 0x46, 0x93, 0xbe, 0x2d, 0xfc, - 0xc7, 0x38, 0xaf, 0x88, 0x3f, 0x15, 0x4f, 0xfe, 0x0a, 0x00, 0x00, 0xff, 0xff, 0xbc, 0xfd, 0xf0, - 0x76, 0x88, 0x08, 0x00, 0x00, + // 1094 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x56, 0x4d, 0x6f, 0x1b, 0x45, + 0x18, 0xf6, 0x78, 0x1d, 0x7f, 0xbc, 0x89, 0xdd, 0xcd, 0xe0, 0xb6, 0xdb, 0x50, 0x36, 0xc6, 0x15, + 0x60, 0x81, 0x14, 0x24, 0x57, 0x42, 0x48, 0xad, 0x10, 0x71, 0xba, 0x89, 0x8d, 0x1a, 0x27, 0x1a, + 0x6f, 0x82, 0xc2, 0x65, 0xb5, 0xb1, 0xc7, 0xf6, 0xaa, 0xfb, 0xc5, 0xce, 0x38, 0x6a, 0x38, 0xf1, + 0x13, 0xf8, 0x09, 0x3d, 0xf2, 0x27, 0xb8, 0xf7, 0x98, 0x0b, 0x52, 0x85, 0x44, 0x45, 0x9c, 0x4b, + 0x8f, 0xfd, 0x09, 0x68, 0x67, 0x3f, 0x93, 0x80, 0xe0, 0x36, 0xef, 0xf3, 0x3e, 0xcf, 0xec, 0x33, + 0xcf, 0xcc, 0x6b, 0x19, 0x1a, 0x63, 0x2f, 0xe0, 0xf4, 0xe5, 0x59, 0x77, 0xcb, 0x0f, 0x3c, 0xee, + 0x61, 0x88, 0x6a, 0xff, 0xf4, 0xac, 0xbb, 0xd1, 0x9c, 0x79, 0x33, 0x4f, 0xc0, 0x5f, 0x86, 0xab, + 0x88, 0xd1, 0x7e, 0x55, 0x84, 0xb5, 0xef, 0x03, 0x8b, 0x53, 0x42, 0x7f, 0x5c, 0x50, 0xc6, 0xf1, + 0x13, 0x28, 0x8f, 0xbc, 0x45, 0x30, 0xa6, 0x8a, 0xd4, 0x42, 0x9d, 0x46, 0xf7, 0xd1, 0x56, 0xb6, + 0xc7, 0x56, 0x9e, 0xb9, 0x15, 0xd1, 0x34, 0x77, 0xe1, 0x90, 0x58, 0x82, 0x15, 0xa8, 0xb0, 0x73, + 0xe7, 0xd4, 0xb3, 0x99, 0x52, 0x6a, 0x49, 0x9d, 0x1a, 0x49, 0x4a, 0xac, 0x03, 0x70, 0xcb, 0xa1, + 0x8c, 0x06, 0x16, 0x65, 0xca, 0x4a, 0x4b, 0xea, 0xac, 0x76, 0xef, 0xe5, 0xb7, 0xd6, 0x2d, 0x87, + 0x8e, 0x44, 0xb7, 0xf7, 0xf0, 0xf5, 0xdb, 0xcd, 0xc2, 0x1f, 0x6f, 0x37, 0x9b, 0x87, 0x01, 0x35, + 0x6d, 0xdb, 0x1b, 0xeb, 0xa9, 0xf2, 0xb8, 0x4b, 0x72, 0xfb, 0xe0, 0xa7, 0xb0, 0xc1, 0x5e, 0x58, + 0xbe, 0x61, 0x9b, 0xa7, 0xd4, 0x36, 0x5c, 0xd3, 0xa1, 0xc6, 0x99, 0x69, 0x5b, 0x13, 0x93, 0x5b, + 0x9e, 0xab, 0xbc, 0xab, 0xb4, 0x50, 0xa7, 0x4a, 0xee, 0x87, 0x94, 0xe7, 0x21, 0x63, 0x68, 0x3a, + 0xf4, 0x38, 0xed, 0xb7, 0x37, 0x01, 0xb2, 0x33, 0xe0, 0x0a, 0x48, 0xdb, 0x87, 0x03, 0xb9, 0x80, + 0xab, 0x50, 0x22, 0x47, 0xcf, 0x35, 0x19, 0x7d, 0x57, 0xaa, 0x22, 0x59, 0x6a, 0xcf, 0xa0, 0x1e, + 0x9f, 0x9b, 0xf9, 0x9e, 0xcb, 0xc4, 0x29, 0x47, 0xa6, 0xe3, 0xdb, 0x94, 0x29, 0xa8, 0x85, 0x3a, + 0x12, 0x49, 0x4a, 0xac, 0x02, 0xf4, 0x2d, 0xc6, 0xbd, 0x59, 0x60, 0x3a, 0x4c, 0x29, 0x8a, 0x66, + 0x0e, 0xc1, 0x0f, 0xa1, 0xa6, 0xbd, 0xa4, 0x8e, 0x6f, 0x9b, 0x01, 0x13, 0xf9, 0x4a, 0x24, 0x03, + 0xda, 0xbf, 0x15, 0x01, 0xb2, 0x18, 0xf0, 0x26, 0xac, 0x8a, 0x73, 0x31, 0x23, 0xa0, 0xd3, 0xf0, + 0x53, 0x52, 0xa7, 0x4e, 0x20, 0x82, 0x08, 0x9d, 0x32, 0xdc, 0x85, 0x0a, 0x8b, 0x7d, 0x14, 0x45, + 0xa0, 0x38, 0x1f, 0x68, 0xe4, 0xa9, 0x57, 0x0a, 0xc3, 0x24, 0x09, 0x11, 0x3f, 0x01, 0x98, 0x67, + 0x0e, 0x25, 0x21, 0xbb, 0x9b, 0x97, 0xa5, 0x6e, 0x63, 0x65, 0x8e, 0x8e, 0xbf, 0x86, 0x1a, 0x4d, + 0xed, 0x97, 0x84, 0xb6, 0x99, 0xd7, 0x26, 0x47, 0x89, 0xa5, 0x19, 0x19, 0x7f, 0x05, 0x55, 0x87, + 0x72, 0x73, 0x62, 0x72, 0x53, 0x59, 0x69, 0xa1, 0x9b, 0xc2, 0xfd, 0xb8, 0x17, 0x0b, 0x53, 0x2e, + 0xfe, 0x02, 0xd6, 0xc7, 0x01, 0x35, 0x39, 0x9d, 0x18, 0xe2, 0xda, 0xb9, 0xe9, 0xf8, 0x4a, 0x59, + 0x04, 0x27, 0xc7, 0x0d, 0x3d, 0xc1, 0xdb, 0x06, 0x54, 0x13, 0x07, 0xff, 0x1d, 0x5e, 0x13, 0x56, + 0xce, 0x4c, 0x7b, 0x41, 0xc5, 0x2d, 0x21, 0x12, 0x15, 0xe1, 0x05, 0x65, 0xdf, 0x89, 0x2f, 0x28, + 0x05, 0xda, 0x4f, 0xa1, 0x1c, 0xa5, 0x9a, 0xa9, 0xd1, 0xbf, 0xaa, 0x8b, 0x37, 0xd5, 0xbf, 0x17, + 0xa1, 0x9a, 0x1c, 0x14, 0x3f, 0x86, 0x12, 0x3f, 0xf7, 0x23, 0x7d, 0xa3, 0xbb, 0xf9, 0x4f, 0x61, + 0x84, 0x8b, 0xc0, 0x1a, 0xeb, 0xe7, 0x3e, 0x25, 0x82, 0x8c, 0x1f, 0x40, 0x75, 0x4e, 0x6d, 0x3f, + 0x3c, 0x92, 0x30, 0x57, 0x27, 0x95, 0xb0, 0x26, 0x74, 0x1a, 0xb6, 0x16, 0xae, 0xc5, 0x45, 0xab, + 0x14, 0xb5, 0xc2, 0x9a, 0xd0, 0x69, 0xfb, 0x4f, 0x04, 0x90, 0x6d, 0x85, 0x3f, 0x84, 0xfb, 0xfb, + 0x9a, 0x4e, 0x06, 0x3b, 0x86, 0x7e, 0x72, 0xa8, 0x19, 0x47, 0xc3, 0xd1, 0xa1, 0xb6, 0x33, 0xd8, + 0x1d, 0x68, 0xcf, 0xe4, 0x02, 0xbe, 0x0f, 0x1f, 0xe4, 0x9b, 0x3b, 0x07, 0x47, 0x43, 0x5d, 0x23, + 0x32, 0xc2, 0x77, 0x61, 0x3d, 0xdf, 0xd8, 0xdb, 0x3e, 0xda, 0xd3, 0xe4, 0x22, 0x7e, 0x00, 0x77, + 0xf3, 0x70, 0x7f, 0x30, 0xd2, 0x0f, 0xf6, 0xc8, 0xf6, 0xbe, 0x2c, 0x61, 0x15, 0x36, 0x6e, 0x29, + 0xb2, 0x7e, 0xe9, 0xe6, 0xa7, 0x46, 0x47, 0xfb, 0xfb, 0xdb, 0xe4, 0x44, 0x5e, 0xc1, 0x4d, 0x90, + 0xf3, 0x8d, 0xc1, 0x70, 0xf7, 0x40, 0x2e, 0x63, 0x05, 0x9a, 0xd7, 0xe8, 0xfa, 0xb6, 0xae, 0x8d, + 0x34, 0x5d, 0xae, 0xb4, 0x5f, 0x95, 0xa1, 0x96, 0xbe, 0x5a, 0xfc, 0x11, 0xd4, 0xc6, 0xde, 0xc2, + 0xe5, 0x86, 0xe5, 0x72, 0x91, 0x6e, 0xa9, 0x5f, 0x20, 0x55, 0x01, 0x0d, 0x5c, 0x8e, 0x3f, 0x86, + 0xd5, 0xa8, 0x3d, 0xb5, 0x3d, 0x93, 0x47, 0x97, 0xdf, 0x2f, 0x10, 0x10, 0xe0, 0x6e, 0x88, 0x61, + 0x19, 0x24, 0xb6, 0x70, 0x44, 0xc0, 0x88, 0x84, 0x4b, 0x7c, 0x0f, 0xca, 0x6c, 0x3c, 0xa7, 0x8e, + 0x29, 0xa2, 0x5d, 0x27, 0x71, 0x85, 0x3f, 0x81, 0xc6, 0x4f, 0x34, 0xf0, 0x0c, 0x3e, 0x0f, 0x28, + 0x9b, 0x7b, 0xf6, 0x44, 0xbc, 0x6d, 0x44, 0xea, 0x21, 0xaa, 0x27, 0x20, 0xfe, 0x34, 0xa6, 0x65, + 0xbe, 0xca, 0xc2, 0x17, 0x22, 0x6b, 0x21, 0xbe, 0x93, 0x78, 0xfb, 0x1c, 0xe4, 0x1c, 0x2f, 0x32, + 0x58, 0x11, 0x06, 0x11, 0x69, 0xa4, 0xcc, 0xc8, 0xe4, 0x0e, 0x34, 0x5c, 0x3a, 0x33, 0xb9, 0x75, + 0x46, 0x0d, 0xe6, 0x9b, 0x2e, 0x53, 0xaa, 0xb7, 0x7f, 0x53, 0x7b, 0x8b, 0xf1, 0x0b, 0xca, 0x47, + 0xbe, 0xe9, 0xc6, 0x83, 0x55, 0x4f, 0x34, 0x21, 0xc6, 0xf0, 0x67, 0x70, 0x27, 0xdd, 0x64, 0x42, + 0x6d, 0x6e, 0x32, 0xa5, 0xd6, 0x92, 0x3a, 0x98, 0xa4, 0x7b, 0x3f, 0x13, 0xe8, 0x35, 0xa2, 0x70, + 0xc7, 0x14, 0x68, 0x49, 0x1d, 0x94, 0x11, 0x85, 0x35, 0x16, 0xda, 0xf2, 0x3d, 0x66, 0xe5, 0x6c, + 0xad, 0xfe, 0x1f, 0x5b, 0x89, 0x26, 0xb5, 0x95, 0x6e, 0x12, 0xdb, 0x5a, 0x8b, 0x6c, 0x25, 0x70, + 0x66, 0x2b, 0x25, 0xc6, 0xb6, 0xea, 0x91, 0xad, 0x04, 0x8e, 0x6d, 0x7d, 0x03, 0x10, 0x50, 0x46, + 0xb9, 0x31, 0x0f, 0xd3, 0x6f, 0xdc, 0x9e, 0xb9, 0xf4, 0xfd, 0x6c, 0x91, 0x90, 0xd7, 0xb7, 0x5c, + 0x4e, 0x6a, 0x41, 0xb2, 0xbc, 0x3e, 0xd8, 0x77, 0x6e, 0x0c, 0x36, 0x7e, 0x04, 0xf5, 0xf1, 0x82, + 0x71, 0xcf, 0x31, 0xc4, 0xcf, 0x00, 0x53, 0x64, 0x61, 0x62, 0x2d, 0x02, 0x8f, 0x05, 0xd6, 0x9e, + 0x40, 0x2d, 0xdd, 0x1a, 0x6f, 0xc0, 0x3d, 0x12, 0xbe, 0x5e, 0xa3, 0x3f, 0x18, 0xea, 0x37, 0x46, + 0x10, 0x43, 0x23, 0xd7, 0x3b, 0xd1, 0x46, 0x32, 0xc2, 0xeb, 0x50, 0xcf, 0x61, 0xc3, 0x03, 0xb9, + 0x18, 0x4e, 0x49, 0x0e, 0x8a, 0xe6, 0x51, 0xea, 0x55, 0x60, 0x45, 0x04, 0xd1, 0x5b, 0x03, 0xc8, + 0xde, 0x52, 0xfb, 0x29, 0x40, 0x16, 0x7a, 0xf8, 0x9c, 0xbd, 0xe9, 0x94, 0xd1, 0x68, 0x3e, 0xd6, + 0x49, 0x5c, 0x85, 0xb8, 0x4d, 0xdd, 0x19, 0x9f, 0x8b, 0xb1, 0xa8, 0x93, 0xb8, 0xea, 0x7d, 0x7b, + 0x71, 0xa9, 0x16, 0xde, 0x5c, 0xaa, 0x85, 0xf7, 0x97, 0x2a, 0xfa, 0x79, 0xa9, 0xa2, 0x5f, 0x97, + 0x2a, 0x7a, 0xbd, 0x54, 0xd1, 0xc5, 0x52, 0x45, 0x7f, 0x2d, 0x55, 0xf4, 0x6e, 0xa9, 0x16, 0xde, + 0x2f, 0x55, 0xf4, 0xcb, 0x95, 0x5a, 0xb8, 0xb8, 0x52, 0x0b, 0x6f, 0xae, 0xd4, 0xc2, 0x0f, 0xb9, + 0xff, 0x1e, 0xa7, 0x65, 0xf1, 0x67, 0xe3, 0xf1, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x03, 0x9f, + 0x7c, 0x56, 0xa0, 0x08, 0x00, 0x00, } func (x WriteRequest_SourceEnum) String() string { @@ -1028,7 +1022,7 @@ func (this *WriteRequest) Equal(that interface{}) bool { return false } for i := range this.Timeseries { - if !this.Timeseries[i].Equal(&that1.Timeseries[i]) { + if !this.Timeseries[i].Equal(that1.Timeseries[i]) { return false } } @@ -1459,13 +1453,7 @@ func (this *WriteRequest) GoString() string { s = append(s, "&cortexpbv2.WriteRequest{") s = append(s, "Source: "+fmt.Sprintf("%#v", this.Source)+",\n") s = append(s, "Symbols: "+fmt.Sprintf("%#v", this.Symbols)+",\n") - if this.Timeseries != nil { - vs := make([]*TimeSeries, len(this.Timeseries)) - for i := range vs { - vs[i] = &this.Timeseries[i] - } - s = append(s, "Timeseries: "+fmt.Sprintf("%#v", vs)+",\n") - } + s = append(s, "Timeseries: "+fmt.Sprintf("%#v", this.Timeseries)+",\n") s = append(s, "SkipLabelNameValidation: "+fmt.Sprintf("%#v", this.SkipLabelNameValidation)+",\n") s = append(s, "}") return strings.Join(s, "") @@ -1675,11 +1663,11 @@ func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { if len(m.Timeseries) > 0 { for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- { { - size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { + size := m.Timeseries[iNdEx].Size() + i -= size + if _, err := m.Timeseries[iNdEx].MarshalTo(dAtA[i:]); err != nil { return 0, err } - i -= size i = encodeVarintCortexv2(dAtA, i, uint64(size)) } i-- @@ -2484,15 +2472,10 @@ func (this *WriteRequest) String() string { if this == nil { return "nil" } - repeatedStringForTimeseries := "[]TimeSeries{" - for _, f := range this.Timeseries { - repeatedStringForTimeseries += strings.Replace(strings.Replace(f.String(), "TimeSeries", "TimeSeries", 1), `&`, ``, 1) + "," - } - repeatedStringForTimeseries += "}" s := strings.Join([]string{`&WriteRequest{`, `Source:` + fmt.Sprintf("%v", this.Source) + `,`, `Symbols:` + fmt.Sprintf("%v", this.Symbols) + `,`, - `Timeseries:` + repeatedStringForTimeseries + `,`, + `Timeseries:` + fmt.Sprintf("%v", this.Timeseries) + `,`, `SkipLabelNameValidation:` + fmt.Sprintf("%v", this.SkipLabelNameValidation) + `,`, `}`, }, "") @@ -2776,7 +2759,7 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Timeseries = append(m.Timeseries, TimeSeries{}) + m.Timeseries = append(m.Timeseries, PreallocTimeseriesV2{}) if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } diff --git a/pkg/cortexpbv2/cortexv2.proto b/pkg/cortexpbv2/cortexv2.proto index e0c3ae9b1be..c56038b9f00 100644 --- a/pkg/cortexpbv2/cortexv2.proto +++ b/pkg/cortexpbv2/cortexv2.proto @@ -18,7 +18,7 @@ message WriteRequest { } SourceEnum Source = 3; repeated string symbols = 4; - repeated TimeSeries timeseries = 5 [(gogoproto.nullable) = false]; + repeated TimeSeries timeseries = 5 [(gogoproto.nullable) = false, (gogoproto.customtype) = "PreallocTimeseriesV2"]; bool skip_label_name_validation = 1000; // set intentionally high to keep WriteRequest compatible with upstream Prometheus } diff --git a/pkg/cortexpbv2/timeseriesv2.go b/pkg/cortexpbv2/timeseriesv2.go new file mode 100644 index 00000000000..684e4bfac0f --- /dev/null +++ b/pkg/cortexpbv2/timeseriesv2.go @@ -0,0 +1,136 @@ +package cortexpbv2 + +import ( + "sync" + + "github.com/cortexproject/cortex/pkg/cortexpb" +) + +var ( + expectedTimeseries = 100 + expectedLabels = 20 + expectedSymbols = 20 + expectedSamplesPerSeries = 10 + expectedExemplarsPerSeries = 1 + expectedHistogramsPerSeries = 1 + + slicePool = sync.Pool{ + New: func() interface{} { + return make([]PreallocTimeseriesV2, 0, expectedTimeseries) + }, + } + + timeSeriesPool = sync.Pool{ + New: func() interface{} { + return &TimeSeries{ + LabelsRefs: make([]uint32, 0, expectedLabels), + Samples: make([]Sample, 0, expectedSamplesPerSeries), + Histograms: make([]Histogram, 0, expectedHistogramsPerSeries), + Exemplars: make([]Exemplar, 0, expectedExemplarsPerSeries), + Metadata: Metadata{}, + } + }, + } + + writeRequestPool = sync.Pool{ + New: func() interface{} { + return &PreallocWriteRequestV2{ + WriteRequest: WriteRequest{ + Symbols: make([]string, 0, expectedSymbols), + }, + } + }, + } + bytePool = cortexpb.NewSlicePool(20) +) + +// PreallocWriteRequestV2 is a WriteRequest which preallocs slices on Unmarshal. +type PreallocWriteRequestV2 struct { + WriteRequest + data *[]byte +} + +// Unmarshal implements proto.Message. +func (p *PreallocWriteRequestV2) Unmarshal(dAtA []byte) error { + p.Timeseries = PreallocTimeseriesV2SliceFromPool() + return p.WriteRequest.Unmarshal(dAtA) +} + +func (p *PreallocWriteRequestV2) Marshal() (dAtA []byte, err error) { + size := p.Size() + p.data = bytePool.GetSlice(size) + dAtA = *p.data + n, err := p.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +// PreallocTimeseriesV2 is a TimeSeries which preallocs slices on Unmarshal. +type PreallocTimeseriesV2 struct { + *TimeSeries +} + +// Unmarshal implements proto.Message. +func (p *PreallocTimeseriesV2) Unmarshal(dAtA []byte) error { + p.TimeSeries = TimeseriesV2FromPool() + return p.TimeSeries.Unmarshal(dAtA) +} + +func ReuseWriteRequestV2(req *PreallocWriteRequestV2) { + if req.data != nil { + bytePool.ReuseSlice(req.data) + req.data = nil + } + req.Source = 0 + req.Symbols = nil + req.Timeseries = nil + writeRequestPool.Put(req) +} + +func PreallocWriteRequestV2FromPool() *PreallocWriteRequestV2 { + return writeRequestPool.Get().(*PreallocWriteRequestV2) +} + +// PreallocTimeseriesV2SliceFromPool retrieves a slice of PreallocTimeseriesV2 from a sync.Pool. +// ReuseSlice should be called once done. +func PreallocTimeseriesV2SliceFromPool() []PreallocTimeseriesV2 { + return slicePool.Get().([]PreallocTimeseriesV2) +} + +// ReuseSlice puts the slice back into a sync.Pool for reuse. +func ReuseSlice(ts []PreallocTimeseriesV2) { + for i := range ts { + ReuseTimeseries(ts[i].TimeSeries) + } + + slicePool.Put(ts[:0]) //nolint:staticcheck //see comment on slicePool for more details +} + +// TimeseriesV2FromPool retrieves a pointer to a TimeSeries from a sync.Pool. +// ReuseTimeseries should be called once done, unless ReuseSlice was called on the slice that contains this TimeSeries. +func TimeseriesV2FromPool() *TimeSeries { + return timeSeriesPool.Get().(*TimeSeries) +} + +// ReuseTimeseries puts the timeseries back into a sync.Pool for reuse. +func ReuseTimeseries(ts *TimeSeries) { + // clear ts lableRef and samples + ts.LabelsRefs = ts.LabelsRefs[:0] + ts.Samples = ts.Samples[:0] + + // clear exmplar labelrefs + for i := range ts.Exemplars { + ts.Exemplars[i].LabelsRefs = ts.Exemplars[i].LabelsRefs[:0] + } + + for i := range ts.Histograms { + ts.Histograms[i].Reset() + } + + ts.Exemplars = ts.Exemplars[:0] + ts.Histograms = ts.Histograms[:0] + ts.Metadata = Metadata{} + timeSeriesPool.Put(ts) +} diff --git a/pkg/cortexpbv2/timeseriesv2_test.go b/pkg/cortexpbv2/timeseriesv2_test.go new file mode 100644 index 00000000000..d10527564e8 --- /dev/null +++ b/pkg/cortexpbv2/timeseriesv2_test.go @@ -0,0 +1,111 @@ +package cortexpbv2 + +import ( + "fmt" + "testing" + + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPreallocTimeseriesV2SliceFromPool(t *testing.T) { + t.Run("new instance is provided when not available to reuse", func(t *testing.T) { + first := PreallocTimeseriesV2SliceFromPool() + second := PreallocTimeseriesV2SliceFromPool() + + assert.NotSame(t, first, second) + }) + + t.Run("instance is cleaned before reusing", func(t *testing.T) { + slice := PreallocTimeseriesV2SliceFromPool() + slice = append(slice, PreallocTimeseriesV2{TimeSeries: &TimeSeries{}}) + ReuseSlice(slice) + + reused := PreallocTimeseriesV2SliceFromPool() + assert.Len(t, reused, 0) + }) +} + +func TestTimeseriesV2FromPool(t *testing.T) { + t.Run("new instance is provided when not available to reuse", func(t *testing.T) { + first := TimeseriesV2FromPool() + second := TimeseriesV2FromPool() + + assert.NotSame(t, first, second) + }) + + t.Run("instance is cleaned before reusing", func(t *testing.T) { + ts := TimeseriesV2FromPool() + ts.LabelsRefs = []uint32{1, 2} + ts.Samples = []Sample{{Value: 1, Timestamp: 2}} + ts.Exemplars = []Exemplar{{LabelsRefs: []uint32{1, 2}, Value: 1, Timestamp: 2}} + ts.Histograms = []Histogram{{}} + fmt.Println("ts.Histograms", len(ts.Histograms)) + ReuseTimeseries(ts) + + reused := TimeseriesV2FromPool() + assert.Len(t, reused.LabelsRefs, 0) + assert.Len(t, reused.Samples, 0) + assert.Len(t, reused.Exemplars, 0) + assert.Len(t, reused.Histograms, 0) + }) +} + +func BenchmarkMarshallWriteRequest(b *testing.B) { + ts := PreallocTimeseriesV2SliceFromPool() + + for i := 0; i < 100; i++ { + ts = append(ts, PreallocTimeseriesV2{TimeSeries: TimeseriesV2FromPool()}) + ts[i].LabelsRefs = []uint32{1, 2, 3, 4, 5, 6, 7, 8} + ts[i].Samples = []Sample{{Value: 1, Timestamp: 2}} + } + + tests := []struct { + name string + writeRequestFactory func() proto.Marshaler + clean func(in interface{}) + }{ + { + name: "no-pool", + writeRequestFactory: func() proto.Marshaler { + return &WriteRequest{Timeseries: ts} + }, + clean: func(in interface{}) {}, + }, + { + name: "byte pool", + writeRequestFactory: func() proto.Marshaler { + w := &PreallocWriteRequestV2{} + w.Timeseries = ts + return w + }, + clean: func(in interface{}) { + ReuseWriteRequestV2(in.(*PreallocWriteRequestV2)) + }, + }, + { + name: "byte and write pool", + writeRequestFactory: func() proto.Marshaler { + w := PreallocWriteRequestV2FromPool() + w.Timeseries = ts + return w + }, + clean: func(in interface{}) { + ReuseWriteRequestV2(in.(*PreallocWriteRequestV2)) + }, + }, + } + + for _, tc := range tests { + b.Run(tc.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + w := tc.writeRequestFactory() + _, err := w.Marshal() + require.NoError(b, err) + tc.clean(w) + } + b.ReportAllocs() + }) + } +} diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 7c798956f99..9684542c457 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -46,8 +46,8 @@ import ( ) var ( - emptyV2Timeseries = cortexpbv2.TimeSeries{} - emptyPreallocSeries = cortexpb.PreallocTimeseries{} + emptyPreallocSeriesV2 = cortexpbv2.PreallocTimeseriesV2{} + emptyPreallocSeries = cortexpb.PreallocTimeseries{} supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} @@ -631,12 +631,12 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri nil } -func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.WriteRequest, userID string, limits *validation.Limits, b labels.ScratchBuilder, removeReplica bool) ([]uint32, []cortexpbv2.TimeSeries, int64, int64, int64, int64, error, error) { +func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.WriteRequest, userID string, limits *validation.Limits, b labels.ScratchBuilder, removeReplica bool) ([]uint32, []cortexpbv2.PreallocTimeseriesV2, int64, int64, int64, int64, error, error) { pSpan, _ := opentracing.StartSpanFromContext(ctx, "prepareSeriesKeysV2") defer pSpan.Finish() // For each timeseries or samples, we compute a hash to distribute across ingesters; // check each sample/metadata and discard if outside limits. - validatedTimeseries := make([]cortexpbv2.TimeSeries, 0, len(req.Timeseries)) + validatedTimeseries := make([]cortexpbv2.PreallocTimeseriesV2, 0, len(req.Timeseries)) seriesKeys := make([]uint32, 0, len(req.Timeseries)) validatedFloatSamples := 0 validatedHistogramSamples := 0 @@ -743,8 +743,8 @@ func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.W validatedMetadata++ } - // validateSeriesV2 would have returned an emptyV2Timeseries if there were no valid samples. - if validatedSeries.Equal(emptyV2Timeseries) { + // validateSeriesV2 would have returned an emptyPreallocSeriesV2 if there were no valid samples. + if validatedSeries == emptyPreallocSeriesV2 { continue } @@ -758,7 +758,7 @@ func (d *Distributor) prepareSeriesKeysV2(ctx context.Context, req *cortexpbv2.W return seriesKeys, validatedTimeseries, int64(validatedMetadata), int64(validatedFloatSamples), int64(validatedHistogramSamples), int64(validatedExemplars), firstPartialErr, nil } -func (d *Distributor) doBatchV2(ctx context.Context, req *cortexpbv2.WriteRequest, subRing ring.ReadRing, keys []uint32, validatedTimeseries []cortexpbv2.TimeSeries, userID string, stats *WriteStats) error { +func (d *Distributor) doBatchV2(ctx context.Context, req *cortexpbv2.WriteRequest, subRing ring.ReadRing, keys []uint32, validatedTimeseries []cortexpbv2.PreallocTimeseriesV2, userID string, stats *WriteStats) error { span, _ := opentracing.StartSpanFromContext(ctx, "doBatchV2") defer span.Finish() @@ -782,7 +782,7 @@ func (d *Distributor) doBatchV2(ctx context.Context, req *cortexpbv2.WriteReques } return ring.DoBatch(ctx, op, subRing, keys, func(ingester ring.InstanceDesc, indexes []int) error { - timeseries := make([]cortexpbv2.TimeSeries, 0, len(indexes)) + timeseries := make([]cortexpbv2.PreallocTimeseriesV2, 0, len(indexes)) for _, i := range indexes { timeseries = append(timeseries, validatedTimeseries[i]) @@ -790,11 +790,12 @@ func (d *Distributor) doBatchV2(ctx context.Context, req *cortexpbv2.WriteReques return d.sendV2(localCtx, req.Symbols, ingester, timeseries, req.Source, stats) }, func() { + cortexpbv2.ReuseSlice(req.Timeseries) cancel() }) } -func (d *Distributor) sendV2(ctx context.Context, symbols []string, ingester ring.InstanceDesc, timeseries []cortexpbv2.TimeSeries, source cortexpbv2.WriteRequest_SourceEnum, stats *WriteStats) error { +func (d *Distributor) sendV2(ctx context.Context, symbols []string, ingester ring.InstanceDesc, timeseries []cortexpbv2.PreallocTimeseriesV2, source cortexpbv2.WriteRequest_SourceEnum, stats *WriteStats) error { h, err := d.ingesterPool.GetClientFor(ingester.Addr) if err != nil { return err @@ -807,12 +808,15 @@ func (d *Distributor) sendV2(ctx context.Context, symbols []string, ingester rin c := h.(ingester_client.HealthAndIngesterClient) - req := cortexpbv2.WriteRequest{} + req := cortexpbv2.PreallocWriteRequestV2FromPool() req.Symbols = symbols req.Timeseries = timeseries req.Source = source - resp, err := c.PushV2(ctx, &req) + resp, err := c.PushPreAllocV2(ctx, req) + if err == nil { + cortexpbv2.ReuseWriteRequestV2(req) + } if len(timeseries) > 0 { d.ingesterAppends.WithLabelValues(id, typeSamples).Inc() @@ -849,11 +853,11 @@ func (d *Distributor) sendV2(ctx context.Context, symbols []string, ingester rin // any are configured to be dropped for the user ID. // Returns the validated series with it's labels/samples, and any error. // The returned error may retain the series labels. -func (d *Distributor) validateSeriesV2(ts cortexpbv2.TimeSeries, seriesLabels []cortexpb.LabelAdapter, symbols []string, userID string, skipLabelNameValidation bool, limits *validation.Limits) (cortexpbv2.TimeSeries, validation.ValidationError) { +func (d *Distributor) validateSeriesV2(ts cortexpbv2.PreallocTimeseriesV2, seriesLabels []cortexpb.LabelAdapter, symbols []string, userID string, skipLabelNameValidation bool, limits *validation.Limits) (cortexpbv2.PreallocTimeseriesV2, validation.ValidationError) { d.labelsHistogram.Observe(float64(len(ts.LabelsRefs))) if err := validation.ValidateLabels(d.validateMetrics, limits, userID, seriesLabels, skipLabelNameValidation); err != nil { - return emptyV2Timeseries, err + return emptyPreallocSeriesV2, err } var samples []cortexpbv2.Sample @@ -862,7 +866,7 @@ func (d *Distributor) validateSeriesV2(ts cortexpbv2.TimeSeries, seriesLabels [] samples = make([]cortexpbv2.Sample, 0, len(ts.Samples)) for _, s := range ts.Samples { if err := validation.ValidateSampleTimestamp(d.validateMetrics, limits, userID, seriesLabels, s.Timestamp); err != nil { - return emptyV2Timeseries, err + return emptyPreallocSeriesV2, err } samples = append(samples, s) } @@ -878,7 +882,7 @@ func (d *Distributor) validateSeriesV2(ts cortexpbv2.TimeSeries, seriesLabels [] // in the same series object. However, because the current Prometheus // remote write implementation only populates one or the other, // there never will be any. - return emptyV2Timeseries, err + return emptyPreallocSeriesV2, err } exemplars = append(exemplars, e) } @@ -890,11 +894,11 @@ func (d *Distributor) validateSeriesV2(ts cortexpbv2.TimeSeries, seriesLabels [] histograms = make([]cortexpbv2.Histogram, 0, len(ts.Histograms)) for i, h := range ts.Histograms { if err := validation.ValidateSampleTimestamp(d.validateMetrics, limits, userID, seriesLabels, h.Timestamp); err != nil { - return emptyV2Timeseries, err + return emptyPreallocSeriesV2, err } convertedHistogram, err := validation.ValidateNativeHistogramV2(d.validateMetrics, limits, userID, seriesLabels, h) if err != nil { - return emptyV2Timeseries, err + return emptyPreallocSeriesV2, err } ts.Histograms[i] = convertedHistogram } @@ -904,15 +908,17 @@ func (d *Distributor) validateSeriesV2(ts cortexpbv2.TimeSeries, seriesLabels [] // validate metadata err := validation.ValidateMetadataV2(d.validateMetrics, limits, userID, symbols, ts.Metadata) if err != nil { - return emptyV2Timeseries, err + return emptyPreallocSeriesV2, err } - return cortexpbv2.TimeSeries{ - LabelsRefs: ts.LabelsRefs, - Samples: samples, - Exemplars: exemplars, - Histograms: histograms, - Metadata: ts.Metadata, + return cortexpbv2.PreallocTimeseriesV2{ + TimeSeries: &cortexpbv2.TimeSeries{ + LabelsRefs: ts.LabelsRefs, + Samples: samples, + Exemplars: exemplars, + Histograms: histograms, + Metadata: ts.Metadata, + }, }, nil } @@ -967,7 +973,8 @@ func (d *Distributor) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest) cluster, replica := findHALabels(limits.HAReplicaLabel, limits.HAClusterLabel, cortexpb.FromLabelsToLabelAdapters(req.Timeseries[0].ToLabels(&b, req.Symbols))) removeReplica, err = d.checkSample(ctx, userID, cluster, replica, limits) if err != nil { - // TODO(Sungjin1212): reuse timeseries slice + // Ensure the request slice is reused if the series get deduped. + cortexpbv2.ReuseSlice(req.Timeseries) if errors.Is(err, ha.ReplicasNotMatchError{}) { // These samples have been deduped. @@ -1000,12 +1007,18 @@ func (d *Distributor) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest) d.receivedMetadata.WithLabelValues(userID).Add(float64(validatedMetadatas)) if len(seriesKeys) == 0 { + // Ensure the request slice is reused if there's no series or metadata passing the validation. + cortexpbv2.ReuseSlice(req.Timeseries) + return &cortexpbv2.WriteResponse{}, firstPartialErr } totalSamples := validatedFloatSamples + validatedHistogramSamples totalN := totalSamples + validatedExemplars + validatedMetadatas if !d.ingestionRateLimiter.AllowN(now, userID, int(totalN)) { + // Ensure the request slice is reused if the request is rate limited. + cortexpbv2.ReuseSlice(req.Timeseries) + d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(totalSamples)) d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars)) d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedMetadatas)) diff --git a/pkg/distributor/distributor_prw2_test.go b/pkg/distributor/distributor_prw2_test.go index 81e80a76cf2..80f8d92f705 100644 --- a/pkg/distributor/distributor_prw2_test.go +++ b/pkg/distributor/distributor_prw2_test.go @@ -1032,8 +1032,8 @@ func TestPushPRW2_QuorumError(t *testing.T) { ingesters[2].failResp.Store(httpgrpc.Errorf(500, "InternalServerError")) for i := 0; i < numberOfWrites; i++ { - request := makeWriteRequest(0, 300, 200, 10) - _, err := d.Push(ctx, request) + request := makeWriteRequestV2WithSamples(0, 300, 200) + _, err := d.PushV2(ctx, request) status, ok := status.FromError(err) require.True(t, ok) require.Equal(t, codes.Code(500), status.Code()) @@ -1045,8 +1045,8 @@ func TestPushPRW2_QuorumError(t *testing.T) { ingesters[2].happy.Store(true) for i := 0; i < numberOfWrites; i++ { - request := makeWriteRequest(0, 30, 20, 10) - _, err := d.Push(ctx, request) + request := makeWriteRequestV2WithSamples(0, 30, 20) + _, err := d.PushV2(ctx, request) status, ok := status.FromError(err) require.True(t, ok) require.Equal(t, codes.Code(429), status.Code()) @@ -1058,8 +1058,8 @@ func TestPushPRW2_QuorumError(t *testing.T) { ingesters[2].happy.Store(true) for i := 0; i < 1; i++ { - request := makeWriteRequest(0, 30, 20, 10) - _, err := d.Push(ctx, request) + request := makeWriteRequestV2WithSamples(0, 30, 20) + _, err := d.PushV2(ctx, request) require.NoError(t, err) } @@ -1086,8 +1086,8 @@ func TestPushPRW2_QuorumError(t *testing.T) { }) for i := 0; i < numberOfWrites; i++ { - request := makeWriteRequest(0, 30, 20, 10) - _, err := d.Push(ctx, request) + request := makeWriteRequestV2WithSamples(0, 30, 20) + _, err := d.PushV2(ctx, request) require.Error(t, err) status, ok := status.FromError(err) require.True(t, ok) @@ -1188,18 +1188,17 @@ func TestDistributorPRW2_PushInstanceLimits(t *testing.T) { {samples: 100, expectedError: nil}, }, }, - // TODO(Sungjin1212): enable test after v2 prealloc timeseries implement - //"below rate limit on first request, but hits the rate limit afterwards": { - // preRateSamples: 500, - // ingestionRateLimit: 1000, - // - // pushes: []testPush{ - // {samples: 5000, expectedError: nil}, // after push, rate = 500 + 0.2*(5000-500) = 1400 - // {samples: 5000, expectedError: errMaxSamplesPushRateLimitReached}, // after push, rate = 1400 + 0.2*(0 - 1400) = 1120 - // {samples: 5000, expectedError: errMaxSamplesPushRateLimitReached}, // after push, rate = 1120 + 0.2*(0 - 1120) = 896 - // {samples: 5000, expectedError: nil}, // 896 is below 1000, so this push succeeds, new rate = 896 + 0.2*(5000-896) = 1716.8 - // }, - //}, + "below rate limit on first request, but hits the rate limit afterwards": { + preRateSamples: 500, + ingestionRateLimit: 1000, + + pushes: []testPush{ + {samples: 5000, expectedError: nil}, // after push, rate = 500 + 0.2*(5000-500) = 1400 + {samples: 5000, expectedError: errMaxSamplesPushRateLimitReached}, // after push, rate = 1400 + 0.2*(0 - 1400) = 1120 + {samples: 5000, expectedError: errMaxSamplesPushRateLimitReached}, // after push, rate = 1120 + 0.2*(0 - 1120) = 896 + {samples: 5000, expectedError: nil}, // 896 is below 1000, so this push succeeds, new rate = 896 + 0.2*(5000-896) = 1716.8 + }, + }, } for testName, testData := range tests { @@ -2376,9 +2375,12 @@ func makeWriteRequestV2WithHistogram(startTimestampMs int64, histogram int, meta return request } -func makeMetadataV2FromST(value int, st *writev2.SymbolsTable) cortexpbv2.TimeSeries { - t := cortexpbv2.TimeSeries{} - t.LabelsRefs = []uint32{1, 2} +func makeMetadataV2FromST(value int, st *writev2.SymbolsTable) cortexpbv2.PreallocTimeseriesV2 { + t := cortexpbv2.PreallocTimeseriesV2{ + TimeSeries: &cortexpbv2.TimeSeries{ + LabelsRefs: []uint32{1, 2}, + }, + } helpRef := st.Symbolize(fmt.Sprintf("a help for metric_%d", value)) t.Metadata.Type = cortexpbv2.METRIC_TYPE_COUNTER t.Metadata.HelpRef = helpRef @@ -2386,14 +2388,17 @@ func makeMetadataV2FromST(value int, st *writev2.SymbolsTable) cortexpbv2.TimeSe return t } -func makeTimeseriesV2FromST(labels []cortexpb.LabelAdapter, st *writev2.SymbolsTable, ts int64, value int, histogram bool, metadata bool) cortexpbv2.TimeSeries { +func makeTimeseriesV2FromST(labels []cortexpb.LabelAdapter, st *writev2.SymbolsTable, ts int64, value int, histogram bool, metadata bool) cortexpbv2.PreallocTimeseriesV2 { var helpRef uint32 if metadata { helpRef = st.Symbolize(fmt.Sprintf("a help for metric_%d", value)) } - t := cortexpbv2.TimeSeries{} - t.LabelsRefs = cortexpbv2.GetLabelRefsFromLabelAdapters(st.Symbols(), labels) + t := cortexpbv2.PreallocTimeseriesV2{ + TimeSeries: &cortexpbv2.TimeSeries{ + LabelsRefs: st.SymbolizeLabels(cortexpb.FromLabelAdaptersToLabels(labels), nil), + }, + } if metadata { t.Metadata.Type = cortexpbv2.METRIC_TYPE_COUNTER t.Metadata.HelpRef = helpRef @@ -2411,7 +2416,7 @@ func makeTimeseriesV2FromST(labels []cortexpb.LabelAdapter, st *writev2.SymbolsT return t } -func makeWriteRequestV2Timeseries(labels []cortexpb.LabelAdapter, ts int64, value int, histogram bool, metadata bool) cortexpbv2.TimeSeries { +func makeWriteRequestV2Timeseries(labels []cortexpb.LabelAdapter, ts int64, value int, histogram bool, metadata bool) cortexpbv2.PreallocTimeseriesV2 { st := writev2.NewSymbolTable() st.SymbolizeLabels(cortexpb.FromLabelAdaptersToLabels(labels), nil) @@ -2420,8 +2425,11 @@ func makeWriteRequestV2Timeseries(labels []cortexpb.LabelAdapter, ts int64, valu helpRef = st.Symbolize(fmt.Sprintf("a help for metric_%d", value)) } - t := cortexpbv2.TimeSeries{} - t.LabelsRefs = cortexpbv2.GetLabelRefsFromLabelAdapters(st.Symbols(), labels) + t := cortexpbv2.PreallocTimeseriesV2{ + TimeSeries: &cortexpbv2.TimeSeries{ + LabelsRefs: st.SymbolizeLabels(cortexpb.FromLabelAdaptersToLabels(labels), nil), + }, + } if metadata { t.Metadata.Type = cortexpbv2.METRIC_TYPE_COUNTER t.Metadata.HelpRef = helpRef @@ -2450,13 +2458,15 @@ func makeWriteRequestV2Exemplar(seriesLabels []string, timestamp int64, exemplar return &cortexpbv2.WriteRequest{ Symbols: st.Symbols(), - Timeseries: []cortexpbv2.TimeSeries{ + Timeseries: []cortexpbv2.PreallocTimeseriesV2{ { - LabelsRefs: cortexpbv2.GetLabelRefsFromLabelAdapters(st.Symbols(), cortexpb.FromLabelsToLabelAdapters(labels.FromStrings(seriesLabels...))), - Exemplars: []cortexpbv2.Exemplar{ - { - LabelsRefs: cortexpbv2.GetLabelRefsFromLabelAdapters(st.Symbols(), cortexpb.FromLabelsToLabelAdapters(labels.FromStrings(exemplarLabels...))), - Timestamp: timestamp, + TimeSeries: &cortexpbv2.TimeSeries{ + LabelsRefs: cortexpbv2.GetLabelRefsFromLabelAdapters(st.Symbols(), cortexpb.FromLabelsToLabelAdapters(labels.FromStrings(seriesLabels...))), + Exemplars: []cortexpbv2.Exemplar{ + { + LabelsRefs: cortexpbv2.GetLabelRefsFromLabelAdapters(st.Symbols(), cortexpb.FromLabelsToLabelAdapters(labels.FromStrings(exemplarLabels...))), + Timestamp: timestamp, + }, }, }, }, @@ -2491,8 +2501,10 @@ func makeWriteRequestHAV2(samples int, replica, cluster string, histogram bool) request := &cortexpbv2.WriteRequest{} st := writev2.NewSymbolTable() for i := 0; i < samples; i++ { - ts := cortexpbv2.TimeSeries{ - LabelsRefs: st.SymbolizeLabels(labels.Labels{{Name: "__name__", Value: "foo"}, {Name: "__replica__", Value: replica}, {Name: "bar", Value: "baz"}, {Name: "cluster", Value: cluster}, {Name: "sample", Value: fmt.Sprintf("%d", i)}}, nil), + ts := cortexpbv2.PreallocTimeseriesV2{ + TimeSeries: &cortexpbv2.TimeSeries{ + LabelsRefs: st.SymbolizeLabels(labels.Labels{{Name: "__name__", Value: "foo"}, {Name: "__replica__", Value: replica}, {Name: "bar", Value: "baz"}, {Name: "cluster", Value: cluster}, {Name: "sample", Value: fmt.Sprintf("%d", i)}}, nil), + }, } if histogram { ts.Histograms = []cortexpbv2.Histogram{ diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index a04bc72333f..68d3353cf80 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -3070,6 +3070,10 @@ func (i *mockIngester) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWr return i.Push(ctx, &in.WriteRequest, opts...) } +func (i *mockIngester) PushPreAllocV2(ctx context.Context, in *cortexpbv2.PreallocWriteRequestV2, opts ...grpc.CallOption) (*cortexpbv2.WriteResponse, error) { + return i.PushV2(ctx, &in.WriteRequest, opts...) +} + func (i *mockIngester) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest, opts ...grpc.CallOption) (*cortexpbv2.WriteResponse, error) { i.Lock() defer i.Unlock() diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 648fa7ad536..b8b3ddefe9d 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -44,6 +44,7 @@ type HealthAndIngesterClient interface { grpc_health_v1.HealthClient Close() error PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) + PushPreAllocV2(ctx context.Context, in *cortexpbv2.PreallocWriteRequestV2, opts ...grpc.CallOption) (*cortexpbv2.WriteResponse, error) } type closableHealthAndIngesterClient struct { @@ -56,6 +57,17 @@ type closableHealthAndIngesterClient struct { inflightPushRequests *prometheus.GaugeVec } +func (c *closableHealthAndIngesterClient) PushPreAllocV2(ctx context.Context, in *cortexpbv2.PreallocWriteRequestV2, opts ...grpc.CallOption) (*cortexpbv2.WriteResponse, error) { + return c.handlePushRequestV2(func() (*cortexpbv2.WriteResponse, error) { + out := new(cortexpbv2.WriteResponse) + err := c.conn.Invoke(ctx, "/cortex.Ingester/PushV2", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil + }) +} + func (c *closableHealthAndIngesterClient) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { return c.handlePushRequest(func() (*cortexpb.WriteResponse, error) { out := new(cortexpb.WriteResponse) @@ -73,17 +85,6 @@ func (c *closableHealthAndIngesterClient) Push(ctx context.Context, in *cortexpb }) } -func (c *closableHealthAndIngesterClient) PushV2PreAlloc(ctx context.Context, in *cortexpbv2.WriteRequest, opts ...grpc.CallOption) (*cortexpbv2.WriteResponse, error) { - return c.handlePushRequestV2(func() (*cortexpbv2.WriteResponse, error) { - out := new(cortexpbv2.WriteResponse) - err := c.conn.Invoke(ctx, "/cortex.Ingester/PushV2", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil - }) -} - func (c *closableHealthAndIngesterClient) PushV2(ctx context.Context, in *cortexpbv2.WriteRequest, opts ...grpc.CallOption) (*cortexpbv2.WriteResponse, error) { return c.handlePushRequestV2(func() (*cortexpbv2.WriteResponse, error) { return c.IngesterClient.PushV2(ctx, in, opts...) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 4d6fbe3e94f..e8891c6a596 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1078,6 +1078,10 @@ func (i *Ingester) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest) (*c var firstPartialErr error + // NOTE: because we use `unsafe` in deserialisation, we must not + // retain anything from `req` past the call to ReuseSlice + defer cortexpbv2.ReuseSlice(req.Timeseries) + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err diff --git a/pkg/ingester/ingester_prw2_test.go b/pkg/ingester/ingester_prw2_test.go index 7ff77a470a5..ac132527fb3 100644 --- a/pkg/ingester/ingester_prw2_test.go +++ b/pkg/ingester/ingester_prw2_test.go @@ -212,19 +212,21 @@ func TestIngesterPRW2_Push(t *testing.T) { cortexpbv2.API), { Symbols: []string{"", "__name__", "test", "traceID", "123", "456"}, - Timeseries: []cortexpbv2.TimeSeries{ + Timeseries: []cortexpbv2.PreallocTimeseriesV2{ { - LabelsRefs: []uint32{1, 2}, - Exemplars: []cortexpbv2.Exemplar{ - { - LabelsRefs: []uint32{3, 4}, - Timestamp: 1000, - Value: 1000, - }, - { - LabelsRefs: []uint32{3, 5}, - Timestamp: 1001, - Value: 1001, + TimeSeries: &cortexpbv2.TimeSeries{ + LabelsRefs: []uint32{1, 2}, + Exemplars: []cortexpbv2.Exemplar{ + { + LabelsRefs: []uint32{3, 4}, + Timestamp: 1000, + Value: 1000, + }, + { + LabelsRefs: []uint32{3, 5}, + Timestamp: 1001, + Value: 1001, + }, }, }, }, @@ -605,14 +607,16 @@ func TestIngesterPRW2_Push(t *testing.T) { // This is not done here. { Symbols: []string{"", "__name__", "test", "traceID", "123"}, - Timeseries: []cortexpbv2.TimeSeries{ + Timeseries: []cortexpbv2.PreallocTimeseriesV2{ { - LabelsRefs: []uint32{1, 2}, - Exemplars: []cortexpbv2.Exemplar{ - { - LabelsRefs: []uint32{3, 4}, - Timestamp: 1000, - Value: 1000, + TimeSeries: &cortexpbv2.TimeSeries{ + LabelsRefs: []uint32{1, 2}, + Exemplars: []cortexpbv2.Exemplar{ + { + LabelsRefs: []uint32{3, 4}, + Timestamp: 1000, + Value: 1000, + }, }, }, }, @@ -4394,7 +4398,7 @@ func writeRequestSingleSeriesV2(lbls labels.Labels, samples []cortexpbv2.Sample) ts := cortexpbv2.TimeSeries{} ts.Samples = samples ts.LabelsRefs = st.SymbolizeLabels(lbls, nil) - req.Timeseries = append(req.Timeseries, ts) + req.Timeseries = append(req.Timeseries, cortexpbv2.PreallocTimeseriesV2{TimeSeries: &ts}) req.Symbols = st.Symbols() return req