From 7611128e579e13dad3fe39471f4c03eecf4755cd Mon Sep 17 00:00:00 2001 From: chyezh Date: Thu, 4 Jul 2024 15:23:08 +0800 Subject: [PATCH] enhance: wal adaptor implementation (#34122) issue: #33285 - add adaptor to implement walimpls into wal interface. - implement timetick sorted and filtering scanner. - add test for wal. --------- Signed-off-by: chyezh --- go.mod | 2 +- .../mock_grpc/mock_ClientStream.go | 302 ++++++++++++++++ internal/proto/streaming.proto | 46 +++ .../server/wal/adaptor/builder.go | 11 +- .../server/wal/adaptor/opener.go | 86 +++++ .../server/wal/adaptor/opener_test.go | 117 ++++++ .../server/wal/adaptor/scanner_adaptor.go | 121 +++++++ .../wal/adaptor/scanner_adaptor_test.go | 29 ++ .../server/wal/adaptor/scanner_registry.go | 31 ++ .../server/wal/adaptor/wal_adaptor.go | 152 ++++++++ .../server/wal/adaptor/wal_adaptor_test.go | 163 +++++++++ .../server/wal/adaptor/wal_test.go | 339 ++++++++++++++++++ .../timetick/timetick_interceptor.go | 58 +-- internal/streamingnode/server/wal/scanner.go | 4 +- .../wal/utility/immutable_message_queue.go | 51 +++ .../utility/immutable_message_queue_test.go | 25 ++ .../server/wal/utility/message_heap.go | 45 +++ .../server/wal/utility/message_heap_test.go | 29 ++ .../server/wal/utility/reorder_buffer.go | 38 ++ .../server/wal/utility/reorder_buffer_test.go | 43 +++ internal/streamingservice/.mockery.yaml | 10 +- internal/util/streamingutil/status/checker.go | 47 +++ .../util/streamingutil/status/checker_test.go | 19 + .../status/client_stream_wrapper.go | 34 ++ .../status/client_stream_wrapper_test.go | 33 ++ .../util/streamingutil/status/rpc_error.go | 101 ++++++ .../streamingutil/status/rpc_error_test.go | 48 +++ .../streamingutil/status/streaming_error.go | 119 ++++++ .../status/streaming_error_test.go | 65 ++++ .../util/streamingutil/util/id_allocator.go | 17 + pkg/metrics/metrics.go | 1 + pkg/metrics/streaming_service_metrics.go | 180 ++++++++++ pkg/util/typeutil/type.go | 5 + 33 files changed, 2341 insertions(+), 30 deletions(-) create mode 100644 internal/mocks/google.golang.org/mock_grpc/mock_ClientStream.go create mode 100644 internal/streamingnode/server/wal/adaptor/opener.go create mode 100644 internal/streamingnode/server/wal/adaptor/opener_test.go create mode 100644 internal/streamingnode/server/wal/adaptor/scanner_adaptor.go create mode 100644 internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go create mode 100644 internal/streamingnode/server/wal/adaptor/scanner_registry.go create mode 100644 internal/streamingnode/server/wal/adaptor/wal_adaptor.go create mode 100644 internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go create mode 100644 internal/streamingnode/server/wal/adaptor/wal_test.go create mode 100644 internal/streamingnode/server/wal/utility/immutable_message_queue.go create mode 100644 internal/streamingnode/server/wal/utility/immutable_message_queue_test.go create mode 100644 internal/streamingnode/server/wal/utility/message_heap.go create mode 100644 internal/streamingnode/server/wal/utility/message_heap_test.go create mode 100644 internal/streamingnode/server/wal/utility/reorder_buffer.go create mode 100644 internal/streamingnode/server/wal/utility/reorder_buffer_test.go create mode 100644 internal/util/streamingutil/status/checker.go create mode 100644 internal/util/streamingutil/status/checker_test.go create mode 100644 internal/util/streamingutil/status/client_stream_wrapper.go create mode 100644 internal/util/streamingutil/status/client_stream_wrapper_test.go create mode 100644 
internal/util/streamingutil/status/rpc_error.go create mode 100644 internal/util/streamingutil/status/rpc_error_test.go create mode 100644 internal/util/streamingutil/status/streaming_error.go create mode 100644 internal/util/streamingutil/status/streaming_error_test.go create mode 100644 internal/util/streamingutil/util/id_allocator.go create mode 100644 pkg/metrics/streaming_service_metrics.go diff --git a/go.mod b/go.mod index 97570a30fc96d..be1a905e688e8 100644 --- a/go.mod +++ b/go.mod @@ -65,6 +65,7 @@ require ( require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 require ( + github.com/cockroachdb/redact v1.1.3 github.com/greatroar/blobloom v0.0.0-00010101000000-000000000000 github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000 @@ -102,7 +103,6 @@ require ( github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/cilium/ebpf v0.11.0 // indirect github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f // indirect - github.com/cockroachdb/redact v1.1.3 // indirect github.com/confluentinc/confluent-kafka-go v1.9.1 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect diff --git a/internal/mocks/google.golang.org/mock_grpc/mock_ClientStream.go b/internal/mocks/google.golang.org/mock_grpc/mock_ClientStream.go new file mode 100644 index 0000000000000..2bef81aba2066 --- /dev/null +++ b/internal/mocks/google.golang.org/mock_grpc/mock_ClientStream.go @@ -0,0 +1,302 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_grpc + +import ( + context "context" + + metadata "google.golang.org/grpc/metadata" + + mock "github.com/stretchr/testify/mock" +) + +// MockClientStream is an autogenerated mock type for the ClientStream type +type MockClientStream struct { + mock.Mock +} + +type MockClientStream_Expecter struct { + mock *mock.Mock +} + +func (_m *MockClientStream) EXPECT() *MockClientStream_Expecter { + return &MockClientStream_Expecter{mock: &_m.Mock} +} + +// CloseSend provides a mock function with given fields: +func (_m *MockClientStream) CloseSend() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockClientStream_CloseSend_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CloseSend' +type MockClientStream_CloseSend_Call struct { + *mock.Call +} + +// CloseSend is a helper method to define mock.On call +func (_e *MockClientStream_Expecter) CloseSend() *MockClientStream_CloseSend_Call { + return &MockClientStream_CloseSend_Call{Call: _e.mock.On("CloseSend")} +} + +func (_c *MockClientStream_CloseSend_Call) Run(run func()) *MockClientStream_CloseSend_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockClientStream_CloseSend_Call) Return(_a0 error) *MockClientStream_CloseSend_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockClientStream_CloseSend_Call) RunAndReturn(run func() error) *MockClientStream_CloseSend_Call { + _c.Call.Return(run) + return _c +} + +// Context provides a mock function with given fields: +func (_m *MockClientStream) Context() context.Context { + ret := _m.Called() + + var r0 context.Context + if rf, ok := ret.Get(0).(func() context.Context); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(context.Context) + } + } + + return r0 +} + +// 
MockClientStream_Context_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Context' +type MockClientStream_Context_Call struct { + *mock.Call +} + +// Context is a helper method to define mock.On call +func (_e *MockClientStream_Expecter) Context() *MockClientStream_Context_Call { + return &MockClientStream_Context_Call{Call: _e.mock.On("Context")} +} + +func (_c *MockClientStream_Context_Call) Run(run func()) *MockClientStream_Context_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockClientStream_Context_Call) Return(_a0 context.Context) *MockClientStream_Context_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockClientStream_Context_Call) RunAndReturn(run func() context.Context) *MockClientStream_Context_Call { + _c.Call.Return(run) + return _c +} + +// Header provides a mock function with given fields: +func (_m *MockClientStream) Header() (metadata.MD, error) { + ret := _m.Called() + + var r0 metadata.MD + var r1 error + if rf, ok := ret.Get(0).(func() (metadata.MD, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() metadata.MD); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(metadata.MD) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockClientStream_Header_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Header' +type MockClientStream_Header_Call struct { + *mock.Call +} + +// Header is a helper method to define mock.On call +func (_e *MockClientStream_Expecter) Header() *MockClientStream_Header_Call { + return &MockClientStream_Header_Call{Call: _e.mock.On("Header")} +} + +func (_c *MockClientStream_Header_Call) Run(run func()) *MockClientStream_Header_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockClientStream_Header_Call) Return(_a0 metadata.MD, _a1 error) *MockClientStream_Header_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockClientStream_Header_Call) RunAndReturn(run func() (metadata.MD, error)) *MockClientStream_Header_Call { + _c.Call.Return(run) + return _c +} + +// RecvMsg provides a mock function with given fields: m +func (_m *MockClientStream) RecvMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockClientStream_RecvMsg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecvMsg' +type MockClientStream_RecvMsg_Call struct { + *mock.Call +} + +// RecvMsg is a helper method to define mock.On call +// - m interface{} +func (_e *MockClientStream_Expecter) RecvMsg(m interface{}) *MockClientStream_RecvMsg_Call { + return &MockClientStream_RecvMsg_Call{Call: _e.mock.On("RecvMsg", m)} +} + +func (_c *MockClientStream_RecvMsg_Call) Run(run func(m interface{})) *MockClientStream_RecvMsg_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(interface{})) + }) + return _c +} + +func (_c *MockClientStream_RecvMsg_Call) Return(_a0 error) *MockClientStream_RecvMsg_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockClientStream_RecvMsg_Call) RunAndReturn(run func(interface{}) error) *MockClientStream_RecvMsg_Call { + _c.Call.Return(run) + return _c +} + +// SendMsg provides a mock function with given fields: m +func (_m *MockClientStream) SendMsg(m interface{}) 
error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockClientStream_SendMsg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendMsg' +type MockClientStream_SendMsg_Call struct { + *mock.Call +} + +// SendMsg is a helper method to define mock.On call +// - m interface{} +func (_e *MockClientStream_Expecter) SendMsg(m interface{}) *MockClientStream_SendMsg_Call { + return &MockClientStream_SendMsg_Call{Call: _e.mock.On("SendMsg", m)} +} + +func (_c *MockClientStream_SendMsg_Call) Run(run func(m interface{})) *MockClientStream_SendMsg_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(interface{})) + }) + return _c +} + +func (_c *MockClientStream_SendMsg_Call) Return(_a0 error) *MockClientStream_SendMsg_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockClientStream_SendMsg_Call) RunAndReturn(run func(interface{}) error) *MockClientStream_SendMsg_Call { + _c.Call.Return(run) + return _c +} + +// Trailer provides a mock function with given fields: +func (_m *MockClientStream) Trailer() metadata.MD { + ret := _m.Called() + + var r0 metadata.MD + if rf, ok := ret.Get(0).(func() metadata.MD); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(metadata.MD) + } + } + + return r0 +} + +// MockClientStream_Trailer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Trailer' +type MockClientStream_Trailer_Call struct { + *mock.Call +} + +// Trailer is a helper method to define mock.On call +func (_e *MockClientStream_Expecter) Trailer() *MockClientStream_Trailer_Call { + return &MockClientStream_Trailer_Call{Call: _e.mock.On("Trailer")} +} + +func (_c *MockClientStream_Trailer_Call) Run(run func()) *MockClientStream_Trailer_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockClientStream_Trailer_Call) Return(_a0 metadata.MD) *MockClientStream_Trailer_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockClientStream_Trailer_Call) RunAndReturn(run func() metadata.MD) *MockClientStream_Trailer_Call { + _c.Call.Return(run) + return _c +} + +// NewMockClientStream creates a new instance of MockClientStream. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockClientStream(t interface { + mock.TestingT + Cleanup(func()) +}) *MockClientStream { + mock := &MockClientStream{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/proto/streaming.proto b/internal/proto/streaming.proto index 5ad393b25401c..4c5b951229348 100644 --- a/internal/proto/streaming.proto +++ b/internal/proto/streaming.proto @@ -34,6 +34,7 @@ message VChannelInfo { string name = 1; } +// DeliverPolicy is the policy to deliver message. message DeliverPolicy { oneof policy { google.protobuf.Empty all = 1; // deliver all messages. @@ -42,3 +43,48 @@ message DeliverPolicy { MessageID start_after = 4; // deliver message after this message id. (startAfter, ...] } } + +// DeliverFilter is the filter to deliver message. 
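+// While DeliverPolicy selects where a scan starts, DeliverFilter narrows which
+// of the scanned messages are actually delivered (by time tick or by vchannel).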
+message DeliverFilter { + oneof filter { + DeliverFilterTimeTickGT time_tick_gt = 1; + DeliverFilterTimeTickGTE time_tick_gte = 2; + DeliverFilterVChannel vchannel = 3; + } +} + +// DeliverFilterTimeTickGT is the filter to deliver message with time tick greater than this value. +message DeliverFilterTimeTickGT { + uint64 time_tick = 1; // deliver message with time tick greater than this value. +} + +// DeliverFilterTimeTickGTE is the filter to deliver message with time tick greater than or equal to this value. +message DeliverFilterTimeTickGTE { + uint64 time_tick = 1; // deliver message with time tick greater than or equal to this value. +} + +// DeliverFilterVChannel is the filter to deliver message with vchannel name. +message DeliverFilterVChannel { + string vchannel = 1; // deliver message with vchannel name. +} + +// StreamingCode is the error code for log internal component. +enum StreamingCode { + STREAMING_CODE_OK = 0; + STREAMING_CODE_CHANNEL_EXIST = 1; // channel already exist + STREAMING_CODE_CHANNEL_NOT_EXIST = 2; // channel not exist + STREAMING_CODE_CHANNEL_FENCED = 3; // channel is fenced + STREAMING_CODE_ON_SHUTDOWN = 4; // component is on shutdown + STREAMING_CODE_INVALID_REQUEST_SEQ = 5; // invalid request sequence + STREAMING_CODE_UNMATCHED_CHANNEL_TERM = 6; // unmatched channel term + STREAMING_CODE_IGNORED_OPERATION = 7; // ignored operation + STREAMING_CODE_INNER = 8; // underlying service failure. + STREAMING_CODE_EOF = 9; // end of stream, generated by grpc status. + STREAMING_CODE_UNKNOWN = 999; // unknown error +} + +// StreamingError is the error type for log internal component. +message StreamingError { + StreamingCode code = 1; + string cause = 2; +} \ No newline at end of file diff --git a/internal/streamingnode/server/wal/adaptor/builder.go b/internal/streamingnode/server/wal/adaptor/builder.go index ca4cf7e2b7524..4dd186900619a 100644 --- a/internal/streamingnode/server/wal/adaptor/builder.go +++ b/internal/streamingnode/server/wal/adaptor/builder.go @@ -2,6 +2,8 @@ package adaptor import ( "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick" "github.com/milvus-io/milvus/pkg/streaming/walimpls" ) @@ -22,11 +24,12 @@ func (b builderAdaptorImpl) Name() string { } func (b builderAdaptorImpl) Build() (wal.Opener, error) { - _, err := b.builder.Build() + o, err := b.builder.Build() if err != nil { return nil, err } - return nil, nil - // TODO: wait for implementation. - // return adaptImplsToOpener(o), nil + // Add all interceptor here. 
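+	// Only the time tick interceptor is registered so far; additional
+	// interceptor builders should be appended to this slice as they are added.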
+ return adaptImplsToOpener(o, []interceptors.InterceptorBuilder{ + timetick.NewInterceptorBuilder(), + }), nil } diff --git a/internal/streamingnode/server/wal/adaptor/opener.go b/internal/streamingnode/server/wal/adaptor/opener.go new file mode 100644 index 0000000000000..95d3701b09494 --- /dev/null +++ b/internal/streamingnode/server/wal/adaptor/opener.go @@ -0,0 +1,86 @@ +package adaptor + +import ( + "context" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" + "github.com/milvus-io/milvus/internal/util/streamingutil/status" + "github.com/milvus-io/milvus/internal/util/streamingutil/util" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/util/lifetime" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +var _ wal.Opener = (*openerAdaptorImpl)(nil) + +// adaptImplsToOpener creates a new wal opener with opener impls. +func adaptImplsToOpener(opener walimpls.OpenerImpls, builders []interceptors.InterceptorBuilder) wal.Opener { + return &openerAdaptorImpl{ + lifetime: lifetime.NewLifetime(lifetime.Working), + opener: opener, + idAllocator: util.NewIDAllocator(), + walInstances: typeutil.NewConcurrentMap[int64, wal.WAL](), + interceptorBuilders: builders, + } +} + +// openerAdaptorImpl is the wrapper of OpenerImpls to Opener. +type openerAdaptorImpl struct { + lifetime lifetime.Lifetime[lifetime.State] + opener walimpls.OpenerImpls + idAllocator *util.IDAllocator + walInstances *typeutil.ConcurrentMap[int64, wal.WAL] // store all wal instances allocated by these allocator. + interceptorBuilders []interceptors.InterceptorBuilder +} + +// Open opens a wal instance for the channel. +func (o *openerAdaptorImpl) Open(ctx context.Context, opt *wal.OpenOption) (wal.WAL, error) { + if o.lifetime.Add(lifetime.IsWorking) != nil { + return nil, status.NewOnShutdownError("wal opener is on shutdown") + } + defer o.lifetime.Done() + + id := o.idAllocator.Allocate() + log := log.With(zap.Any("channel", opt.Channel), zap.Int64("id", id)) + + l, err := o.opener.Open(ctx, &walimpls.OpenOption{ + Channel: opt.Channel, + }) + if err != nil { + log.Warn("open wal failed", zap.Error(err)) + return nil, err + } + + // wrap the wal into walExtend with cleanup function and interceptors. + wal := adaptImplsToWAL(l, o.interceptorBuilders, func() { + o.walInstances.Remove(id) + log.Info("wal deleted from allocator") + }) + + o.walInstances.Insert(id, wal) + log.Info("new wal created") + metrics.StreamingNodeWALTotal.WithLabelValues(paramtable.GetStringNodeID()).Inc() + return wal, nil +} + +// Close the wal opener, release the underlying resources. +func (o *openerAdaptorImpl) Close() { + o.lifetime.SetState(lifetime.Stopped) + o.lifetime.Wait() + o.lifetime.Close() + + // close all wal instances. 
+ o.walInstances.Range(func(id int64, l wal.WAL) bool { + l.Close() + log.Info("close wal by opener", zap.Int64("id", id), zap.Any("channel", l.Channel())) + return true + }) + // close the opener + o.opener.Close() +} diff --git a/internal/streamingnode/server/wal/adaptor/opener_test.go b/internal/streamingnode/server/wal/adaptor/opener_test.go new file mode 100644 index 0000000000000..b525aeda7e24d --- /dev/null +++ b/internal/streamingnode/server/wal/adaptor/opener_test.go @@ -0,0 +1,117 @@ +package adaptor + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func TestMain(m *testing.M) { + paramtable.Init() + m.Run() +} + +func TestOpenerAdaptorFailure(t *testing.T) { + basicOpener := mock_walimpls.NewMockOpenerImpls(t) + errExpected := errors.New("test") + basicOpener.EXPECT().Open(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, boo *walimpls.OpenOption) (walimpls.WALImpls, error) { + return nil, errExpected + }) + + opener := adaptImplsToOpener(basicOpener, nil) + l, err := opener.Open(context.Background(), &wal.OpenOption{}) + assert.ErrorIs(t, err, errExpected) + assert.Nil(t, l) +} + +func TestOpenerAdaptor(t *testing.T) { + // Build basic opener. + basicOpener := mock_walimpls.NewMockOpenerImpls(t) + basicOpener.EXPECT().Open(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, boo *walimpls.OpenOption) (walimpls.WALImpls, error) { + wal := mock_walimpls.NewMockWALImpls(t) + + wal.EXPECT().Channel().Return(boo.Channel) + wal.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) { + return nil, nil + }) + wal.EXPECT().Close().Run(func() {}) + return wal, nil + }) + + basicOpener.EXPECT().Close().Run(func() {}) + + // Create a opener with mock basic opener. + opener := adaptImplsToOpener(basicOpener, nil) + + // Test in concurrency env. + wg := sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + wal, err := opener.Open(context.Background(), &wal.OpenOption{ + Channel: types.PChannelInfo{ + Name: fmt.Sprintf("test_%d", i), + Term: int64(i), + ServerID: 1, + }, + }) + if err != nil { + assert.Nil(t, wal) + assertShutdownError(t, err) + return + } + assert.NotNil(t, wal) + + for { + msgID, err := wal.Append(context.Background(), nil) + time.Sleep(time.Millisecond * 10) + if err != nil { + assert.Nil(t, msgID) + assertShutdownError(t, err) + return + } + } + }(i) + } + time.Sleep(time.Second * 1) + opener.Close() + + // All wal should be closed with Opener. + ch := make(chan struct{}) + go func() { + wg.Wait() + close(ch) + }() + + select { + case <-time.After(time.Second * 3): + t.Errorf("opener close should be fast") + case <-ch: + } + + // open a wal after opener closed should return shutdown error. 
+ _, err := opener.Open(context.Background(), &wal.OpenOption{ + Channel: types.PChannelInfo{ + Name: "test_after_close", + Term: int64(1), + ServerID: 1, + }, + }) + assertShutdownError(t, err) +} diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go new file mode 100644 index 0000000000000..c15996527b7ac --- /dev/null +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go @@ -0,0 +1,121 @@ +package adaptor + +import ( + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" +) + +var _ wal.Scanner = (*scannerAdaptorImpl)(nil) + +// newScannerAdaptor creates a new scanner adaptor. +func newScannerAdaptor( + name string, + l walimpls.WALImpls, + readOption wal.ReadOption, + cleanup func(), +) wal.Scanner { + s := &scannerAdaptorImpl{ + innerWAL: l, + readOption: readOption, + sendingCh: make(chan message.ImmutableMessage, 1), + reorderBuffer: utility.NewReOrderBuffer(), + pendingQueue: utility.NewImmutableMessageQueue(), + cleanup: cleanup, + ScannerHelper: helper.NewScannerHelper(name), + } + go s.executeConsume() + return s +} + +// scannerAdaptorImpl is a wrapper of ScannerImpls to extend it into a Scanner interface. +type scannerAdaptorImpl struct { + *helper.ScannerHelper + innerWAL walimpls.WALImpls + readOption wal.ReadOption + sendingCh chan message.ImmutableMessage + reorderBuffer *utility.ReOrderByTimeTickBuffer // only support time tick reorder now. + pendingQueue *utility.ImmutableMessageQueue // + cleanup func() +} + +// Chan returns the channel of message. +func (s *scannerAdaptorImpl) Chan() <-chan message.ImmutableMessage { + return s.sendingCh +} + +// Close the scanner, release the underlying resources. +// Return the error same with `Error` +func (s *scannerAdaptorImpl) Close() error { + err := s.ScannerHelper.Close() + if s.cleanup != nil { + s.cleanup() + } + return err +} + +func (s *scannerAdaptorImpl) executeConsume() { + defer close(s.sendingCh) + + innerScanner, err := s.innerWAL.Read(s.Context(), walimpls.ReadOption{ + Name: s.Name(), + DeliverPolicy: s.readOption.DeliverPolicy, + }) + if err != nil { + s.Finish(err) + return + } + defer innerScanner.Close() + + for { + // generate the event channel and do the event loop. + // TODO: Consume from local cache. + upstream, sending := s.getEventCh(innerScanner) + select { + case <-s.Context().Done(): + s.Finish(err) + return + case msg, ok := <-upstream: + if !ok { + s.Finish(innerScanner.Error()) + return + } + s.handleUpstream(msg) + case sending <- s.pendingQueue.Next(): + s.pendingQueue.UnsafeAdvance() + } + } +} + +func (s *scannerAdaptorImpl) getEventCh(scanner walimpls.ScannerImpls) (<-chan message.ImmutableMessage, chan<- message.ImmutableMessage) { + if s.pendingQueue.Len() == 0 { + // If pending queue is empty, + // no more message can be sent, + // we always need to recv message from upstream to avoid starve. + return scanner.Chan(), nil + } + // TODO: configurable pending count. 
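+	// If the number of buffered messages (pending + reordering) exceeds the
+	// hard-coded limit of 1024, stop reading from the upstream scanner and only
+	// drain the pending queue, so the upstream reader is back-pressured.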
+ if s.pendingQueue.Len()+s.reorderBuffer.Len() > 1024 { + return nil, s.sendingCh + } + return scanner.Chan(), s.sendingCh +} + +func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) { + if msg.MessageType() == message.MessageTypeTimeTick { + // If the time tick message incoming, + // the reorder buffer can be consumed into a pending queue with latest timetick. + + // TODO: !!! should we drop the unexpected broken timetick rule message. + s.pendingQueue.Add(s.reorderBuffer.PopUtilTimeTick(msg.TimeTick())) + return + } + // Filtering the message if needed. + if s.readOption.MessageFilter != nil && !s.readOption.MessageFilter(msg) { + return + } + // otherwise add message into reorder buffer directly. + s.reorderBuffer.Push(msg) +} diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go new file mode 100644 index 0000000000000..4b24fbbaf70cd --- /dev/null +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go @@ -0,0 +1,29 @@ +package adaptor + +import ( + "testing" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls" + "github.com/milvus-io/milvus/pkg/streaming/util/options" +) + +func TestScannerAdaptorReadError(t *testing.T) { + err := errors.New("read error") + l := mock_walimpls.NewMockWALImpls(t) + l.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, err) + + s := newScannerAdaptor("scanner", l, wal.ReadOption{ + DeliverPolicy: options.DeliverPolicyAll(), + MessageFilter: nil, + }, func() {}) + defer s.Close() + + <-s.Chan() + <-s.Done() + assert.ErrorIs(t, s.Error(), err) +} diff --git a/internal/streamingnode/server/wal/adaptor/scanner_registry.go b/internal/streamingnode/server/wal/adaptor/scanner_registry.go new file mode 100644 index 0000000000000..36bfe75bd932d --- /dev/null +++ b/internal/streamingnode/server/wal/adaptor/scanner_registry.go @@ -0,0 +1,31 @@ +package adaptor + +import ( + "fmt" + + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/internal/util/streamingutil/util" + "github.com/milvus-io/milvus/pkg/streaming/util/types" +) + +type scannerRegistry struct { + channel types.PChannelInfo + idAllocator *util.IDAllocator +} + +// AllocateScannerName a scanner name for a scanner. +// The scanner name should be persistent on meta for garbage clean up. +func (m *scannerRegistry) AllocateScannerName() (string, error) { + name := m.newSubscriptionName() + // TODO: persistent the subscription name on meta. + return name, nil +} + +func (m *scannerRegistry) RegisterNewScanner(string, wal.Scanner) { +} + +// newSubscriptionName generates a new subscription name. 
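+// The name is composed as "<channel name>/<term>/<allocated id>", e.g. a channel
+// "pchannel-1" at term 2 could yield "pchannel-1/2/1" (the trailing id depends on
+// the allocator state, so the exact value here is only illustrative).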
+func (m *scannerRegistry) newSubscriptionName() string { + id := m.idAllocator.Allocate() + return fmt.Sprintf("%s/%d/%d", m.channel.Name, m.channel.Term, id) +} diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go new file mode 100644 index 0000000000000..978865fdc2ce9 --- /dev/null +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go @@ -0,0 +1,152 @@ +package adaptor + +import ( + "context" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" + "github.com/milvus-io/milvus/internal/util/streamingutil/status" + "github.com/milvus-io/milvus/internal/util/streamingutil/util" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/lifetime" + "github.com/milvus-io/milvus/pkg/util/syncutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +var _ wal.WAL = (*walAdaptorImpl)(nil) + +// adaptImplsToWAL creates a new wal from wal impls. +func adaptImplsToWAL( + basicWAL walimpls.WALImpls, + builders []interceptors.InterceptorBuilder, + cleanup func(), +) wal.WAL { + param := interceptors.InterceptorBuildParam{ + WALImpls: basicWAL, + WAL: syncutil.NewFuture[wal.WAL](), + } + interceptor := buildInterceptor(builders, param) + + wal := &walAdaptorImpl{ + lifetime: lifetime.NewLifetime(lifetime.Working), + idAllocator: util.NewIDAllocator(), + inner: basicWAL, + // TODO: make the pool size configurable. + appendExecutionPool: conc.NewPool[struct{}](10), + interceptor: interceptor, + scannerRegistry: scannerRegistry{ + channel: basicWAL.Channel(), + idAllocator: util.NewIDAllocator(), + }, + scanners: typeutil.NewConcurrentMap[int64, wal.Scanner](), + cleanup: cleanup, + } + param.WAL.Set(wal) + return wal +} + +// walAdaptorImpl is a wrapper of WALImpls to extend it into a WAL interface. +type walAdaptorImpl struct { + lifetime lifetime.Lifetime[lifetime.State] + idAllocator *util.IDAllocator + inner walimpls.WALImpls + appendExecutionPool *conc.Pool[struct{}] + interceptor interceptors.InterceptorWithReady + scannerRegistry scannerRegistry + scanners *typeutil.ConcurrentMap[int64, wal.Scanner] + cleanup func() +} + +// Channel returns the channel info of wal. +func (w *walAdaptorImpl) Channel() types.PChannelInfo { + return w.inner.Channel() +} + +// Append writes a record to the log. +func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { + if w.lifetime.Add(lifetime.IsWorking) != nil { + return nil, status.NewOnShutdownError("wal is on shutdown") + } + defer w.lifetime.Done() + + // Check if interceptor is ready. + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-w.interceptor.Ready(): + } + + // Execute the interceptor and wal append. + return w.interceptor.DoAppend(ctx, msg, w.inner.Append) +} + +// AppendAsync writes a record to the log asynchronously. +func (w *walAdaptorImpl) AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(message.MessageID, error)) { + if w.lifetime.Add(lifetime.IsWorking) != nil { + cb(nil, status.NewOnShutdownError("wal is on shutdown")) + return + } + + // Submit async append to a background execution pool. 
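+	// The callback runs on a pool goroutine; the pool future is intentionally
+	// discarded because the result is already delivered through cb.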
+ _ = w.appendExecutionPool.Submit(func() (struct{}, error) { + defer w.lifetime.Done() + + msgID, err := w.inner.Append(ctx, msg) + cb(msgID, err) + return struct{}{}, nil + }) +} + +// Read returns a scanner for reading records from the wal. +func (w *walAdaptorImpl) Read(ctx context.Context, opts wal.ReadOption) (wal.Scanner, error) { + if w.lifetime.Add(lifetime.IsWorking) != nil { + return nil, status.NewOnShutdownError("wal is on shutdown") + } + defer w.lifetime.Done() + + name, err := w.scannerRegistry.AllocateScannerName() + if err != nil { + return nil, err + } + // wrap the scanner with cleanup function. + id := w.idAllocator.Allocate() + s := newScannerAdaptor(name, w.inner, opts, func() { + w.scanners.Remove(id) + }) + w.scanners.Insert(id, s) + return s, nil +} + +// Close overrides Scanner Close function. +func (w *walAdaptorImpl) Close() { + w.lifetime.SetState(lifetime.Stopped) + w.lifetime.Wait() + w.lifetime.Close() + + // close all wal instances. + w.scanners.Range(func(id int64, s wal.Scanner) bool { + s.Close() + log.Info("close scanner by wal extend", zap.Int64("id", id), zap.Any("channel", w.Channel())) + return true + }) + w.inner.Close() + w.interceptor.Close() + w.appendExecutionPool.Free() + w.cleanup() +} + +// newWALWithInterceptors creates a new wal with interceptors. +func buildInterceptor(builders []interceptors.InterceptorBuilder, param interceptors.InterceptorBuildParam) interceptors.InterceptorWithReady { + // Build all interceptors. + builtIterceptors := make([]interceptors.BasicInterceptor, 0, len(builders)) + for _, b := range builders { + builtIterceptors = append(builtIterceptors, b.Build(param)) + } + return interceptors.NewChainedInterceptor(builtIterceptors...) +} diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go new file mode 100644 index 0000000000000..83751d9ce2b94 --- /dev/null +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go @@ -0,0 +1,163 @@ +package adaptor + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/mock_interceptors" + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" + "github.com/milvus-io/milvus/internal/util/streamingutil/status" + "github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" +) + +func TestWalAdaptorReadFail(t *testing.T) { + l := mock_walimpls.NewMockWALImpls(t) + expectedErr := errors.New("test") + l.EXPECT().Channel().Return(types.PChannelInfo{}) + l.EXPECT().Read(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, ro walimpls.ReadOption) (walimpls.ScannerImpls, error) { + return nil, expectedErr + }) + + lAdapted := adaptImplsToWAL(l, nil, func() {}) + scanner, err := lAdapted.Read(context.Background(), wal.ReadOption{}) + assert.NoError(t, err) + assert.NotNil(t, scanner) + assert.ErrorIs(t, scanner.Error(), expectedErr) +} + +func TestWALAdaptor(t *testing.T) { + // Create a mock WAL implementation + l := mock_walimpls.NewMockWALImpls(t) + 
l.EXPECT().Channel().Return(types.PChannelInfo{}) + l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) { + return nil, nil + }) + l.EXPECT().Read(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, ro walimpls.ReadOption) (walimpls.ScannerImpls, error) { + scanner := mock_walimpls.NewMockScannerImpls(t) + ch := make(chan message.ImmutableMessage, 1) + scanner.EXPECT().Chan().Return(ch) + scanner.EXPECT().Close().RunAndReturn(func() error { + close(ch) + return nil + }) + return scanner, nil + }) + l.EXPECT().Close().Return() + + lAdapted := adaptImplsToWAL(l, nil, func() {}) + assert.NotNil(t, lAdapted.Channel()) + _, err := lAdapted.Append(context.Background(), nil) + assert.NoError(t, err) + lAdapted.AppendAsync(context.Background(), nil, func(mi message.MessageID, err error) { + assert.Nil(t, err) + }) + + // Test in concurrency env. + wg := sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + scanner, err := lAdapted.Read(context.Background(), wal.ReadOption{}) + if err != nil { + assertShutdownError(t, err) + return + } + assert.NoError(t, err) + <-scanner.Chan() + }(i) + } + time.Sleep(time.Second * 1) + lAdapted.Close() + + // All wal should be closed with Opener. + ch := make(chan struct{}) + go func() { + wg.Wait() + close(ch) + }() + + select { + case <-time.After(time.Second * 3): + t.Errorf("wal close should be fast") + case <-ch: + } + + _, err = lAdapted.Append(context.Background(), nil) + assertShutdownError(t, err) + lAdapted.AppendAsync(context.Background(), nil, func(mi message.MessageID, err error) { + assertShutdownError(t, err) + }) + _, err = lAdapted.Read(context.Background(), wal.ReadOption{}) + assertShutdownError(t, err) +} + +func assertShutdownError(t *testing.T, err error) { + e := status.AsStreamingError(err) + assert.Equal(t, e.Code, streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN) +} + +func TestNoInterceptor(t *testing.T) { + l := mock_walimpls.NewMockWALImpls(t) + l.EXPECT().Channel().Return(types.PChannelInfo{}) + l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) { + return nil, nil + }) + l.EXPECT().Close().Run(func() {}) + + lWithInterceptors := adaptImplsToWAL(l, nil, func() {}) + + _, err := lWithInterceptors.Append(context.Background(), nil) + assert.NoError(t, err) + lWithInterceptors.Close() +} + +func TestWALWithInterceptor(t *testing.T) { + l := mock_walimpls.NewMockWALImpls(t) + l.EXPECT().Channel().Return(types.PChannelInfo{}) + l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) { + return nil, nil + }) + l.EXPECT().Close().Run(func() {}) + + b := mock_interceptors.NewMockInterceptorBuilder(t) + readyCh := make(chan struct{}) + b.EXPECT().Build(mock.Anything).RunAndReturn(func(ibp interceptors.InterceptorBuildParam) interceptors.BasicInterceptor { + interceptor := mock_interceptors.NewMockInterceptorWithReady(t) + interceptor.EXPECT().Ready().Return(readyCh) + interceptor.EXPECT().DoAppend(mock.Anything, mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, mm message.MutableMessage, f func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error) { + return f(ctx, mm) + }) + interceptor.EXPECT().Close().Run(func() {}) + return interceptor + }) + 
lWithInterceptors := adaptImplsToWAL(l, []interceptors.InterceptorBuilder{b}, func() {}) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + // Interceptor is not ready, so the append/read will be blocked until timeout. + _, err := lWithInterceptors.Append(ctx, nil) + assert.ErrorIs(t, err, context.DeadlineExceeded) + + // Interceptor is ready, so the append/read will return soon. + close(readyCh) + _, err = lWithInterceptors.Append(context.Background(), nil) + assert.NoError(t, err) + + lWithInterceptors.Close() +} diff --git a/internal/streamingnode/server/wal/adaptor/wal_test.go b/internal/streamingnode/server/wal/adaptor/wal_test.go new file mode 100644 index 0000000000000..c99b4bc2d962a --- /dev/null +++ b/internal/streamingnode/server/wal/adaptor/wal_test.go @@ -0,0 +1,339 @@ +package adaptor_test + +import ( + "context" + "fmt" + "math/rand" + "sort" + "strconv" + "sync" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "github.com/remeh/sizedwaitgroup" + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" + "github.com/milvus-io/milvus/internal/streamingnode/server/resource/timestamp" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/registry" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/options" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest" +) + +type walTestFramework struct { + b wal.OpenerBuilder + t *testing.T + messageCount int +} + +func TestWAL(t *testing.T) { + rc := timestamp.NewMockRootCoordClient(t) + resource.InitForTest(resource.OptRootCoordClient(rc)) + + b := registry.MustGetBuilder(walimplstest.WALName) + f := &walTestFramework{ + b: b, + t: t, + messageCount: 1000, + } + f.Run() +} + +func (f *walTestFramework) Run() { + wg := sync.WaitGroup{} + loopCnt := 3 + wg.Add(loopCnt) + o, err := f.b.Build() + assert.NoError(f.t, err) + assert.NotNil(f.t, o) + defer o.Close() + + for i := 0; i < loopCnt; i++ { + go func(i int) { + defer wg.Done() + f.runOnce(fmt.Sprintf("pchannel-%d", i), o) + }(i) + } + wg.Wait() +} + +func (f *walTestFramework) runOnce(pchannel string, o wal.Opener) { + f2 := &testOneWALFramework{ + t: f.t, + opener: o, + pchannel: pchannel, + messageCount: f.messageCount, + term: 1, + } + f2.Run() +} + +type testOneWALFramework struct { + t *testing.T + opener wal.Opener + written []message.ImmutableMessage + pchannel string + messageCount int + term int +} + +func (f *testOneWALFramework) Run() { + ctx := context.Background() + for ; f.term <= 3; f.term++ { + pChannel := types.PChannelInfo{ + Name: f.pchannel, + Term: int64(f.term), + ServerID: 1, + } + w, err := f.opener.Open(ctx, &wal.OpenOption{ + Channel: pChannel, + }) + assert.NoError(f.t, err) + assert.NotNil(f.t, w) + assert.Equal(f.t, pChannel.Name, w.Channel().Name) + assert.Equal(f.t, pChannel.ServerID, w.Channel().ServerID) + + f.testReadAndWrite(ctx, w) + // close the wal + w.Close() + } +} + +func (f *testOneWALFramework) testReadAndWrite(ctx context.Context, w wal.WAL) { + // Test read and write. 
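+	// Run one writer and two independent readers concurrently; both readers must
+	// observe the complete, time-tick-ordered history regardless of interleaving.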
+ wg := sync.WaitGroup{} + wg.Add(3) + + var newWritten []message.ImmutableMessage + var read1, read2 []message.ImmutableMessage + go func() { + defer wg.Done() + var err error + newWritten, err = f.testAppend(ctx, w) + assert.NoError(f.t, err) + }() + go func() { + defer wg.Done() + var err error + read1, err = f.testRead(ctx, w) + assert.NoError(f.t, err) + }() + go func() { + defer wg.Done() + var err error + read2, err = f.testRead(ctx, w) + assert.NoError(f.t, err) + }() + wg.Wait() + // read result should be sorted by timetick. + f.assertSortByTimeTickMessageList(read1) + f.assertSortByTimeTickMessageList(read2) + + // all written messages should be read. + sort.Sort(sortByMessageID(newWritten)) + f.written = append(f.written, newWritten...) + sort.Sort(sortByMessageID(read1)) + sort.Sort(sortByMessageID(read2)) + f.assertEqualMessageList(f.written, read1) + f.assertEqualMessageList(f.written, read2) + + // test read with option + f.testReadWithOption(ctx, w) +} + +func (f *testOneWALFramework) testAppend(ctx context.Context, w wal.WAL) ([]message.ImmutableMessage, error) { + messages := make([]message.ImmutableMessage, f.messageCount) + swg := sizedwaitgroup.New(10) + for i := 0; i < f.messageCount-1; i++ { + swg.Add() + go func(i int) { + defer swg.Done() + time.Sleep(time.Duration(5+rand.Int31n(10)) * time.Millisecond) + // ...rocksmq has a dirty implement of properties, + // without commonpb.MsgHeader, it can not work. + header := commonpb.MsgHeader{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Insert, + MsgID: int64(i), + }, + } + payload, err := proto.Marshal(&header) + if err != nil { + panic(err) + } + properties := map[string]string{ + "id": fmt.Sprintf("%d", i), + "const": "t", + } + typ := message.MessageTypeUnknown + msg := message.NewMutableMessageBuilder(). + WithMessageType(typ). + WithPayload(payload). + WithProperties(properties). + BuildMutable() + id, err := w.Append(ctx, msg) + assert.NoError(f.t, err) + assert.NotNil(f.t, id) + messages[i] = msg.IntoImmutableMessage(id) + }(i) + } + swg.Wait() + // send a final hint message + header := commonpb.MsgHeader{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Insert, + MsgID: int64(f.messageCount - 1), + }, + } + payload, err := proto.Marshal(&header) + if err != nil { + panic(err) + } + properties := map[string]string{ + "id": fmt.Sprintf("%d", f.messageCount-1), + "const": "t", + "term": strconv.FormatInt(int64(f.term), 10), + } + msg := message.NewMutableMessageBuilder(). + WithPayload(payload). + WithProperties(properties). + WithMessageType(message.MessageTypeUnknown). 
+ BuildMutable() + id, err := w.Append(ctx, msg) + assert.NoError(f.t, err) + messages[f.messageCount-1] = msg.IntoImmutableMessage(id) + return messages, nil +} + +func (f *testOneWALFramework) testRead(ctx context.Context, w wal.WAL) ([]message.ImmutableMessage, error) { + s, err := w.Read(ctx, wal.ReadOption{ + DeliverPolicy: options.DeliverPolicyAll(), + }) + assert.NoError(f.t, err) + defer s.Close() + + expectedCnt := f.messageCount + len(f.written) + msgs := make([]message.ImmutableMessage, 0, expectedCnt) + for { + msg, ok := <-s.Chan() + assert.NotNil(f.t, msg) + assert.True(f.t, ok) + msgs = append(msgs, msg) + termString, ok := msg.Properties().Get("term") + if !ok { + continue + } + term, err := strconv.ParseInt(termString, 10, 64) + if err != nil { + panic(err) + } + if int(term) == f.term { + break + } + } + return msgs, nil +} + +func (f *testOneWALFramework) testReadWithOption(ctx context.Context, w wal.WAL) { + loopCount := 5 + wg := sync.WaitGroup{} + wg.Add(loopCount) + for i := 0; i < loopCount; i++ { + go func() { + defer wg.Done() + idx := rand.Int31n(int32(len(f.written))) + // Test other read options. + // Test start from some message and timetick is gte than it. + readFromMsg := f.written[idx] + s, err := w.Read(ctx, wal.ReadOption{ + DeliverPolicy: options.DeliverPolicyStartFrom(readFromMsg.LastConfirmedMessageID()), + MessageFilter: func(im message.ImmutableMessage) bool { + return im.TimeTick() >= readFromMsg.TimeTick() + }, + }) + assert.NoError(f.t, err) + maxTimeTick := f.maxTimeTickWritten() + msgCount := 0 + lastTimeTick := readFromMsg.TimeTick() - 1 + for { + msg, ok := <-s.Chan() + msgCount++ + assert.NotNil(f.t, msg) + assert.True(f.t, ok) + assert.Greater(f.t, msg.TimeTick(), lastTimeTick) + lastTimeTick = msg.TimeTick() + if msg.TimeTick() >= maxTimeTick { + break + } + } + + // shouldn't lost any message. 
+ assert.Equal(f.t, f.countTheTimeTick(readFromMsg.TimeTick()), msgCount) + s.Close() + }() + } + wg.Wait() +} + +func (f *testOneWALFramework) assertSortByTimeTickMessageList(msgs []message.ImmutableMessage) { + for i := 1; i < len(msgs); i++ { + assert.Less(f.t, msgs[i-1].TimeTick(), msgs[i].TimeTick()) + } +} + +func (f *testOneWALFramework) assertEqualMessageList(msgs1 []message.ImmutableMessage, msgs2 []message.ImmutableMessage) { + assert.Equal(f.t, len(msgs2), len(msgs1)) + for i := 0; i < len(msgs1); i++ { + assert.True(f.t, msgs1[i].MessageID().EQ(msgs2[i].MessageID())) + // assert.True(f.t, bytes.Equal(msgs1[i].Payload(), msgs2[i].Payload())) + id1, ok1 := msgs1[i].Properties().Get("id") + id2, ok2 := msgs2[i].Properties().Get("id") + assert.True(f.t, ok1) + assert.True(f.t, ok2) + assert.Equal(f.t, id1, id2) + id1, ok1 = msgs1[i].Properties().Get("const") + id2, ok2 = msgs2[i].Properties().Get("const") + assert.True(f.t, ok1) + assert.True(f.t, ok2) + assert.Equal(f.t, id1, id2) + } +} + +func (f *testOneWALFramework) countTheTimeTick(begin uint64) int { + cnt := 0 + for _, m := range f.written { + if m.TimeTick() >= begin { + cnt++ + } + } + return cnt +} + +func (f *testOneWALFramework) maxTimeTickWritten() uint64 { + maxTimeTick := uint64(0) + for _, m := range f.written { + if m.TimeTick() > maxTimeTick { + maxTimeTick = m.TimeTick() + } + } + return maxTimeTick +} + +type sortByMessageID []message.ImmutableMessage + +func (a sortByMessageID) Len() int { + return len(a) +} + +func (a sortByMessageID) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +func (a sortByMessageID) Less(i, j int) bool { + return a[i].MessageID().LT(a[j].MessageID()) +} diff --git a/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go b/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go index 475e3844dc8a4..e3a2cbccd0806 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go @@ -11,6 +11,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/ack" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" ) var _ interceptors.AppendInterceptor = (*timeTickAppendInterceptor)(nil) @@ -65,6 +66,37 @@ func (impl *timeTickAppendInterceptor) executeSyncTimeTick(interval time.Duratio logger.Info("start to sync time tick...") defer logger.Info("sync time tick stopped") + if err := impl.blockUntilSyncTimeTickReady(underlyingWALImpls); err != nil { + logger.Warn("sync first time tick failed", zap.Error(err)) + return + } + + // interceptor is ready, wait for the final wal object is ready to use. + wal := param.WAL.Get() + + // TODO: sync time tick message to wal periodically. + // Add a trigger on `AckManager` to sync time tick message without periodically. + // `AckManager` gather detail information, time tick sync can check it and make the message between tt more smaller. + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-impl.ctx.Done(): + return + case <-ticker.C: + if err := impl.sendTsMsg(impl.ctx, wal.Append); err != nil { + log.Warn("send time tick sync message failed", zap.Error(err)) + } + } + } +} + +// blockUntilSyncTimeTickReady blocks until the first time tick message is sent. 
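+// It retries sending the first time tick (with a fixed 50ms backoff) until the
+// send succeeds or the interceptor context is canceled, and only then closes the
+// ready channel so Append calls blocked on Ready() may proceed.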
+func (impl *timeTickAppendInterceptor) blockUntilSyncTimeTickReady(underlyingWALImpls walimpls.WALImpls) error { + logger := log.With(zap.Any("channel", underlyingWALImpls.Channel())) + logger.Info("start to sync first time tick") + defer logger.Info("sync first time tick done") + // Send first timetick message to wal before interceptor is ready. for count := 0; ; count++ { // Sent first timetick message to wal before ready. @@ -75,40 +107,20 @@ func (impl *timeTickAppendInterceptor) executeSyncTimeTick(interval time.Duratio // !!! Send a timetick message into walimpls directly is safe. select { case <-impl.ctx.Done(): - return + return impl.ctx.Err() default: } if err := impl.sendTsMsg(impl.ctx, underlyingWALImpls.Append); err != nil { - log.Warn("send first timestamp message failed", zap.Error(err), zap.Int("retryCount", count)) + logger.Warn("send first timestamp message failed", zap.Error(err), zap.Int("retryCount", count)) // TODO: exponential backoff. time.Sleep(50 * time.Millisecond) continue } break } - // interceptor is ready now. close(impl.ready) - logger.Info("start to sync time ready") - - // interceptor is ready, wait for the final wal object is ready to use. - wal := param.WAL.Get() - - // TODO: sync time tick message to wal periodically. - // Add a trigger on `AckManager` to sync time tick message without periodically. - // `AckManager` gather detail information, time tick sync can check it and make the message between tt more smaller. - ticker := time.NewTicker(interval) - defer ticker.Stop() - for { - select { - case <-impl.ctx.Done(): - return - case <-ticker.C: - if err := impl.sendTsMsg(impl.ctx, wal.Append); err != nil { - log.Warn("send time tick sync message failed", zap.Error(err)) - } - } - } + return nil } // syncAcknowledgedDetails syncs the timestamp acknowledged details. diff --git a/internal/streamingnode/server/wal/scanner.go b/internal/streamingnode/server/wal/scanner.go index 64545cbde981e..27b71604f2752 100644 --- a/internal/streamingnode/server/wal/scanner.go +++ b/internal/streamingnode/server/wal/scanner.go @@ -5,10 +5,12 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/util/options" ) +type MessageFilter = func(message.ImmutableMessage) bool + // ReadOption is the option for reading records from the wal. type ReadOption struct { DeliverPolicy options.DeliverPolicy - DeliverOrder options.DeliverOrder + MessageFilter MessageFilter } // Scanner is the interface for reading records from the wal. diff --git a/internal/streamingnode/server/wal/utility/immutable_message_queue.go b/internal/streamingnode/server/wal/utility/immutable_message_queue.go new file mode 100644 index 0000000000000..75e72bc8a0dcd --- /dev/null +++ b/internal/streamingnode/server/wal/utility/immutable_message_queue.go @@ -0,0 +1,51 @@ +package utility + +import "github.com/milvus-io/milvus/pkg/streaming/util/message" + +// NewImmutableMessageQueue create a new immutable message queue. +func NewImmutableMessageQueue() *ImmutableMessageQueue { + return &ImmutableMessageQueue{ + pendings: make([][]message.ImmutableMessage, 0), + cnt: 0, + } +} + +// ImmutableMessageQueue is a queue of messages. +type ImmutableMessageQueue struct { + pendings [][]message.ImmutableMessage + cnt int +} + +// Len return the queue size. 
+func (pq *ImmutableMessageQueue) Len() int { + return pq.cnt +} + +// Add add a slice of message as pending one +func (pq *ImmutableMessageQueue) Add(msgs []message.ImmutableMessage) { + if len(msgs) == 0 { + return + } + pq.pendings = append(pq.pendings, msgs) + pq.cnt += len(msgs) +} + +// Next return the next message in pending queue. +func (pq *ImmutableMessageQueue) Next() message.ImmutableMessage { + if len(pq.pendings) != 0 && len(pq.pendings[0]) != 0 { + return pq.pendings[0][0] + } + return nil +} + +// UnsafeAdvance do a advance without check. +// !!! Should only be called `Next` do not return nil. +func (pq *ImmutableMessageQueue) UnsafeAdvance() { + if len(pq.pendings[0]) == 1 { + pq.pendings = pq.pendings[1:] + pq.cnt-- + return + } + pq.pendings[0] = pq.pendings[0][1:] + pq.cnt-- +} diff --git a/internal/streamingnode/server/wal/utility/immutable_message_queue_test.go b/internal/streamingnode/server/wal/utility/immutable_message_queue_test.go new file mode 100644 index 0000000000000..270b048b15974 --- /dev/null +++ b/internal/streamingnode/server/wal/utility/immutable_message_queue_test.go @@ -0,0 +1,25 @@ +package utility + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message" + "github.com/milvus-io/milvus/pkg/streaming/util/message" +) + +func TestImmutableMessageQueue(t *testing.T) { + q := NewImmutableMessageQueue() + for i := 0; i < 100; i++ { + q.Add([]message.ImmutableMessage{ + mock_message.NewMockImmutableMessage(t), + }) + assert.Equal(t, i+1, q.Len()) + } + for i := 100; i > 0; i-- { + assert.NotNil(t, q.Next()) + q.UnsafeAdvance() + assert.Equal(t, i-1, q.Len()) + } +} diff --git a/internal/streamingnode/server/wal/utility/message_heap.go b/internal/streamingnode/server/wal/utility/message_heap.go new file mode 100644 index 0000000000000..27c57f20eacc5 --- /dev/null +++ b/internal/streamingnode/server/wal/utility/message_heap.go @@ -0,0 +1,45 @@ +package utility + +import ( + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +var _ typeutil.HeapInterface = (*immutableMessageHeap)(nil) + +// immutableMessageHeap is a heap underlying represent of timestampAck. +type immutableMessageHeap []message.ImmutableMessage + +// Len returns the length of the heap. +func (h immutableMessageHeap) Len() int { + return len(h) +} + +// Less returns true if the element at index i is less than the element at index j. +func (h immutableMessageHeap) Less(i, j int) bool { + return h[i].TimeTick() < h[j].TimeTick() +} + +// Swap swaps the elements at indexes i and j. +func (h immutableMessageHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +// Push pushes the last one at len. +func (h *immutableMessageHeap) Push(x interface{}) { + // Push and Pop use pointer receivers because they modify the slice's length, + // not just its contents. + *h = append(*h, x.(message.ImmutableMessage)) +} + +// Pop pop the last one at len. +func (h *immutableMessageHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +// Peek returns the element at the top of the heap. 
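+// It assumes the heap is non-empty; callers must check Len() before calling it.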
diff --git a/internal/streamingnode/server/wal/utility/message_heap.go b/internal/streamingnode/server/wal/utility/message_heap.go
new file mode 100644
index 0000000000000..27c57f20eacc5
--- /dev/null
+++ b/internal/streamingnode/server/wal/utility/message_heap.go
@@ -0,0 +1,45 @@
+package utility
+
+import (
+	"github.com/milvus-io/milvus/pkg/streaming/util/message"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
+)
+
+var _ typeutil.HeapInterface = (*immutableMessageHeap)(nil)
+
+// immutableMessageHeap is the underlying heap representation of immutable messages, ordered by time tick.
+type immutableMessageHeap []message.ImmutableMessage
+
+// Len returns the length of the heap.
+func (h immutableMessageHeap) Len() int {
+	return len(h)
+}
+
+// Less returns true if the element at index i is less than the element at index j.
+func (h immutableMessageHeap) Less(i, j int) bool {
+	return h[i].TimeTick() < h[j].TimeTick()
+}
+
+// Swap swaps the elements at indexes i and j.
+func (h immutableMessageHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+
+// Push pushes the element onto the end of the heap.
+func (h *immutableMessageHeap) Push(x interface{}) {
+	// Push and Pop use pointer receivers because they modify the slice's length,
+	// not just its contents.
+	*h = append(*h, x.(message.ImmutableMessage))
+}
+
+// Pop pops the last element off the heap.
+func (h *immutableMessageHeap) Pop() interface{} {
+	old := *h
+	n := len(old)
+	x := old[n-1]
+	*h = old[0 : n-1]
+	return x
+}
+
+// Peek returns the element at the top of the heap.
+func (h *immutableMessageHeap) Peek() interface{} {
+	return (*h)[0]
+}
diff --git a/internal/streamingnode/server/wal/utility/message_heap_test.go b/internal/streamingnode/server/wal/utility/message_heap_test.go
new file mode 100644
index 0000000000000..22c63d852e20a
--- /dev/null
+++ b/internal/streamingnode/server/wal/utility/message_heap_test.go
@@ -0,0 +1,29 @@
+package utility
+
+import (
+	"math/rand"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
+	"github.com/milvus-io/milvus/pkg/streaming/util/message"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
+)
+
+func TestImmutableMessageHeap(t *testing.T) {
+	h := typeutil.NewHeap[message.ImmutableMessage](&immutableMessageHeap{})
+	timeticks := rand.Perm(25)
+	for _, timetick := range timeticks {
+		msg := mock_message.NewMockImmutableMessage(t)
+		msg.EXPECT().TimeTick().Return(uint64(timetick + 1))
+		h.Push(msg)
+	}
+
+	lastOneTimeTick := uint64(0)
+	for h.Len() != 0 {
+		msg := h.Pop()
+		assert.Greater(t, msg.TimeTick(), lastOneTimeTick)
+		lastOneTimeTick = msg.TimeTick()
+	}
+}
diff --git a/internal/streamingnode/server/wal/utility/reorder_buffer.go b/internal/streamingnode/server/wal/utility/reorder_buffer.go
new file mode 100644
index 0000000000000..e45a335c9cd1e
--- /dev/null
+++ b/internal/streamingnode/server/wal/utility/reorder_buffer.go
@@ -0,0 +1,38 @@
+package utility
+
+import (
+	"github.com/milvus-io/milvus/pkg/streaming/util/message"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
+)
+
+// ReOrderByTimeTickBuffer is a buffer that stores messages and pops them in order of time tick.
+type ReOrderByTimeTickBuffer struct {
+	messageHeap typeutil.Heap[message.ImmutableMessage]
+}
+
+// NewReOrderBuffer creates a new ReOrderBuffer.
+func NewReOrderBuffer() *ReOrderByTimeTickBuffer {
+	return &ReOrderByTimeTickBuffer{
+		messageHeap: typeutil.NewHeap[message.ImmutableMessage](&immutableMessageHeap{}),
+	}
+}
+
+// Push pushes a message into the buffer.
+func (r *ReOrderByTimeTickBuffer) Push(msg message.ImmutableMessage) {
+	r.messageHeap.Push(msg)
+}
+
+// PopUtilTimeTick pops all messages whose time tick is less than or equal to the given time tick.
+// The result is sorted by time tick in ascending order.
+func (r *ReOrderByTimeTickBuffer) PopUtilTimeTick(timetick uint64) []message.ImmutableMessage {
+	var res []message.ImmutableMessage
+	for r.messageHeap.Len() > 0 && r.messageHeap.Peek().TimeTick() <= timetick {
+		res = append(res, r.messageHeap.Pop())
+	}
+	return res
+}
+
+// Len returns the number of messages in the buffer.
+func (r *ReOrderByTimeTickBuffer) Len() int {
+	return r.messageHeap.Len()
+}
diff --git a/internal/streamingnode/server/wal/utility/reorder_buffer_test.go b/internal/streamingnode/server/wal/utility/reorder_buffer_test.go
new file mode 100644
index 0000000000000..8f3b1fa53cc34
--- /dev/null
+++ b/internal/streamingnode/server/wal/utility/reorder_buffer_test.go
@@ -0,0 +1,43 @@
+package utility
+
+import (
+	"math/rand"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
+)
+
+func TestReOrderByTimeTickBuffer(t *testing.T) {
+	buf := NewReOrderBuffer()
+	timeticks := rand.Perm(25)
+	for i, timetick := range timeticks {
+		msg := mock_message.NewMockImmutableMessage(t)
+		msg.EXPECT().TimeTick().Return(uint64(timetick + 1))
+		buf.Push(msg)
+		assert.Equal(t, i+1, buf.Len())
+	}
+
+	result := buf.PopUtilTimeTick(0)
+	assert.Len(t, result, 0)
+	result = buf.PopUtilTimeTick(1)
+	assert.Len(t, result, 1)
+	for _, msg := range result {
+		assert.LessOrEqual(t, msg.TimeTick(), uint64(1))
+	}
+
+	result = buf.PopUtilTimeTick(10)
+	assert.Len(t, result, 9)
+	for _, msg := range result {
+		assert.LessOrEqual(t, msg.TimeTick(), uint64(10))
+		assert.Greater(t, msg.TimeTick(), uint64(1))
+	}
+
+	result = buf.PopUtilTimeTick(25)
+	assert.Len(t, result, 15)
+	for _, msg := range result {
+		assert.Greater(t, msg.TimeTick(), uint64(10))
+		assert.LessOrEqual(t, msg.TimeTick(), uint64(25))
+	}
+}
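The reorder buffer is what lets a scanner emit messages in ascending time-tick order even if the underlying WAL delivers them unordered; a rough consume-side sketch (flushUpTo and emit are illustrative names, not part of this patch):

package example

import (
	"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
	"github.com/milvus-io/milvus/pkg/streaming/util/message"
)

// flushUpTo buffers the incoming messages, then releases everything with a
// time tick at or below the confirmed time tick, sorted ascending.
func flushUpTo(buf *utility.ReOrderByTimeTickBuffer, incoming []message.ImmutableMessage, confirmedTimeTick uint64, emit func(message.ImmutableMessage)) {
	for _, msg := range incoming {
		buf.Push(msg)
	}
	for _, msg := range buf.PopUtilTimeTick(confirmedTimeTick) {
		emit(msg)
	}
}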
diff --git a/internal/streamingservice/.mockery.yaml b/internal/streamingservice/.mockery.yaml
index feff9bf14fa7f..55423e9140f87 100644
--- a/internal/streamingservice/.mockery.yaml
+++ b/internal/streamingservice/.mockery.yaml
@@ -10,4 +10,12 @@ packages:
       OpenerBuilder:
       Opener:
       Scanner:
-      WAL:
\ No newline at end of file
+      WAL:
+  github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors:
+    interfaces:
+      Interceptor:
+      InterceptorWithReady:
+      InterceptorBuilder:
+  google.golang.org/grpc:
+    interfaces:
+      ClientStream:
diff --git a/internal/util/streamingutil/status/checker.go b/internal/util/streamingutil/status/checker.go
new file mode 100644
index 0000000000000..b6813f34ed387
--- /dev/null
+++ b/internal/util/streamingutil/status/checker.go
@@ -0,0 +1,47 @@
+package status
+
+import (
+	"context"
+
+	"github.com/cockroachdb/errors"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// IsCanceled checks whether the error is caused by cancellation.
+// Used on the client side.
+func IsCanceled(err error) bool {
+	if err == nil {
+		return false
+	}
+	if errors.Is(err, context.DeadlineExceeded) {
+		return true
+	}
+	if errors.Is(err, context.Canceled) {
+		return true
+	}
+
+	if se, ok := err.(interface {
+		GRPCStatus() *status.Status
+	}); ok {
+		switch se.GRPCStatus().Code() {
+		case codes.Canceled, codes.DeadlineExceeded:
+			return true
+			// It may be a special unavailable error, but we don't enable it here.
+			// From etcd implementation:
+			// case codes.Unavailable:
+			// 	msg := se.GRPCStatus().Message()
+			// 	// client-side context cancel or deadline exceeded with TLS ("http2.errClientDisconnected")
+			// 	// "rpc error: code = Unavailable desc = client disconnected"
+			// 	if msg == "client disconnected" {
+			// 		return true
+			// 	}
+			// 	// "grpc/transport.ClientTransport.CloseStream" on canceled streams
+			// 	// "rpc error: code = Unavailable desc = stream error: stream ID 21; CANCEL")
+			// 	if strings.HasPrefix(msg, "stream error: ") && strings.HasSuffix(msg, "; CANCEL") {
+			// 		return true
+			// 	}
+		}
+	}
+	return false
+}
diff --git a/internal/util/streamingutil/status/checker_test.go b/internal/util/streamingutil/status/checker_test.go
new file mode 100644
index 0000000000000..40cb3787a7bd8
--- /dev/null
+++ b/internal/util/streamingutil/status/checker_test.go
@@ -0,0 +1,19 @@
+package status
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+func TestIsCanceled(t *testing.T) {
+	assert.False(t, IsCanceled(nil))
+	assert.True(t, IsCanceled(context.DeadlineExceeded))
+	assert.True(t, IsCanceled(context.Canceled))
+	assert.True(t, IsCanceled(status.Error(codes.Canceled, "test")))
+	assert.True(t, IsCanceled(ConvertStreamingError("test", status.Error(codes.Canceled, "test"))))
+	assert.False(t, IsCanceled(ConvertStreamingError("test", status.Error(codes.Unknown, "test"))))
+}
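IsCanceled is meant for client-side retry loops that should give up once the caller's context is gone; a hedged sketch (doRequest is a placeholder for any call whose errors pass through ConvertStreamingError):

package example

import (
	"context"
	"time"

	"github.com/milvus-io/milvus/internal/util/streamingutil/status"
)

// callWithRetry retries doRequest until it succeeds or fails with a
// cancellation, which is never worth retrying.
func callWithRetry(ctx context.Context, doRequest func(context.Context) error) error {
	for {
		err := doRequest(ctx)
		if err == nil {
			return nil
		}
		if status.IsCanceled(err) {
			return err
		}
		// Other errors may be transient; back off briefly and retry.
		time.Sleep(50 * time.Millisecond)
	}
}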
diff --git a/internal/util/streamingutil/status/client_stream_wrapper.go b/internal/util/streamingutil/status/client_stream_wrapper.go
new file mode 100644
index 0000000000000..277c5a125df19
--- /dev/null
+++ b/internal/util/streamingutil/status/client_stream_wrapper.go
@@ -0,0 +1,34 @@
+package status
+
+import (
+	"google.golang.org/grpc"
+)
+
+// NewClientStreamWrapper returns a grpc.ClientStream that wraps the given stream.
+func NewClientStreamWrapper(method string, stream grpc.ClientStream) grpc.ClientStream {
+	if stream == nil {
+		return nil
+	}
+	return &clientStreamWrapper{
+		method:       method,
+		ClientStream: stream,
+	}
+}
+
+// clientStreamWrapper wraps a grpc.ClientStream and converts errors to Status.
+type clientStreamWrapper struct {
+	method string
+	grpc.ClientStream
+}
+
+// Convert the error to a Status and return it.
+func (s *clientStreamWrapper) SendMsg(m interface{}) error {
+	err := s.ClientStream.SendMsg(m)
+	return ConvertStreamingError(s.method, err)
+}
+
+// Convert the error to a Status and return it.
+func (s *clientStreamWrapper) RecvMsg(m interface{}) error {
+	err := s.ClientStream.RecvMsg(m)
+	return ConvertStreamingError(s.method, err)
+}
diff --git a/internal/util/streamingutil/status/client_stream_wrapper_test.go b/internal/util/streamingutil/status/client_stream_wrapper_test.go
new file mode 100644
index 0000000000000..df53362787b97
--- /dev/null
+++ b/internal/util/streamingutil/status/client_stream_wrapper_test.go
@@ -0,0 +1,33 @@
+package status
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+
+	"github.com/milvus-io/milvus/internal/mocks/google.golang.org/mock_grpc"
+	"github.com/milvus-io/milvus/internal/proto/streamingpb"
+)
+
+func TestClientStreamWrapper(t *testing.T) {
+	s := mock_grpc.NewMockClientStream(t)
+	s.EXPECT().SendMsg(mock.Anything).Return(NewGRPCStatusFromStreamingError(NewOnShutdownError("test")).Err())
+	s.EXPECT().RecvMsg(mock.Anything).Return(NewGRPCStatusFromStreamingError(NewOnShutdownError("test")).Err())
+	w := NewClientStreamWrapper("method", s)
+
+	err := w.SendMsg(context.Background())
+	assert.NotNil(t, err)
+	streamingErr := AsStreamingError(err)
+	assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, streamingErr.Code)
+	assert.Contains(t, streamingErr.Cause, "test")
+
+	err = w.RecvMsg(context.Background())
+	assert.NotNil(t, err)
+	streamingErr = AsStreamingError(err)
+	assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, streamingErr.Code)
+	assert.Contains(t, streamingErr.Cause, "test")
+
+	assert.Nil(t, NewClientStreamWrapper("method", nil))
+}
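A natural place to apply NewClientStreamWrapper is a grpc.StreamClientInterceptor, so every stream opened by the streaming client surfaces StreamingError-aware errors; a sketch under that assumption (the interceptor itself is not part of this patch):

package example

import (
	"context"

	"google.golang.org/grpc"

	"github.com/milvus-io/milvus/internal/util/streamingutil/status"
)

// streamingErrorInterceptor wraps every new client stream so that SendMsg and
// RecvMsg errors are converted by ConvertStreamingError.
func streamingErrorInterceptor() grpc.StreamClientInterceptor {
	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		cs, err := streamer(ctx, desc, cc, method, opts...)
		if err != nil {
			return nil, status.ConvertStreamingError(method, err)
		}
		return status.NewClientStreamWrapper(method, cs), nil
	}
}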
diff --git a/internal/util/streamingutil/status/rpc_error.go b/internal/util/streamingutil/status/rpc_error.go
new file mode 100644
index 0000000000000..c2ce2c11284c3
--- /dev/null
+++ b/internal/util/streamingutil/status/rpc_error.go
@@ -0,0 +1,101 @@
+package status
+
+import (
+	"context"
+	"fmt"
+	"io"
+
+	"github.com/cockroachdb/errors"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
+	"github.com/milvus-io/milvus/internal/proto/streamingpb"
+)
+
+var streamingErrorToGRPCStatus = map[streamingpb.StreamingCode]codes.Code{
+	streamingpb.StreamingCode_STREAMING_CODE_OK:                     codes.OK,
+	streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_EXIST:          codes.AlreadyExists,
+	streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_NOT_EXIST:      codes.FailedPrecondition,
+	streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_FENCED:         codes.FailedPrecondition,
+	streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN:            codes.FailedPrecondition,
+	streamingpb.StreamingCode_STREAMING_CODE_INVALID_REQUEST_SEQ:    codes.FailedPrecondition,
+	streamingpb.StreamingCode_STREAMING_CODE_UNMATCHED_CHANNEL_TERM: codes.FailedPrecondition,
+	streamingpb.StreamingCode_STREAMING_CODE_IGNORED_OPERATION:      codes.FailedPrecondition,
+	streamingpb.StreamingCode_STREAMING_CODE_INNER:                  codes.Unavailable,
+	streamingpb.StreamingCode_STREAMING_CODE_UNKNOWN:                codes.Unknown,
+}
+
+// NewGRPCStatusFromStreamingError converts a StreamingError to a grpc status.
+// Should be called on the server side.
+func NewGRPCStatusFromStreamingError(e *StreamingError) *status.Status {
+	if e == nil || e.Code == streamingpb.StreamingCode_STREAMING_CODE_OK {
+		return status.New(codes.OK, "")
+	}
+
+	code, ok := streamingErrorToGRPCStatus[e.Code]
+	if !ok {
+		code = codes.Unknown
+	}
+
+	// Attach the streaming error as a status detail.
+	st := status.New(code, "")
+	newST, err := st.WithDetails(e.AsPBError())
+	if err != nil {
+		return status.New(code, fmt.Sprintf("convert streaming error failed, detail: %s", e.Cause))
+	}
+	return newST
+}
+
+// StreamingClientStatus is a wrapper of grpc status.
+// Should be used on the client side.
+type StreamingClientStatus struct {
+	*status.Status
+	method string
+}
+
+// ConvertStreamingError converts an error to a StreamingClientStatus.
+// Used on the client side.
+func ConvertStreamingError(method string, err error) error {
+	if err == nil {
+		return nil
+	}
+	if errors.IsAny(err, context.DeadlineExceeded, context.Canceled, io.EOF) {
+		return err
+	}
+	rpcStatus := status.Convert(err)
+	e := &StreamingClientStatus{
+		Status: rpcStatus,
+		method: method,
+	}
+	return e
+}
+
+// TryIntoStreamingError tries to convert a StreamingClientStatus to a StreamingError.
+func (s *StreamingClientStatus) TryIntoStreamingError() *StreamingError {
+	if s == nil {
+		return nil
+	}
+	for _, detail := range s.Details() {
+		if detail, ok := detail.(*streamingpb.StreamingError); ok {
+			return New(detail.Code, detail.Cause)
+		}
+	}
+	return nil
+}
+
+// GRPCStatus is for converting with status.Status.
+// !!! DO NOT delete this method; the IsCanceled function uses it.
+func (s *StreamingClientStatus) GRPCStatus() *status.Status {
+	if s == nil {
+		return nil
+	}
+	return s.Status
+}
+
+// Error implements StreamingClientStatus as an error.
+func (s *StreamingClientStatus) Error() string {
+	if streamingErr := s.TryIntoStreamingError(); streamingErr != nil {
+		return fmt.Sprintf("%s; streaming error: code = %s, cause = %s; rpc error: code = %s, desc = %s", s.method, streamingErr.Code.String(), streamingErr.Cause, s.Code(), s.Message())
+	}
+	return fmt.Sprintf("%s; rpc error: code = %s, desc = %s", s.method, s.Code(), s.Message())
+}
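On the server side the intended pattern is to map an internal StreamingError onto a gRPC status right at the handler boundary; a minimal sketch (toRPCError is an illustrative helper, not part of this patch):

package example

import (
	"github.com/milvus-io/milvus/internal/util/streamingutil/status"
)

// toRPCError turns an internal *StreamingError into the error form carried
// over gRPC, with the StreamingError attached as a status detail so the
// client can recover it via AsStreamingError.
func toRPCError(serr *status.StreamingError) error {
	if serr == nil {
		return nil
	}
	return status.NewGRPCStatusFromStreamingError(serr).Err()
}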
"test"), + ) + assert.Equal(t, codes.Unknown, st.Code()) +} diff --git a/internal/util/streamingutil/status/streaming_error.go b/internal/util/streamingutil/status/streaming_error.go new file mode 100644 index 0000000000000..a27800d35a3ee --- /dev/null +++ b/internal/util/streamingutil/status/streaming_error.go @@ -0,0 +1,119 @@ +package status + +import ( + "fmt" + + "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" + + "github.com/milvus-io/milvus/internal/proto/streamingpb" +) + +var _ error = (*StreamingError)(nil) + +// StreamingError is the error type for streaming internal module. +// Should be used at logic layer. +type ( + StreamingError streamingpb.StreamingError + StreamingCode streamingpb.StreamingCode +) + +// Error implements StreamingError as error. +func (e *StreamingError) Error() string { + return fmt.Sprintf("code: %s, cause: %s", e.Code.String(), e.Cause) +} + +// AsPBError convert StreamingError to streamingpb.StreamingError. +func (e *StreamingError) AsPBError() *streamingpb.StreamingError { + return (*streamingpb.StreamingError)(e) +} + +// IsWrongStreamingNode returns true if the error is caused by wrong streamingnode. +// Client should report these error to coord and block until new assignment term coming. +func (e *StreamingError) IsWrongStreamingNode() bool { + return e.Code == streamingpb.StreamingCode_STREAMING_CODE_UNMATCHED_CHANNEL_TERM || // channel term not match + e.Code == streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_NOT_EXIST || // channel do not exist on streamingnode + e.Code == streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_FENCED // channel fenced on these node. +} + +// NewOnShutdownError creates a new StreamingError with code STREAMING_CODE_ON_SHUTDOWN. +func NewOnShutdownError(format string, args ...interface{}) *StreamingError { + return New(streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, format, args...) +} + +// NewUnknownError creates a new StreamingError with code STREAMING_CODE_UNKNOWN. +func NewUnknownError(format string, args ...interface{}) *StreamingError { + return New(streamingpb.StreamingCode_STREAMING_CODE_UNKNOWN, format, args...) +} + +// NewInvalidRequestSeq creates a new StreamingError with code STREAMING_CODE_INVALID_REQUEST_SEQ. +func NewInvalidRequestSeq(format string, args ...interface{}) *StreamingError { + return New(streamingpb.StreamingCode_STREAMING_CODE_INVALID_REQUEST_SEQ, format, args...) +} + +// NewChannelExist creates a new StreamingError with code StreamingCode_STREAMING_CODE_CHANNEL_EXIST. +func NewChannelExist(format string, args ...interface{}) *StreamingError { + return New(streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_EXIST, format, args...) +} + +// NewChannelNotExist creates a new StreamingError with code STREAMING_CODE_CHANNEL_NOT_EXIST. +func NewChannelNotExist(format string, args ...interface{}) *StreamingError { + return New(streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_NOT_EXIST, format, args...) +} + +// NewUnmatchedChannelTerm creates a new StreamingError with code StreamingCode_STREAMING_CODE_UNMATCHED_CHANNEL_TERM. +func NewUnmatchedChannelTerm(format string, args ...interface{}) *StreamingError { + return New(streamingpb.StreamingCode_STREAMING_CODE_UNMATCHED_CHANNEL_TERM, format, args...) +} + +// NewIgnoreOperation creates a new StreamingError with code STREAMING_CODE_IGNORED_OPERATION. +func NewIgnoreOperation(format string, args ...interface{}) *StreamingError { + return New(streamingpb.StreamingCode_STREAMING_CODE_IGNORED_OPERATION, format, args...) 
diff --git a/internal/util/streamingutil/status/streaming_error.go b/internal/util/streamingutil/status/streaming_error.go
new file mode 100644
index 0000000000000..a27800d35a3ee
--- /dev/null
+++ b/internal/util/streamingutil/status/streaming_error.go
@@ -0,0 +1,119 @@
+package status
+
+import (
+	"fmt"
+
+	"github.com/cockroachdb/errors"
+	"github.com/cockroachdb/redact"
+
+	"github.com/milvus-io/milvus/internal/proto/streamingpb"
+)
+
+var _ error = (*StreamingError)(nil)
+
+// StreamingError is the error type for the streaming internal module.
+// Should be used at the logic layer.
+type (
+	StreamingError streamingpb.StreamingError
+	StreamingCode  streamingpb.StreamingCode
+)
+
+// Error implements StreamingError as an error.
+func (e *StreamingError) Error() string {
+	return fmt.Sprintf("code: %s, cause: %s", e.Code.String(), e.Cause)
+}
+
+// AsPBError converts a StreamingError to a streamingpb.StreamingError.
+func (e *StreamingError) AsPBError() *streamingpb.StreamingError {
+	return (*streamingpb.StreamingError)(e)
+}
+
+// IsWrongStreamingNode returns true if the error is caused by a wrong streamingnode.
+// The client should report these errors to the coord and block until a new assignment term comes.
+func (e *StreamingError) IsWrongStreamingNode() bool {
+	return e.Code == streamingpb.StreamingCode_STREAMING_CODE_UNMATCHED_CHANNEL_TERM || // channel term does not match
+		e.Code == streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_NOT_EXIST || // channel does not exist on the streamingnode
+		e.Code == streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_FENCED // channel is fenced on this node
+}
+
+// NewOnShutdownError creates a new StreamingError with code STREAMING_CODE_ON_SHUTDOWN.
+func NewOnShutdownError(format string, args ...interface{}) *StreamingError {
+	return New(streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, format, args...)
+}
+
+// NewUnknownError creates a new StreamingError with code STREAMING_CODE_UNKNOWN.
+func NewUnknownError(format string, args ...interface{}) *StreamingError {
+	return New(streamingpb.StreamingCode_STREAMING_CODE_UNKNOWN, format, args...)
+}
+
+// NewInvalidRequestSeq creates a new StreamingError with code STREAMING_CODE_INVALID_REQUEST_SEQ.
+func NewInvalidRequestSeq(format string, args ...interface{}) *StreamingError {
+	return New(streamingpb.StreamingCode_STREAMING_CODE_INVALID_REQUEST_SEQ, format, args...)
+}
+
+// NewChannelExist creates a new StreamingError with code StreamingCode_STREAMING_CODE_CHANNEL_EXIST.
+func NewChannelExist(format string, args ...interface{}) *StreamingError {
+	return New(streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_EXIST, format, args...)
+}
+
+// NewChannelNotExist creates a new StreamingError with code STREAMING_CODE_CHANNEL_NOT_EXIST.
+func NewChannelNotExist(format string, args ...interface{}) *StreamingError {
+	return New(streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_NOT_EXIST, format, args...)
+}
+
+// NewUnmatchedChannelTerm creates a new StreamingError with code StreamingCode_STREAMING_CODE_UNMATCHED_CHANNEL_TERM.
+func NewUnmatchedChannelTerm(format string, args ...interface{}) *StreamingError {
+	return New(streamingpb.StreamingCode_STREAMING_CODE_UNMATCHED_CHANNEL_TERM, format, args...)
+}
+
+// NewIgnoreOperation creates a new StreamingError with code STREAMING_CODE_IGNORED_OPERATION.
+func NewIgnoreOperation(format string, args ...interface{}) *StreamingError {
+	return New(streamingpb.StreamingCode_STREAMING_CODE_IGNORED_OPERATION, format, args...)
+}
+
+// NewInner creates a new StreamingError with code STREAMING_CODE_INNER.
+func NewInner(format string, args ...interface{}) *StreamingError {
+	return New(streamingpb.StreamingCode_STREAMING_CODE_INNER, format, args...)
+}
+
+// New creates a new StreamingError with the given code and cause.
+func New(code streamingpb.StreamingCode, format string, args ...interface{}) *StreamingError {
+	if len(args) == 0 {
+		return &StreamingError{
+			Code:  code,
+			Cause: format,
+		}
+	}
+	return &StreamingError{
+		Code:  code,
+		Cause: redact.Sprintf(format, args...).StripMarkers(),
+	}
+}
+
+// AsStreamingError converts an error to a StreamingError.
+func AsStreamingError(err error) *StreamingError {
+	if err == nil {
+		return nil
+	}
+
+	// If the error is a StreamingError, return it directly.
+	var e *StreamingError
+	if errors.As(err, &e) {
+		return e
+	}
+
+	// If the error is a StreamingClientStatus, try to extract the detail.
+	var st *StreamingClientStatus
+	if errors.As(err, &st) {
+		e = st.TryIntoStreamingError()
+		if e != nil {
+			return e
+		}
+	}
+
+	// Return a default StreamingError.
+	return &StreamingError{
+		Code:  streamingpb.StreamingCode_STREAMING_CODE_UNKNOWN,
+		Cause: err.Error(),
+	}
+}
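Together with AsStreamingError, the constructors above give callers a single error vocabulary on both sides of the RPC; a sketch of the kind of branching a client might do (shouldReassign is illustrative, not part of this patch):

package example

import (
	"github.com/milvus-io/milvus/internal/util/streamingutil/status"
)

// shouldReassign reports whether an error returned by a streaming node means
// the client should ask the coord for a fresh channel assignment.
func shouldReassign(err error) bool {
	if err == nil {
		return false
	}
	return status.AsStreamingError(err).IsWrongStreamingNode()
}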
diff --git a/internal/util/streamingutil/status/streaming_error_test.go b/internal/util/streamingutil/status/streaming_error_test.go
new file mode 100644
index 0000000000000..9c7c8dce2fa7f
--- /dev/null
+++ b/internal/util/streamingutil/status/streaming_error_test.go
@@ -0,0 +1,65 @@
+package status
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/milvus-io/milvus/internal/proto/streamingpb"
+)
+
+func TestStreamingError(t *testing.T) {
+	streamingErr := NewOnShutdownError("test")
+	assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_ON_SHUTDOWN, cause: test")
+	assert.False(t, streamingErr.IsWrongStreamingNode())
+	pbErr := streamingErr.AsPBError()
+	assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, pbErr.Code)
+
+	streamingErr = NewUnknownError("test")
+	assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_UNKNOWN, cause: test")
+	assert.False(t, streamingErr.IsWrongStreamingNode())
+	pbErr = streamingErr.AsPBError()
+	assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_UNKNOWN, pbErr.Code)
+
+	streamingErr = NewInvalidRequestSeq("test")
+	assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_INVALID_REQUEST_SEQ, cause: test")
+	assert.False(t, streamingErr.IsWrongStreamingNode())
+	pbErr = streamingErr.AsPBError()
+	assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_INVALID_REQUEST_SEQ, pbErr.Code)
+
+	streamingErr = NewChannelExist("test")
+	assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_CHANNEL_EXIST, cause: test")
+	assert.False(t, streamingErr.IsWrongStreamingNode())
+	pbErr = streamingErr.AsPBError()
+	assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_EXIST, pbErr.Code)
+
+	streamingErr = NewChannelNotExist("test")
+	assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_CHANNEL_NOT_EXIST, cause: test")
+	assert.True(t, streamingErr.IsWrongStreamingNode())
+	pbErr = streamingErr.AsPBError()
+	assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_NOT_EXIST, pbErr.Code)
+
+	streamingErr = NewUnmatchedChannelTerm("test")
+	assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_UNMATCHED_CHANNEL_TERM, cause: test")
+	assert.True(t, streamingErr.IsWrongStreamingNode())
+	pbErr = streamingErr.AsPBError()
+	assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_UNMATCHED_CHANNEL_TERM, pbErr.Code)
+
+	streamingErr = NewIgnoreOperation("test")
+	assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_IGNORED_OPERATION, cause: test")
+	assert.False(t, streamingErr.IsWrongStreamingNode())
+	pbErr = streamingErr.AsPBError()
+	assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_IGNORED_OPERATION, pbErr.Code)
+
+	streamingErr = NewInner("test")
+	assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_INNER, cause: test")
+	assert.False(t, streamingErr.IsWrongStreamingNode())
+	pbErr = streamingErr.AsPBError()
+	assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_INNER, pbErr.Code)
+
+	streamingErr = NewOnShutdownError("test, %d", 1)
+	assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_ON_SHUTDOWN, cause: test, 1")
+	assert.False(t, streamingErr.IsWrongStreamingNode())
+	pbErr = streamingErr.AsPBError()
+	assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, pbErr.Code)
+}
diff --git a/internal/util/streamingutil/util/id_allocator.go b/internal/util/streamingutil/util/id_allocator.go
new file mode 100644
index 0000000000000..2d22bbe9d10ad
--- /dev/null
+++ b/internal/util/streamingutil/util/id_allocator.go
@@ -0,0 +1,17 @@
+package util
+
+import (
+	"go.uber.org/atomic"
+)
+
+func NewIDAllocator() *IDAllocator {
+	return &IDAllocator{}
+}
+
+type IDAllocator struct {
+	underlying atomic.Int64
+}
+
+func (ida *IDAllocator) Allocate() int64 {
+	return ida.underlying.Inc()
+}
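IDAllocator is just a process-local monotonic counter; a small sketch of using it to name scanners (the scanner-name format is an assumption for illustration):

package example

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/util/streamingutil/util"
)

var scannerIDAllocator = util.NewIDAllocator()

// newScannerName produces a unique, human-readable scanner identifier.
func newScannerName(channel string) string {
	return fmt.Sprintf("scanner-%s-%d", channel, scannerIDAllocator.Allocate())
}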
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index 68125ac7b1aec..9f862a460f288 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -29,6 +29,7 @@ const (
 	AbandonLabel = "abandon"
 	SuccessLabel = "success"
 	FailLabel    = "fail"
+	CancelLabel  = "cancel"
 	TotalLabel   = "total"
 
 	HybridSearchLabel = "hybrid_search"
diff --git a/pkg/metrics/streaming_service_metrics.go b/pkg/metrics/streaming_service_metrics.go
new file mode 100644
index 0000000000000..b929a08def863
--- /dev/null
+++ b/pkg/metrics/streaming_service_metrics.go
@@ -0,0 +1,180 @@
+package metrics
+
+import (
+	"sync"
+
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
+)
+
+const (
+	subsystemStreamingServiceClient           = "streaming"
+	StreamingServiceClientProducerAvailable   = "available"
+	StreamingServiceClientProducerUnAvailable = "unavailable"
+)
+
+var (
+	logServiceClientRegisterOnce sync.Once
+
+	// from 64 bytes to 5MB
+	bytesBuckets = prometheus.ExponentialBucketsRange(64, 5242880, 10)
+	// from 1ms to 5s
+	secondsBuckets = prometheus.ExponentialBucketsRange(0.001, 5, 10)
+
+	// Client side metrics
+	StreamingServiceClientProducerTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
+		Name: "producer_total",
+		Help: "Total of producers",
+	}, statusLabelName)
+
+	StreamingServiceClientConsumerTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
+		Name: "consumer_total",
+		Help: "Total of consumers",
+	}, statusLabelName)
+
+	StreamingServiceClientProduceBytes = newStreamingServiceClientHistogramVec(prometheus.HistogramOpts{
+		Name:    "produce_bytes",
+		Help:    "Bytes of produced message",
+		Buckets: bytesBuckets,
+	})
+
+	StreamingServiceClientConsumeBytes = newStreamingServiceClientHistogramVec(prometheus.HistogramOpts{
+		Name:    "consume_bytes",
+		Help:    "Bytes of consumed message",
+		Buckets: bytesBuckets,
+	})
+
+	StreamingServiceClientProduceDurationSeconds = newStreamingServiceClientHistogramVec(
+		prometheus.HistogramOpts{
+			Name:    "produce_duration_seconds",
+			Help:    "Duration of client produce",
+			Buckets: secondsBuckets,
+		},
+		statusLabelName,
+	)
+
+	// StreamingCoord metrics
+	StreamingCoordPChannelTotal = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
+		Name: "pchannel_total",
+		Help: "Total of pchannels",
+	})
+
+	// StreamingCoordVChannelTotal = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
+	// 	Name: "vchannel_total",
+	// 	Help: "Total of vchannels",
+	// })
+
+	StreamingCoordAssignmentListenerTotal = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
+		Name: "assignment_listener_total",
+		Help: "Total of assignment listener",
+	})
+
+	StreamingCoordAssignmentInfo = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
+		Name: "assignment_info",
+		Help: "Info of assignment",
+	}, "global_version", "local_version")
+
+	// StreamingNode metrics
+	StreamingNodeWALTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
+		Name: "wal_total",
+		Help: "Total of wal",
+	})
+
+	StreamingNodeProducerTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
+		Name: "producer_total",
+		Help: "Total of producers",
+	})
+
+	StreamingNodeConsumerTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
+		Name: "consumer_total",
+		Help: "Total of consumers",
+	})
+
+	StreamingNodeProduceBytes = newStreamingNodeHistogramVec(prometheus.HistogramOpts{
+		Name:    "produce_bytes",
+		Help:    "Bytes of produced message",
+		Buckets: bytesBuckets,
+	})
+
+	StreamingNodeConsumeBytes = newStreamingNodeHistogramVec(prometheus.HistogramOpts{
+		Name:    "consume_bytes",
+		Help:    "Bytes of consumed message",
+		Buckets: bytesBuckets,
+	})
+
+	StreamingNodeProduceDurationSeconds = newStreamingNodeHistogramVec(prometheus.HistogramOpts{
+		Name:    "produce_duration_seconds",
+		Help:    "Duration of producing message",
+		Buckets: secondsBuckets,
+	}, statusLabelName)
+)
+
+func RegisterStreamingServiceClient(registry *prometheus.Registry) {
+	logServiceClientRegisterOnce.Do(func() {
+		registry.MustRegister(StreamingServiceClientProducerTotal)
+		registry.MustRegister(StreamingServiceClientConsumerTotal)
+		registry.MustRegister(StreamingServiceClientProduceBytes)
+		registry.MustRegister(StreamingServiceClientConsumeBytes)
+		registry.MustRegister(StreamingServiceClientProduceDurationSeconds)
+	})
+}
+
+// RegisterStreamingCoord registers the streaming coord metrics.
+func RegisterStreamingCoord(registry *prometheus.Registry) {
+	registry.MustRegister(StreamingCoordPChannelTotal)
+	registry.MustRegister(StreamingCoordAssignmentListenerTotal)
+	registry.MustRegister(StreamingCoordAssignmentInfo)
+}
+
+// RegisterStreamingNode registers the streaming node metrics.
+func RegisterStreamingNode(registry *prometheus.Registry) {
+	registry.MustRegister(StreamingNodeWALTotal)
+	registry.MustRegister(StreamingNodeProducerTotal)
+	registry.MustRegister(StreamingNodeConsumerTotal)
+	registry.MustRegister(StreamingNodeProduceBytes)
+	registry.MustRegister(StreamingNodeConsumeBytes)
+	registry.MustRegister(StreamingNodeProduceDurationSeconds)
+}
+
+func newStreamingCoordGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec {
+	opts.Namespace = milvusNamespace
+	opts.Subsystem = typeutil.StreamingCoordRole
+	labels := mergeLabel(extra...)
+	return prometheus.NewGaugeVec(opts, labels)
+}
+
+func newStreamingServiceClientGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec {
+	opts.Namespace = milvusNamespace
+	opts.Subsystem = subsystemStreamingServiceClient
+	labels := mergeLabel(extra...)
+	return prometheus.NewGaugeVec(opts, labels)
+}
+
+func newStreamingServiceClientHistogramVec(opts prometheus.HistogramOpts, extra ...string) *prometheus.HistogramVec {
+	opts.Namespace = milvusNamespace
+	opts.Subsystem = subsystemStreamingServiceClient
+	labels := mergeLabel(extra...)
+	return prometheus.NewHistogramVec(opts, labels)
+}
+
+func newStreamingNodeGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec {
+	opts.Namespace = milvusNamespace
+	opts.Subsystem = typeutil.StreamingNodeRole
+	labels := mergeLabel(extra...)
+	return prometheus.NewGaugeVec(opts, labels)
+}
+
+func newStreamingNodeHistogramVec(opts prometheus.HistogramOpts, extra ...string) *prometheus.HistogramVec {
+	opts.Namespace = milvusNamespace
+	opts.Subsystem = typeutil.StreamingNodeRole
+	labels := mergeLabel(extra...)
+	return prometheus.NewHistogramVec(opts, labels)
+}
+
+func mergeLabel(extra ...string) []string {
+	labels := make([]string, 0, 1+len(extra))
+	labels = append(labels, nodeIDLabelName)
+	labels = append(labels, extra...)
+	return labels
+}
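All of the vectors above share the node-ID label added by mergeLabel, with status as an optional extra label; a hedged sketch of registering and updating the client-side metrics (the node-ID plumbing is simplified to a plain argument here):

package example

import (
	"strconv"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/milvus-io/milvus/pkg/metrics"
)

// observeProduce registers the client metrics once (RegisterStreamingServiceClient
// is guarded by a sync.Once) and records one successful produce call.
func observeProduce(registry *prometheus.Registry, nodeID int64, sizeBytes int, seconds float64) {
	metrics.RegisterStreamingServiceClient(registry)
	node := strconv.FormatInt(nodeID, 10)
	metrics.StreamingServiceClientProduceBytes.WithLabelValues(node).Observe(float64(sizeBytes))
	metrics.StreamingServiceClientProduceDurationSeconds.WithLabelValues(node, metrics.SuccessLabel).Observe(seconds)
}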
diff --git a/pkg/util/typeutil/type.go b/pkg/util/typeutil/type.go
index 5cfa2459d2042..d570278df5e15 100644
--- a/pkg/util/typeutil/type.go
+++ b/pkg/util/typeutil/type.go
@@ -48,6 +48,10 @@ const (
 	IndexNodeRole = "indexnode"
 	// MixtureRole is a constant represents Mixture running modtoe
 	MixtureRole = "mixture"
+	// StreamingCoordRole is a constant that represents StreamingCoord
+	StreamingCoordRole = "streamingcoord"
+	// StreamingNodeRole is a constant that represents StreamingNode
+	StreamingNodeRole = "streamingnode"
 )
 
 var (
@@ -60,6 +64,7 @@ var (
 		IndexNodeRole,
 		DataCoordRole,
 		DataNodeRole,
+		StreamingNodeRole,
 	)
 	serverTypeList = serverTypeSet.Collect()
 )