Skip to content

Commit

Permalink
Add rolling update e2e test
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Nov 11, 2024
1 parent e739b3d commit 0bee591
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 29 deletions.
106 changes: 106 additions & 0 deletions integration/remote_write_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,112 @@ import (
"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func TestIngesterRollingUpdate(t *testing.T) {

const blockRangePeriod = 5 * time.Second
nonPushV2SupportImage := "quay.io/cortexproject/cortex:v1.18.1"

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsulWithName("consul")
require.NoError(t, s.StartAndWaitReady(consul))

flags := mergeFlags(
AlertmanagerLocalFlags(),
map[string]string{
"-store.engine": blocksStorageEngine,
"-blocks-storage.backend": "filesystem",
"-blocks-storage.tsdb.head-compaction-interval": "4m",
"-blocks-storage.bucket-store.sync-interval": "15m",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
"-querier.query-store-for-labels-enabled": "true",
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
"-blocks-storage.tsdb.enable-native-histograms": "true",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
},
)

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

path := path.Join(s.SharedDir(), "cortex-1")

flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
// Start Cortex replicas.
// Start all other services.
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, nonPushV2SupportImage)
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint()}), "")

require.NoError(t, s.StartAndWaitReady(querier, ingester, distributor, storeGateway))

// Wait until Cortex replicas have updated the ring state.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

now := time.Now()

// series push
symbols1, series, expectedVector := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
res, err := c.PushV2(symbols1, series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// sample
result, err := c.Query("test_series", now)
require.NoError(t, err)
assert.Equal(t, expectedVector, result.(model.Vector))

// metadata
metadata, err := c.Metadata("test_series", "")
require.NoError(t, err)
require.Equal(t, 1, len(metadata["test_series"]))

// histogram
histogramIdx := rand.Uint32()
symbols2, histogramSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
res, err = c.PushV2(symbols2, histogramSeries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
res, err = c.PushV2(symbols3, histogramFloatSeries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

testHistogramTimestamp := now.Add(blockRangePeriod * 2)
expectedHistogram := tsdbutil.GenerateTestHistogram(int(histogramIdx))
result, err = c.Query(`test_histogram`, testHistogramTimestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
v := result.(model.Vector)
require.Equal(t, 2, v.Len())
for _, s := range v {
require.NotNil(t, s.Histogram)
require.Equal(t, float64(expectedHistogram.Count), float64(s.Histogram.Count))
require.Equal(t, float64(expectedHistogram.Sum), float64(s.Histogram.Sum))
}
}

func TestIngest(t *testing.T) {
const blockRangePeriod = 5 * time.Second

Expand Down
3 changes: 1 addition & 2 deletions pkg/cortex/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/weaveworks/common/server"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/cortexpbv2"
)

func changeTargetConfig(c *Config) {
Expand Down Expand Up @@ -161,7 +160,7 @@ func TestCortex_InitRulerStorage(t *testing.T) {

type myPusher struct{}

func (p *myPusher) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error) {
func (p *myPusher) PushV2(ctx context.Context, req *cortexpb.WriteRequestV2) (*cortexpb.WriteResponseV2, error) {
return nil, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/cortexpb/cortex.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ enum MetricType {
// https://github.com/prometheus/prometheus/blob/main/prompb/io/prometheus/write/v2/types.proto
message WriteRequestV2 {
reserved 1 to 2;
cortexpb.SourceEnum Source = 3;
SourceEnum Source = 3;
repeated string symbols = 4;
repeated TimeSeriesV2 timeseries = 5 [(gogoproto.nullable) = false, (gogoproto.customtype) = "PreallocTimeseriesV2"];

Expand Down
6 changes: 3 additions & 3 deletions pkg/cortexpb/slicesPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type byteSlicePools struct {
pools []sync.Pool
}

func NewSlicePool(pools int) *byteSlicePools {
func newSlicePool(pools int) *byteSlicePools {
sp := byteSlicePools{}
sp.init(pools)
return &sp
Expand All @@ -32,7 +32,7 @@ func (sp *byteSlicePools) init(pools int) {
}
}

func (sp *byteSlicePools) GetSlice(size int) *[]byte {
func (sp *byteSlicePools) getSlice(size int) *[]byte {
index := int(math.Ceil(math.Log2(float64(size)))) - minPoolSizePower

if index >= len(sp.pools) {
Expand All @@ -50,7 +50,7 @@ func (sp *byteSlicePools) GetSlice(size int) *[]byte {
return s
}

func (sp *byteSlicePools) ReuseSlice(s *[]byte) {
func (sp *byteSlicePools) reuseSlice(s *[]byte) {
index := int(math.Floor(math.Log2(float64(cap(*s))))) - minPoolSizePower

if index >= len(sp.pools) || index < 0 {
Expand Down
12 changes: 6 additions & 6 deletions pkg/cortexpb/slicesPool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ import (
)

func TestFuzzyByteSlicePools(t *testing.T) {
sut := NewSlicePool(20)
sut := newSlicePool(20)
maxByteSize := int(math.Pow(2, 20+minPoolSizePower-1))

for i := 0; i < 1000; i++ {
size := rand.Int() % maxByteSize
s := sut.GetSlice(size)
s := sut.getSlice(size)
assert.Equal(t, len(*s), size)
sut.ReuseSlice(s)
sut.reuseSlice(s)
}
}

func TestReturnSliceSmallerThanMin(t *testing.T) {
sut := NewSlicePool(20)
sut := newSlicePool(20)
size := 3
buff := make([]byte, 0, size)
sut.ReuseSlice(&buff)
buff2 := sut.GetSlice(size * 2)
sut.reuseSlice(&buff)
buff2 := sut.getSlice(size * 2)
assert.Equal(t, len(*buff2), size*2)
}
6 changes: 3 additions & 3 deletions pkg/cortexpb/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
}
},
}
bytePool = NewSlicePool(20)
bytePool = newSlicePool(20)
)

// PreallocConfig configures how structures will be preallocated to optimise
Expand Down Expand Up @@ -86,7 +86,7 @@ func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error {

func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) {
size := p.Size()
p.data = bytePool.GetSlice(size)
p.data = bytePool.getSlice(size)
dAtA = *p.data
n, err := p.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
Expand All @@ -97,7 +97,7 @@ func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) {

func ReuseWriteRequest(req *PreallocWriteRequest) {
if req.data != nil {
bytePool.ReuseSlice(req.data)
bytePool.reuseSlice(req.data)
req.data = nil
}
req.Source = 0
Expand Down
6 changes: 3 additions & 3 deletions pkg/cortexpb/timeseriesv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var (
}
},
}
bytePoolV2 = NewSlicePool(20)
bytePoolV2 = newSlicePool(20)
)

// PreallocWriteRequestV2 is a WriteRequest which preallocs slices on Unmarshal.
Expand All @@ -51,7 +51,7 @@ func (p *PreallocWriteRequestV2) Unmarshal(dAtA []byte) error {

func (p *PreallocWriteRequestV2) Marshal() (dAtA []byte, err error) {
size := p.Size()
p.data = bytePool.GetSlice(size)
p.data = bytePool.getSlice(size)
dAtA = *p.data
n, err := p.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
Expand All @@ -73,7 +73,7 @@ func (p *PreallocTimeseriesV2) Unmarshal(dAtA []byte) error {

func ReuseWriteRequestV2(req *PreallocWriteRequestV2) {
if req.data != nil {
bytePoolV2.ReuseSlice(req.data)
bytePoolV2.reuseSlice(req.data)
req.data = nil
}
req.Source = 0
Expand Down
5 changes: 2 additions & 3 deletions pkg/ingester/client/cortex_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ import (
"github.com/stretchr/testify/mock"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/cortexpbv2"
)

type IngesterServerMock struct {
mock.Mock
}

func (m *IngesterServerMock) PushV2(ctx context.Context, r *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error) {
func (m *IngesterServerMock) PushV2(ctx context.Context, r *cortexpb.WriteRequestV2) (*cortexpb.WriteResponseV2, error) {
args := m.Called(ctx, r)
return args.Get(0).(*cortexpbv2.WriteResponse), args.Error(1)
return args.Get(0).(*cortexpb.WriteResponseV2), args.Error(1)
}

func (m *IngesterServerMock) Push(ctx context.Context, r *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
Expand Down
1 change: 0 additions & 1 deletion pkg/querier/tripperware/query.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions pkg/ruler/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,19 @@ import (
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/cortexpbv2"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util/validation"
)

type fakePusher struct {
request *cortexpb.WriteRequest
requestV2 *cortexpbv2.WriteRequest
requestV2 *cortexpb.WriteRequestV2
response *cortexpb.WriteResponse
responseV2 *cortexpbv2.WriteResponse
responseV2 *cortexpb.WriteResponseV2
err error
}

func (p *fakePusher) PushV2(ctx context.Context, r *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error) {
func (p *fakePusher) PushV2(ctx context.Context, r *cortexpb.WriteRequestV2) (*cortexpb.WriteResponseV2, error) {
p.requestV2 = r
return p.responseV2, p.err
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/ruler/pusher_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/stretchr/testify/mock"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/cortexpbv2"
)

type pusherMock struct {
Expand All @@ -17,9 +16,9 @@ func newPusherMock() *pusherMock {
return &pusherMock{}
}

func (m *pusherMock) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error) {
func (m *pusherMock) PushV2(ctx context.Context, req *cortexpb.WriteRequestV2) (*cortexpb.WriteResponseV2, error) {
args := m.Called(ctx, req)
return args.Get(0).(*cortexpbv2.WriteResponse), args.Error(1)
return args.Get(0).(*cortexpb.WriteResponseV2), args.Error(1)
}

func (m *pusherMock) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
Expand Down

0 comments on commit 0bee591

Please sign in to comment.