Skip to content

Commit

Permalink
Add prealloc timeseies v2
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Nov 7, 2024
1 parent 878562f commit 80124e3
Show file tree
Hide file tree
Showing 17 changed files with 501 additions and 235 deletions.
23 changes: 11 additions & 12 deletions integration/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/runutil"

"github.com/cortexproject/cortex/pkg/cortexpbv2"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
)

Expand Down Expand Up @@ -151,7 +150,7 @@ func GenerateSeries(name string, ts time.Time, additionalLabels ...prompb.Label)
return
}

func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []cortexpbv2.TimeSeries) {
func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries) {
tsMillis := TimeToMilliseconds(ts)

st := writev2.NewSymbolTable()
Expand All @@ -164,20 +163,20 @@ func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogr
var (
h *histogram.Histogram
fh *histogram.FloatHistogram
ph cortexpbv2.Histogram
ph writev2.Histogram
)
if floatHistogram {
fh = tsdbutil.GenerateTestFloatHistogram(int(i))
ph = cortexpbv2.FloatHistogramToHistogramProto(tsMillis, fh)
ph = writev2.FromFloatHistogram(tsMillis, fh)
} else {
h = tsdbutil.GenerateTestHistogram(int(i))
ph = cortexpbv2.HistogramToHistogramProto(tsMillis, h)
ph = writev2.FromIntHistogram(tsMillis, h)
}

// Generate the series
series = append(series, cortexpbv2.TimeSeries{
series = append(series, writev2.TimeSeries{
LabelsRefs: st.SymbolizeLabels(lbs, nil),
Histograms: []cortexpbv2.Histogram{ph},
Histograms: []writev2.Histogram{ph},
})

symbols = st.Symbols()
Expand Down Expand Up @@ -224,7 +223,7 @@ func GenerateHistogramSeries(name string, ts time.Time, i uint32, floatHistogram
return
}

func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []cortexpbv2.TimeSeries, vector model.Vector) {
func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries, vector model.Vector) {
tsMillis := TimeToMilliseconds(ts)
value := rand.Float64()

Expand All @@ -237,14 +236,14 @@ func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Labe
Value: label.Value,
})
}
series = append(series, cortexpbv2.TimeSeries{
series = append(series, writev2.TimeSeries{
// Generate the series
LabelsRefs: st.SymbolizeLabels(lbs, nil),
Samples: []cortexpbv2.Sample{
Samples: []writev2.Sample{
{Value: value, Timestamp: tsMillis},
},
Metadata: cortexpbv2.Metadata{
Type: cortexpbv2.METRIC_TYPE_GAUGE,
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
},
})
symbols = st.Symbols()
Expand Down
6 changes: 3 additions & 3 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
yaml "gopkg.in/yaml.v3"

"github.com/cortexproject/cortex/pkg/cortexpbv2"
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/util/backoff"
)
Expand Down Expand Up @@ -114,9 +114,9 @@ func NewPromQueryClient(address string) (*Client, error) {
}

// PushV2 the input timeseries to the remote endpoint
func (c *Client) PushV2(symbols []string, timeseries []cortexpbv2.TimeSeries) (*http.Response, error) {
func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) {
// Create write request
data, err := proto.Marshal(&cortexpbv2.WriteRequest{Symbols: symbols, Timeseries: timeseries})
data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries})
if err != nil {
return nil, err
}
Expand Down
28 changes: 13 additions & 15 deletions integration/remote_write_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/cortexpbv2"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

Expand Down Expand Up @@ -176,24 +176,22 @@ func TestExemplar(t *testing.T) {
now := time.Now()
tsMillis := e2e.TimeToMilliseconds(now)

req := &cortexpbv2.WriteRequest{
Symbols: []string{"", "__name__", "test_metric", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"},
Timeseries: []cortexpbv2.TimeSeries{
{
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels
Metadata: cortexpbv2.Metadata{
Type: cortexpbv2.METRIC_TYPE_COUNTER, // writeV2RequestSeries1Metadata.Type.

HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help.
UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit.
},
Samples: []cortexpbv2.Sample{{Value: 1, Timestamp: tsMillis}},
Exemplars: []cortexpbv2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: tsMillis}},
symbols := []string{"", "__name__", "test_metric", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}
timeseries := []writev2.TimeSeries{
{
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_COUNTER, // writeV2RequestSeries1Metadata.Type.

HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help.
UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit.
},
Samples: []writev2.Sample{{Value: 1, Timestamp: tsMillis}},
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: tsMillis}},
},
}

res, err := c.PushV2(req.Symbols, req.Timeseries)
res, err := c.PushV2(symbols, timeseries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "1", "0", "1")
Expand Down
6 changes: 3 additions & 3 deletions pkg/cortexpb/slicesPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type byteSlicePools struct {
pools []sync.Pool
}

func newSlicePool(pools int) *byteSlicePools {
func NewSlicePool(pools int) *byteSlicePools {
sp := byteSlicePools{}
sp.init(pools)
return &sp
Expand All @@ -32,7 +32,7 @@ func (sp *byteSlicePools) init(pools int) {
}
}

func (sp *byteSlicePools) getSlice(size int) *[]byte {
func (sp *byteSlicePools) GetSlice(size int) *[]byte {
index := int(math.Ceil(math.Log2(float64(size)))) - minPoolSizePower

if index >= len(sp.pools) {
Expand All @@ -50,7 +50,7 @@ func (sp *byteSlicePools) getSlice(size int) *[]byte {
return s
}

func (sp *byteSlicePools) reuseSlice(s *[]byte) {
func (sp *byteSlicePools) ReuseSlice(s *[]byte) {
index := int(math.Floor(math.Log2(float64(cap(*s))))) - minPoolSizePower

if index >= len(sp.pools) || index < 0 {
Expand Down
12 changes: 6 additions & 6 deletions pkg/cortexpb/slicesPool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ import (
)

func TestFuzzyByteSlicePools(t *testing.T) {
sut := newSlicePool(20)
sut := NewSlicePool(20)
maxByteSize := int(math.Pow(2, 20+minPoolSizePower-1))

for i := 0; i < 1000; i++ {
size := rand.Int() % maxByteSize
s := sut.getSlice(size)
s := sut.GetSlice(size)
assert.Equal(t, len(*s), size)
sut.reuseSlice(s)
sut.ReuseSlice(s)
}
}

func TestReturnSliceSmallerThanMin(t *testing.T) {
sut := newSlicePool(20)
sut := NewSlicePool(20)
size := 3
buff := make([]byte, 0, size)
sut.reuseSlice(&buff)
buff2 := sut.getSlice(size * 2)
sut.ReuseSlice(&buff)
buff2 := sut.GetSlice(size * 2)
assert.Equal(t, len(*buff2), size*2)
}
6 changes: 3 additions & 3 deletions pkg/cortexpb/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
}
},
}
bytePool = newSlicePool(20)
bytePool = NewSlicePool(20)
)

// PreallocConfig configures how structures will be preallocated to optimise
Expand Down Expand Up @@ -86,7 +86,7 @@ func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error {

func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) {
size := p.Size()
p.data = bytePool.getSlice(size)
p.data = bytePool.GetSlice(size)
dAtA = *p.data
n, err := p.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
Expand All @@ -97,7 +97,7 @@ func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) {

func ReuseWriteRequest(req *PreallocWriteRequest) {
if req.data != nil {
bytePool.reuseSlice(req.data)
bytePool.ReuseSlice(req.data)
req.data = nil
}
req.Source = 0
Expand Down
9 changes: 5 additions & 4 deletions pkg/cortexpbv2/compatv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ func ToWriteRequestV2(lbls []labels.Labels, samples []Sample, histograms []Histo
symbols := st.Symbols()

req := &WriteRequest{
Symbols: symbols,
Source: source,
Timeseries: PreallocTimeseriesV2SliceFromPool(),
Symbols: symbols,
Source: source,
}

i := 0
for i < len(samples) || i < len(histograms) || i < len(metadata) {
ts := TimeSeries{}
ts := TimeseriesV2FromPool()
ts.LabelsRefs = labelRefs[i]
if i < len(samples) {
ts.Samples = append(ts.Samples, samples[i])
Expand All @@ -40,7 +41,7 @@ func ToWriteRequestV2(lbls []labels.Labels, samples []Sample, histograms []Histo
ts.Metadata = metadata[i]
}
i++
req.Timeseries = append(req.Timeseries, ts)
req.Timeseries = append(req.Timeseries, PreallocTimeseriesV2{TimeSeries: ts})
}

return req
Expand Down
Loading

0 comments on commit 80124e3

Please sign in to comment.