From 0bee591f5300f5e415f7954b8a46f6969a62ee07 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 11 Nov 2024 12:09:10 +0900 Subject: [PATCH] Add rolling update e2e test Signed-off-by: SungJin1212 --- integration/remote_write_v2_test.go | 106 ++++++++++++++++++++++++ pkg/cortex/modules_test.go | 3 +- pkg/cortexpb/cortex.proto | 2 +- pkg/cortexpb/slicesPool.go | 6 +- pkg/cortexpb/slicesPool_test.go | 12 +-- pkg/cortexpb/timeseries.go | 6 +- pkg/cortexpb/timeseriesv2.go | 6 +- pkg/ingester/client/cortex_mock_test.go | 5 +- pkg/querier/tripperware/query.pb.go | 1 - pkg/ruler/compat_test.go | 7 +- pkg/ruler/pusher_mock_test.go | 5 +- 11 files changed, 130 insertions(+), 29 deletions(-) diff --git a/integration/remote_write_v2_test.go b/integration/remote_write_v2_test.go index 775ccb98bb..9afde5e35a 100644 --- a/integration/remote_write_v2_test.go +++ b/integration/remote_write_v2_test.go @@ -23,6 +23,112 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb" ) +func TestIngesterRollingUpdate(t *testing.T) { + + const blockRangePeriod = 5 * time.Second + nonPushV2SupportImage := "quay.io/cortexproject/cortex:v1.18.1" + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + consul := e2edb.NewConsulWithName("consul") + require.NoError(t, s.StartAndWaitReady(consul)) + + flags := mergeFlags( + AlertmanagerLocalFlags(), + map[string]string{ + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-querier.query-store-for-labels-enabled": "true", + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-blocks-storage.tsdb.enable-native-histograms": "true", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + path := path.Join(s.SharedDir(), "cortex-1") + + flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path}) + // Start Cortex replicas. + // Start all other services. + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, nonPushV2SupportImage) + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint()}), "") + + require.NoError(t, s.StartAndWaitReady(querier, ingester, distributor, storeGateway)) + + // Wait until Cortex replicas have updated the ring state. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + now := time.Now() + + // series push + symbols1, series, expectedVector := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"}) + res, err := c.PushV2(symbols1, series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // sample + result, err := c.Query("test_series", now) + require.NoError(t, err) + assert.Equal(t, expectedVector, result.(model.Vector)) + + // metadata + metadata, err := c.Metadata("test_series", "") + require.NoError(t, err) + require.Equal(t, 1, len(metadata["test_series"])) + + // histogram + histogramIdx := rand.Uint32() + symbols2, histogramSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"}) + res, err = c.PushV2(symbols2, histogramSeries) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"}) + res, err = c.PushV2(symbols3, histogramFloatSeries) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + testHistogramTimestamp := now.Add(blockRangePeriod * 2) + expectedHistogram := tsdbutil.GenerateTestHistogram(int(histogramIdx)) + result, err = c.Query(`test_histogram`, testHistogramTimestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + v := result.(model.Vector) + require.Equal(t, 2, v.Len()) + for _, s := range v { + require.NotNil(t, s.Histogram) + require.Equal(t, float64(expectedHistogram.Count), float64(s.Histogram.Count)) + require.Equal(t, float64(expectedHistogram.Sum), float64(s.Histogram.Sum)) + } +} + func TestIngest(t *testing.T) { const blockRangePeriod = 5 * time.Second diff --git a/pkg/cortex/modules_test.go b/pkg/cortex/modules_test.go index ae5fc64582..2cf79132cb 100644 --- a/pkg/cortex/modules_test.go +++ b/pkg/cortex/modules_test.go @@ -13,7 +13,6 @@ import ( "github.com/weaveworks/common/server" "github.com/cortexproject/cortex/pkg/cortexpb" - "github.com/cortexproject/cortex/pkg/cortexpbv2" ) func changeTargetConfig(c *Config) { @@ -161,7 +160,7 @@ func TestCortex_InitRulerStorage(t *testing.T) { type myPusher struct{} -func (p *myPusher) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error) { +func (p *myPusher) PushV2(ctx context.Context, req *cortexpb.WriteRequestV2) (*cortexpb.WriteResponseV2, error) { return nil, nil } diff --git a/pkg/cortexpb/cortex.proto b/pkg/cortexpb/cortex.proto index b16074c007..c7b1209503 100644 --- a/pkg/cortexpb/cortex.proto +++ b/pkg/cortexpb/cortex.proto @@ -28,7 +28,7 @@ enum MetricType { // https://github.com/prometheus/prometheus/blob/main/prompb/io/prometheus/write/v2/types.proto message WriteRequestV2 { reserved 1 to 2; - cortexpb.SourceEnum Source = 3; + SourceEnum Source = 3; repeated string symbols = 4; repeated TimeSeriesV2 timeseries = 5 [(gogoproto.nullable) = false, (gogoproto.customtype) = "PreallocTimeseriesV2"]; diff --git a/pkg/cortexpb/slicesPool.go b/pkg/cortexpb/slicesPool.go index eaace237ce..e28d51d4f2 100644 --- a/pkg/cortexpb/slicesPool.go +++ b/pkg/cortexpb/slicesPool.go @@ -13,7 +13,7 @@ type byteSlicePools struct { pools []sync.Pool } -func NewSlicePool(pools int) *byteSlicePools { +func newSlicePool(pools int) *byteSlicePools { sp := byteSlicePools{} sp.init(pools) return &sp @@ -32,7 +32,7 @@ func (sp *byteSlicePools) init(pools int) { } } -func (sp *byteSlicePools) GetSlice(size int) *[]byte { +func (sp *byteSlicePools) getSlice(size int) *[]byte { index := int(math.Ceil(math.Log2(float64(size)))) - minPoolSizePower if index >= len(sp.pools) { @@ -50,7 +50,7 @@ func (sp *byteSlicePools) GetSlice(size int) *[]byte { return s } -func (sp *byteSlicePools) ReuseSlice(s *[]byte) { +func (sp *byteSlicePools) reuseSlice(s *[]byte) { index := int(math.Floor(math.Log2(float64(cap(*s))))) - minPoolSizePower if index >= len(sp.pools) || index < 0 { diff --git a/pkg/cortexpb/slicesPool_test.go b/pkg/cortexpb/slicesPool_test.go index dd35beb33a..9bc56cdec3 100644 --- a/pkg/cortexpb/slicesPool_test.go +++ b/pkg/cortexpb/slicesPool_test.go @@ -9,22 +9,22 @@ import ( ) func TestFuzzyByteSlicePools(t *testing.T) { - sut := NewSlicePool(20) + sut := newSlicePool(20) maxByteSize := int(math.Pow(2, 20+minPoolSizePower-1)) for i := 0; i < 1000; i++ { size := rand.Int() % maxByteSize - s := sut.GetSlice(size) + s := sut.getSlice(size) assert.Equal(t, len(*s), size) - sut.ReuseSlice(s) + sut.reuseSlice(s) } } func TestReturnSliceSmallerThanMin(t *testing.T) { - sut := NewSlicePool(20) + sut := newSlicePool(20) size := 3 buff := make([]byte, 0, size) - sut.ReuseSlice(&buff) - buff2 := sut.GetSlice(size * 2) + sut.reuseSlice(&buff) + buff2 := sut.getSlice(size * 2) assert.Equal(t, len(*buff2), size*2) } diff --git a/pkg/cortexpb/timeseries.go b/pkg/cortexpb/timeseries.go index b880739ae2..db7354ffe4 100644 --- a/pkg/cortexpb/timeseries.go +++ b/pkg/cortexpb/timeseries.go @@ -47,7 +47,7 @@ var ( } }, } - bytePool = NewSlicePool(20) + bytePool = newSlicePool(20) ) // PreallocConfig configures how structures will be preallocated to optimise @@ -86,7 +86,7 @@ func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error { func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) { size := p.Size() - p.data = bytePool.GetSlice(size) + p.data = bytePool.getSlice(size) dAtA = *p.data n, err := p.MarshalToSizedBuffer(dAtA[:size]) if err != nil { @@ -97,7 +97,7 @@ func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) { func ReuseWriteRequest(req *PreallocWriteRequest) { if req.data != nil { - bytePool.ReuseSlice(req.data) + bytePool.reuseSlice(req.data) req.data = nil } req.Source = 0 diff --git a/pkg/cortexpb/timeseriesv2.go b/pkg/cortexpb/timeseriesv2.go index 28eca0f973..2d64b21d7c 100644 --- a/pkg/cortexpb/timeseriesv2.go +++ b/pkg/cortexpb/timeseriesv2.go @@ -34,7 +34,7 @@ var ( } }, } - bytePoolV2 = NewSlicePool(20) + bytePoolV2 = newSlicePool(20) ) // PreallocWriteRequestV2 is a WriteRequest which preallocs slices on Unmarshal. @@ -51,7 +51,7 @@ func (p *PreallocWriteRequestV2) Unmarshal(dAtA []byte) error { func (p *PreallocWriteRequestV2) Marshal() (dAtA []byte, err error) { size := p.Size() - p.data = bytePool.GetSlice(size) + p.data = bytePool.getSlice(size) dAtA = *p.data n, err := p.MarshalToSizedBuffer(dAtA[:size]) if err != nil { @@ -73,7 +73,7 @@ func (p *PreallocTimeseriesV2) Unmarshal(dAtA []byte) error { func ReuseWriteRequestV2(req *PreallocWriteRequestV2) { if req.data != nil { - bytePoolV2.ReuseSlice(req.data) + bytePoolV2.reuseSlice(req.data) req.data = nil } req.Source = 0 diff --git a/pkg/ingester/client/cortex_mock_test.go b/pkg/ingester/client/cortex_mock_test.go index 5aff8bf302..d378495dcc 100644 --- a/pkg/ingester/client/cortex_mock_test.go +++ b/pkg/ingester/client/cortex_mock_test.go @@ -6,16 +6,15 @@ import ( "github.com/stretchr/testify/mock" "github.com/cortexproject/cortex/pkg/cortexpb" - "github.com/cortexproject/cortex/pkg/cortexpbv2" ) type IngesterServerMock struct { mock.Mock } -func (m *IngesterServerMock) PushV2(ctx context.Context, r *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error) { +func (m *IngesterServerMock) PushV2(ctx context.Context, r *cortexpb.WriteRequestV2) (*cortexpb.WriteResponseV2, error) { args := m.Called(ctx, r) - return args.Get(0).(*cortexpbv2.WriteResponse), args.Error(1) + return args.Get(0).(*cortexpb.WriteResponseV2), args.Error(1) } func (m *IngesterServerMock) Push(ctx context.Context, r *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { diff --git a/pkg/querier/tripperware/query.pb.go b/pkg/querier/tripperware/query.pb.go index 2e16fc9c6d..5fc41d7ed9 100644 --- a/pkg/querier/tripperware/query.pb.go +++ b/pkg/querier/tripperware/query.pb.go @@ -737,7 +737,6 @@ func (m *PrometheusResponseHeader) GetValues() []string { type PrometheusQueryResult struct { // Types that are valid to be assigned to Result: - // // *PrometheusQueryResult_Vector // *PrometheusQueryResult_RawBytes // *PrometheusQueryResult_Matrix diff --git a/pkg/ruler/compat_test.go b/pkg/ruler/compat_test.go index 7d489b65b0..21f76be12c 100644 --- a/pkg/ruler/compat_test.go +++ b/pkg/ruler/compat_test.go @@ -23,20 +23,19 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/cortexpb" - "github.com/cortexproject/cortex/pkg/cortexpbv2" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/util/validation" ) type fakePusher struct { request *cortexpb.WriteRequest - requestV2 *cortexpbv2.WriteRequest + requestV2 *cortexpb.WriteRequestV2 response *cortexpb.WriteResponse - responseV2 *cortexpbv2.WriteResponse + responseV2 *cortexpb.WriteResponseV2 err error } -func (p *fakePusher) PushV2(ctx context.Context, r *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error) { +func (p *fakePusher) PushV2(ctx context.Context, r *cortexpb.WriteRequestV2) (*cortexpb.WriteResponseV2, error) { p.requestV2 = r return p.responseV2, p.err } diff --git a/pkg/ruler/pusher_mock_test.go b/pkg/ruler/pusher_mock_test.go index 02fc5b13de..c909e66d22 100644 --- a/pkg/ruler/pusher_mock_test.go +++ b/pkg/ruler/pusher_mock_test.go @@ -6,7 +6,6 @@ import ( "github.com/stretchr/testify/mock" "github.com/cortexproject/cortex/pkg/cortexpb" - "github.com/cortexproject/cortex/pkg/cortexpbv2" ) type pusherMock struct { @@ -17,9 +16,9 @@ func newPusherMock() *pusherMock { return &pusherMock{} } -func (m *pusherMock) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest) (*cortexpbv2.WriteResponse, error) { +func (m *pusherMock) PushV2(ctx context.Context, req *cortexpb.WriteRequestV2) (*cortexpb.WriteResponseV2, error) { args := m.Called(ctx, req) - return args.Get(0).(*cortexpbv2.WriteResponse), args.Error(1) + return args.Get(0).(*cortexpb.WriteResponseV2), args.Error(1) } func (m *pusherMock) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {