From 9026069b89e0ba5f66340743894a22a26ff94841 Mon Sep 17 00:00:00 2001 From: chyezh Date: Tue, 11 Jun 2024 10:38:01 +0800 Subject: [PATCH] enhance: new messsage interface for log service (#33286) issue: #33285 --------- Signed-off-by: chyezh --- Makefile | 5 +- internal/logservice/.mockery.yaml | 13 + .../mock_message/mock_ImmutableMessage.go | 412 ++++++++++++++++++ .../mock_message/mock_MessageID.go | 245 +++++++++++ .../mock_message/mock_MutableMessage.go | 247 +++++++++++ .../mock_message/mock_RProperties.go | 169 +++++++ internal/proto/log.proto | 18 + .../util/logserviceutil/message/builder.go | 80 ++++ .../util/logserviceutil/message/message.go | 69 +++ .../message/message_builder_test.go | 105 +++++ .../logserviceutil/message/message_handler.go | 34 ++ .../message/message_handler_test.go | 30 ++ .../util/logserviceutil/message/message_id.go | 46 ++ .../logserviceutil/message/message_id_test.go | 44 ++ .../logserviceutil/message/message_impl.go | 105 +++++ .../logserviceutil/message/message_test.go | 27 ++ .../logserviceutil/message/message_type.go | 34 ++ .../util/logserviceutil/message/properties.go | 64 +++ .../util/logserviceutil/message/version.go | 25 ++ scripts/generate_proto.sh | 2 + 20 files changed, 1773 insertions(+), 1 deletion(-) create mode 100644 internal/logservice/.mockery.yaml create mode 100644 internal/mocks/util/logserviceutil/mock_message/mock_ImmutableMessage.go create mode 100644 internal/mocks/util/logserviceutil/mock_message/mock_MessageID.go create mode 100644 internal/mocks/util/logserviceutil/mock_message/mock_MutableMessage.go create mode 100644 internal/mocks/util/logserviceutil/mock_message/mock_RProperties.go create mode 100644 internal/proto/log.proto create mode 100644 internal/util/logserviceutil/message/builder.go create mode 100644 internal/util/logserviceutil/message/message.go create mode 100644 internal/util/logserviceutil/message/message_builder_test.go create mode 100644 internal/util/logserviceutil/message/message_handler.go create mode 100644 internal/util/logserviceutil/message/message_handler_test.go create mode 100644 internal/util/logserviceutil/message/message_id.go create mode 100644 internal/util/logserviceutil/message/message_id_test.go create mode 100644 internal/util/logserviceutil/message/message_impl.go create mode 100644 internal/util/logserviceutil/message/message_test.go create mode 100644 internal/util/logserviceutil/message/message_type.go create mode 100644 internal/util/logserviceutil/message/properties.go create mode 100644 internal/util/logserviceutil/message/version.go diff --git a/Makefile b/Makefile index 058eafe8c054a..2a3372bad4953 100644 --- a/Makefile +++ b/Makefile @@ -511,7 +511,10 @@ generate-mockery-chunk-manager: getdeps generate-mockery-pkg: $(MAKE) -C pkg generate-mockery -generate-mockery: generate-mockery-types generate-mockery-kv generate-mockery-rootcoord generate-mockery-proxy generate-mockery-querycoord generate-mockery-querynode generate-mockery-datacoord generate-mockery-pkg +generate-mockery-log: + $(INSTALL_PATH)/mockery --config $(PWD)/internal/logservice/.mockery.yaml + +generate-mockery: generate-mockery-types generate-mockery-kv generate-mockery-rootcoord generate-mockery-proxy generate-mockery-querycoord generate-mockery-querynode generate-mockery-datacoord generate-mockery-pkg generate-mockery-log generate-yaml: milvus-tools @echo "Updating milvus config yaml" diff --git a/internal/logservice/.mockery.yaml b/internal/logservice/.mockery.yaml new file mode 100644 index 0000000000000..22c293d1556a8 --- /dev/null +++ b/internal/logservice/.mockery.yaml @@ -0,0 +1,13 @@ +quiet: False +with-expecter: True +filename: "mock_{{.InterfaceName}}.go" +dir: "internal/mocks/{{trimPrefix .PackagePath \"github.com/milvus-io/milvus/internal\" | dir }}/mock_{{.PackageName}}" +mockname: "Mock{{.InterfaceName}}" +outpkg: "mock_{{.PackageName}}" +packages: + github.com/milvus-io/milvus/internal/util/logserviceutil/message: + interfaces: + MessageID: + ImmutableMessage: + MutableMessage: + RProperties: \ No newline at end of file diff --git a/internal/mocks/util/logserviceutil/mock_message/mock_ImmutableMessage.go b/internal/mocks/util/logserviceutil/mock_message/mock_ImmutableMessage.go new file mode 100644 index 0000000000000..cf42d0037e944 --- /dev/null +++ b/internal/mocks/util/logserviceutil/mock_message/mock_ImmutableMessage.go @@ -0,0 +1,412 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_message + +import ( + message "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + mock "github.com/stretchr/testify/mock" +) + +// MockImmutableMessage is an autogenerated mock type for the ImmutableMessage type +type MockImmutableMessage struct { + mock.Mock +} + +type MockImmutableMessage_Expecter struct { + mock *mock.Mock +} + +func (_m *MockImmutableMessage) EXPECT() *MockImmutableMessage_Expecter { + return &MockImmutableMessage_Expecter{mock: &_m.Mock} +} + +// EstimateSize provides a mock function with given fields: +func (_m *MockImmutableMessage) EstimateSize() int { + ret := _m.Called() + + var r0 int + if rf, ok := ret.Get(0).(func() int); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} + +// MockImmutableMessage_EstimateSize_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EstimateSize' +type MockImmutableMessage_EstimateSize_Call struct { + *mock.Call +} + +// EstimateSize is a helper method to define mock.On call +func (_e *MockImmutableMessage_Expecter) EstimateSize() *MockImmutableMessage_EstimateSize_Call { + return &MockImmutableMessage_EstimateSize_Call{Call: _e.mock.On("EstimateSize")} +} + +func (_c *MockImmutableMessage_EstimateSize_Call) Run(run func()) *MockImmutableMessage_EstimateSize_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockImmutableMessage_EstimateSize_Call) Return(_a0 int) *MockImmutableMessage_EstimateSize_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockImmutableMessage_EstimateSize_Call) RunAndReturn(run func() int) *MockImmutableMessage_EstimateSize_Call { + _c.Call.Return(run) + return _c +} + +// LastConfirmedMessageID provides a mock function with given fields: +func (_m *MockImmutableMessage) LastConfirmedMessageID() message.MessageID { + ret := _m.Called() + + var r0 message.MessageID + if rf, ok := ret.Get(0).(func() message.MessageID); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.MessageID) + } + } + + return r0 +} + +// MockImmutableMessage_LastConfirmedMessageID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LastConfirmedMessageID' +type MockImmutableMessage_LastConfirmedMessageID_Call struct { + *mock.Call +} + +// LastConfirmedMessageID is a helper method to define mock.On call +func (_e *MockImmutableMessage_Expecter) LastConfirmedMessageID() *MockImmutableMessage_LastConfirmedMessageID_Call { + return &MockImmutableMessage_LastConfirmedMessageID_Call{Call: _e.mock.On("LastConfirmedMessageID")} +} + +func (_c *MockImmutableMessage_LastConfirmedMessageID_Call) Run(run func()) *MockImmutableMessage_LastConfirmedMessageID_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockImmutableMessage_LastConfirmedMessageID_Call) Return(_a0 message.MessageID) *MockImmutableMessage_LastConfirmedMessageID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockImmutableMessage_LastConfirmedMessageID_Call) RunAndReturn(run func() message.MessageID) *MockImmutableMessage_LastConfirmedMessageID_Call { + _c.Call.Return(run) + return _c +} + +// MessageID provides a mock function with given fields: +func (_m *MockImmutableMessage) MessageID() message.MessageID { + ret := _m.Called() + + var r0 message.MessageID + if rf, ok := ret.Get(0).(func() message.MessageID); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.MessageID) + } + } + + return r0 +} + +// MockImmutableMessage_MessageID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MessageID' +type MockImmutableMessage_MessageID_Call struct { + *mock.Call +} + +// MessageID is a helper method to define mock.On call +func (_e *MockImmutableMessage_Expecter) MessageID() *MockImmutableMessage_MessageID_Call { + return &MockImmutableMessage_MessageID_Call{Call: _e.mock.On("MessageID")} +} + +func (_c *MockImmutableMessage_MessageID_Call) Run(run func()) *MockImmutableMessage_MessageID_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockImmutableMessage_MessageID_Call) Return(_a0 message.MessageID) *MockImmutableMessage_MessageID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockImmutableMessage_MessageID_Call) RunAndReturn(run func() message.MessageID) *MockImmutableMessage_MessageID_Call { + _c.Call.Return(run) + return _c +} + +// MessageType provides a mock function with given fields: +func (_m *MockImmutableMessage) MessageType() message.MessageType { + ret := _m.Called() + + var r0 message.MessageType + if rf, ok := ret.Get(0).(func() message.MessageType); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(message.MessageType) + } + + return r0 +} + +// MockImmutableMessage_MessageType_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MessageType' +type MockImmutableMessage_MessageType_Call struct { + *mock.Call +} + +// MessageType is a helper method to define mock.On call +func (_e *MockImmutableMessage_Expecter) MessageType() *MockImmutableMessage_MessageType_Call { + return &MockImmutableMessage_MessageType_Call{Call: _e.mock.On("MessageType")} +} + +func (_c *MockImmutableMessage_MessageType_Call) Run(run func()) *MockImmutableMessage_MessageType_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockImmutableMessage_MessageType_Call) Return(_a0 message.MessageType) *MockImmutableMessage_MessageType_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockImmutableMessage_MessageType_Call) RunAndReturn(run func() message.MessageType) *MockImmutableMessage_MessageType_Call { + _c.Call.Return(run) + return _c +} + +// Payload provides a mock function with given fields: +func (_m *MockImmutableMessage) Payload() []byte { + ret := _m.Called() + + var r0 []byte + if rf, ok := ret.Get(0).(func() []byte); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + return r0 +} + +// MockImmutableMessage_Payload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Payload' +type MockImmutableMessage_Payload_Call struct { + *mock.Call +} + +// Payload is a helper method to define mock.On call +func (_e *MockImmutableMessage_Expecter) Payload() *MockImmutableMessage_Payload_Call { + return &MockImmutableMessage_Payload_Call{Call: _e.mock.On("Payload")} +} + +func (_c *MockImmutableMessage_Payload_Call) Run(run func()) *MockImmutableMessage_Payload_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockImmutableMessage_Payload_Call) Return(_a0 []byte) *MockImmutableMessage_Payload_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockImmutableMessage_Payload_Call) RunAndReturn(run func() []byte) *MockImmutableMessage_Payload_Call { + _c.Call.Return(run) + return _c +} + +// Properties provides a mock function with given fields: +func (_m *MockImmutableMessage) Properties() message.RProperties { + ret := _m.Called() + + var r0 message.RProperties + if rf, ok := ret.Get(0).(func() message.RProperties); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.RProperties) + } + } + + return r0 +} + +// MockImmutableMessage_Properties_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Properties' +type MockImmutableMessage_Properties_Call struct { + *mock.Call +} + +// Properties is a helper method to define mock.On call +func (_e *MockImmutableMessage_Expecter) Properties() *MockImmutableMessage_Properties_Call { + return &MockImmutableMessage_Properties_Call{Call: _e.mock.On("Properties")} +} + +func (_c *MockImmutableMessage_Properties_Call) Run(run func()) *MockImmutableMessage_Properties_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockImmutableMessage_Properties_Call) Return(_a0 message.RProperties) *MockImmutableMessage_Properties_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockImmutableMessage_Properties_Call) RunAndReturn(run func() message.RProperties) *MockImmutableMessage_Properties_Call { + _c.Call.Return(run) + return _c +} + +// TimeTick provides a mock function with given fields: +func (_m *MockImmutableMessage) TimeTick() uint64 { + ret := _m.Called() + + var r0 uint64 + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + return r0 +} + +// MockImmutableMessage_TimeTick_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TimeTick' +type MockImmutableMessage_TimeTick_Call struct { + *mock.Call +} + +// TimeTick is a helper method to define mock.On call +func (_e *MockImmutableMessage_Expecter) TimeTick() *MockImmutableMessage_TimeTick_Call { + return &MockImmutableMessage_TimeTick_Call{Call: _e.mock.On("TimeTick")} +} + +func (_c *MockImmutableMessage_TimeTick_Call) Run(run func()) *MockImmutableMessage_TimeTick_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockImmutableMessage_TimeTick_Call) Return(_a0 uint64) *MockImmutableMessage_TimeTick_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockImmutableMessage_TimeTick_Call) RunAndReturn(run func() uint64) *MockImmutableMessage_TimeTick_Call { + _c.Call.Return(run) + return _c +} + +// Version provides a mock function with given fields: +func (_m *MockImmutableMessage) Version() message.Version { + ret := _m.Called() + + var r0 message.Version + if rf, ok := ret.Get(0).(func() message.Version); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(message.Version) + } + + return r0 +} + +// MockImmutableMessage_Version_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Version' +type MockImmutableMessage_Version_Call struct { + *mock.Call +} + +// Version is a helper method to define mock.On call +func (_e *MockImmutableMessage_Expecter) Version() *MockImmutableMessage_Version_Call { + return &MockImmutableMessage_Version_Call{Call: _e.mock.On("Version")} +} + +func (_c *MockImmutableMessage_Version_Call) Run(run func()) *MockImmutableMessage_Version_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockImmutableMessage_Version_Call) Return(_a0 message.Version) *MockImmutableMessage_Version_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockImmutableMessage_Version_Call) RunAndReturn(run func() message.Version) *MockImmutableMessage_Version_Call { + _c.Call.Return(run) + return _c +} + +// WALName provides a mock function with given fields: +func (_m *MockImmutableMessage) WALName() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockImmutableMessage_WALName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WALName' +type MockImmutableMessage_WALName_Call struct { + *mock.Call +} + +// WALName is a helper method to define mock.On call +func (_e *MockImmutableMessage_Expecter) WALName() *MockImmutableMessage_WALName_Call { + return &MockImmutableMessage_WALName_Call{Call: _e.mock.On("WALName")} +} + +func (_c *MockImmutableMessage_WALName_Call) Run(run func()) *MockImmutableMessage_WALName_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockImmutableMessage_WALName_Call) Return(_a0 string) *MockImmutableMessage_WALName_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockImmutableMessage_WALName_Call) RunAndReturn(run func() string) *MockImmutableMessage_WALName_Call { + _c.Call.Return(run) + return _c +} + +// NewMockImmutableMessage creates a new instance of MockImmutableMessage. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockImmutableMessage(t interface { + mock.TestingT + Cleanup(func()) +}) *MockImmutableMessage { + mock := &MockImmutableMessage{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/util/logserviceutil/mock_message/mock_MessageID.go b/internal/mocks/util/logserviceutil/mock_message/mock_MessageID.go new file mode 100644 index 0000000000000..d48ae6cf68195 --- /dev/null +++ b/internal/mocks/util/logserviceutil/mock_message/mock_MessageID.go @@ -0,0 +1,245 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_message + +import ( + message "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + mock "github.com/stretchr/testify/mock" +) + +// MockMessageID is an autogenerated mock type for the MessageID type +type MockMessageID struct { + mock.Mock +} + +type MockMessageID_Expecter struct { + mock *mock.Mock +} + +func (_m *MockMessageID) EXPECT() *MockMessageID_Expecter { + return &MockMessageID_Expecter{mock: &_m.Mock} +} + +// EQ provides a mock function with given fields: _a0 +func (_m *MockMessageID) EQ(_a0 message.MessageID) bool { + ret := _m.Called(_a0) + + var r0 bool + if rf, ok := ret.Get(0).(func(message.MessageID) bool); ok { + r0 = rf(_a0) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockMessageID_EQ_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EQ' +type MockMessageID_EQ_Call struct { + *mock.Call +} + +// EQ is a helper method to define mock.On call +// - _a0 message.MessageID +func (_e *MockMessageID_Expecter) EQ(_a0 interface{}) *MockMessageID_EQ_Call { + return &MockMessageID_EQ_Call{Call: _e.mock.On("EQ", _a0)} +} + +func (_c *MockMessageID_EQ_Call) Run(run func(_a0 message.MessageID)) *MockMessageID_EQ_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(message.MessageID)) + }) + return _c +} + +func (_c *MockMessageID_EQ_Call) Return(_a0 bool) *MockMessageID_EQ_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMessageID_EQ_Call) RunAndReturn(run func(message.MessageID) bool) *MockMessageID_EQ_Call { + _c.Call.Return(run) + return _c +} + +// LT provides a mock function with given fields: _a0 +func (_m *MockMessageID) LT(_a0 message.MessageID) bool { + ret := _m.Called(_a0) + + var r0 bool + if rf, ok := ret.Get(0).(func(message.MessageID) bool); ok { + r0 = rf(_a0) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockMessageID_LT_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LT' +type MockMessageID_LT_Call struct { + *mock.Call +} + +// LT is a helper method to define mock.On call +// - _a0 message.MessageID +func (_e *MockMessageID_Expecter) LT(_a0 interface{}) *MockMessageID_LT_Call { + return &MockMessageID_LT_Call{Call: _e.mock.On("LT", _a0)} +} + +func (_c *MockMessageID_LT_Call) Run(run func(_a0 message.MessageID)) *MockMessageID_LT_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(message.MessageID)) + }) + return _c +} + +func (_c *MockMessageID_LT_Call) Return(_a0 bool) *MockMessageID_LT_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMessageID_LT_Call) RunAndReturn(run func(message.MessageID) bool) *MockMessageID_LT_Call { + _c.Call.Return(run) + return _c +} + +// LTE provides a mock function with given fields: _a0 +func (_m *MockMessageID) LTE(_a0 message.MessageID) bool { + ret := _m.Called(_a0) + + var r0 bool + if rf, ok := ret.Get(0).(func(message.MessageID) bool); ok { + r0 = rf(_a0) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockMessageID_LTE_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LTE' +type MockMessageID_LTE_Call struct { + *mock.Call +} + +// LTE is a helper method to define mock.On call +// - _a0 message.MessageID +func (_e *MockMessageID_Expecter) LTE(_a0 interface{}) *MockMessageID_LTE_Call { + return &MockMessageID_LTE_Call{Call: _e.mock.On("LTE", _a0)} +} + +func (_c *MockMessageID_LTE_Call) Run(run func(_a0 message.MessageID)) *MockMessageID_LTE_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(message.MessageID)) + }) + return _c +} + +func (_c *MockMessageID_LTE_Call) Return(_a0 bool) *MockMessageID_LTE_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMessageID_LTE_Call) RunAndReturn(run func(message.MessageID) bool) *MockMessageID_LTE_Call { + _c.Call.Return(run) + return _c +} + +// Marshal provides a mock function with given fields: +func (_m *MockMessageID) Marshal() []byte { + ret := _m.Called() + + var r0 []byte + if rf, ok := ret.Get(0).(func() []byte); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + return r0 +} + +// MockMessageID_Marshal_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Marshal' +type MockMessageID_Marshal_Call struct { + *mock.Call +} + +// Marshal is a helper method to define mock.On call +func (_e *MockMessageID_Expecter) Marshal() *MockMessageID_Marshal_Call { + return &MockMessageID_Marshal_Call{Call: _e.mock.On("Marshal")} +} + +func (_c *MockMessageID_Marshal_Call) Run(run func()) *MockMessageID_Marshal_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockMessageID_Marshal_Call) Return(_a0 []byte) *MockMessageID_Marshal_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMessageID_Marshal_Call) RunAndReturn(run func() []byte) *MockMessageID_Marshal_Call { + _c.Call.Return(run) + return _c +} + +// WALName provides a mock function with given fields: +func (_m *MockMessageID) WALName() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockMessageID_WALName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WALName' +type MockMessageID_WALName_Call struct { + *mock.Call +} + +// WALName is a helper method to define mock.On call +func (_e *MockMessageID_Expecter) WALName() *MockMessageID_WALName_Call { + return &MockMessageID_WALName_Call{Call: _e.mock.On("WALName")} +} + +func (_c *MockMessageID_WALName_Call) Run(run func()) *MockMessageID_WALName_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockMessageID_WALName_Call) Return(_a0 string) *MockMessageID_WALName_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMessageID_WALName_Call) RunAndReturn(run func() string) *MockMessageID_WALName_Call { + _c.Call.Return(run) + return _c +} + +// NewMockMessageID creates a new instance of MockMessageID. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockMessageID(t interface { + mock.TestingT + Cleanup(func()) +}) *MockMessageID { + mock := &MockMessageID{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/util/logserviceutil/mock_message/mock_MutableMessage.go b/internal/mocks/util/logserviceutil/mock_message/mock_MutableMessage.go new file mode 100644 index 0000000000000..ab62e2a87d3c7 --- /dev/null +++ b/internal/mocks/util/logserviceutil/mock_message/mock_MutableMessage.go @@ -0,0 +1,247 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_message + +import ( + message "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + mock "github.com/stretchr/testify/mock" +) + +// MockMutableMessage is an autogenerated mock type for the MutableMessage type +type MockMutableMessage struct { + mock.Mock +} + +type MockMutableMessage_Expecter struct { + mock *mock.Mock +} + +func (_m *MockMutableMessage) EXPECT() *MockMutableMessage_Expecter { + return &MockMutableMessage_Expecter{mock: &_m.Mock} +} + +// EstimateSize provides a mock function with given fields: +func (_m *MockMutableMessage) EstimateSize() int { + ret := _m.Called() + + var r0 int + if rf, ok := ret.Get(0).(func() int); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} + +// MockMutableMessage_EstimateSize_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EstimateSize' +type MockMutableMessage_EstimateSize_Call struct { + *mock.Call +} + +// EstimateSize is a helper method to define mock.On call +func (_e *MockMutableMessage_Expecter) EstimateSize() *MockMutableMessage_EstimateSize_Call { + return &MockMutableMessage_EstimateSize_Call{Call: _e.mock.On("EstimateSize")} +} + +func (_c *MockMutableMessage_EstimateSize_Call) Run(run func()) *MockMutableMessage_EstimateSize_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockMutableMessage_EstimateSize_Call) Return(_a0 int) *MockMutableMessage_EstimateSize_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMutableMessage_EstimateSize_Call) RunAndReturn(run func() int) *MockMutableMessage_EstimateSize_Call { + _c.Call.Return(run) + return _c +} + +// MessageType provides a mock function with given fields: +func (_m *MockMutableMessage) MessageType() message.MessageType { + ret := _m.Called() + + var r0 message.MessageType + if rf, ok := ret.Get(0).(func() message.MessageType); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(message.MessageType) + } + + return r0 +} + +// MockMutableMessage_MessageType_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MessageType' +type MockMutableMessage_MessageType_Call struct { + *mock.Call +} + +// MessageType is a helper method to define mock.On call +func (_e *MockMutableMessage_Expecter) MessageType() *MockMutableMessage_MessageType_Call { + return &MockMutableMessage_MessageType_Call{Call: _e.mock.On("MessageType")} +} + +func (_c *MockMutableMessage_MessageType_Call) Run(run func()) *MockMutableMessage_MessageType_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockMutableMessage_MessageType_Call) Return(_a0 message.MessageType) *MockMutableMessage_MessageType_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMutableMessage_MessageType_Call) RunAndReturn(run func() message.MessageType) *MockMutableMessage_MessageType_Call { + _c.Call.Return(run) + return _c +} + +// Payload provides a mock function with given fields: +func (_m *MockMutableMessage) Payload() []byte { + ret := _m.Called() + + var r0 []byte + if rf, ok := ret.Get(0).(func() []byte); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + return r0 +} + +// MockMutableMessage_Payload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Payload' +type MockMutableMessage_Payload_Call struct { + *mock.Call +} + +// Payload is a helper method to define mock.On call +func (_e *MockMutableMessage_Expecter) Payload() *MockMutableMessage_Payload_Call { + return &MockMutableMessage_Payload_Call{Call: _e.mock.On("Payload")} +} + +func (_c *MockMutableMessage_Payload_Call) Run(run func()) *MockMutableMessage_Payload_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockMutableMessage_Payload_Call) Return(_a0 []byte) *MockMutableMessage_Payload_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMutableMessage_Payload_Call) RunAndReturn(run func() []byte) *MockMutableMessage_Payload_Call { + _c.Call.Return(run) + return _c +} + +// Properties provides a mock function with given fields: +func (_m *MockMutableMessage) Properties() message.Properties { + ret := _m.Called() + + var r0 message.Properties + if rf, ok := ret.Get(0).(func() message.Properties); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.Properties) + } + } + + return r0 +} + +// MockMutableMessage_Properties_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Properties' +type MockMutableMessage_Properties_Call struct { + *mock.Call +} + +// Properties is a helper method to define mock.On call +func (_e *MockMutableMessage_Expecter) Properties() *MockMutableMessage_Properties_Call { + return &MockMutableMessage_Properties_Call{Call: _e.mock.On("Properties")} +} + +func (_c *MockMutableMessage_Properties_Call) Run(run func()) *MockMutableMessage_Properties_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockMutableMessage_Properties_Call) Return(_a0 message.Properties) *MockMutableMessage_Properties_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMutableMessage_Properties_Call) RunAndReturn(run func() message.Properties) *MockMutableMessage_Properties_Call { + _c.Call.Return(run) + return _c +} + +// WithTimeTick provides a mock function with given fields: tt +func (_m *MockMutableMessage) WithTimeTick(tt uint64) message.MutableMessage { + ret := _m.Called(tt) + + var r0 message.MutableMessage + if rf, ok := ret.Get(0).(func(uint64) message.MutableMessage); ok { + r0 = rf(tt) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.MutableMessage) + } + } + + return r0 +} + +// MockMutableMessage_WithTimeTick_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WithTimeTick' +type MockMutableMessage_WithTimeTick_Call struct { + *mock.Call +} + +// WithTimeTick is a helper method to define mock.On call +// - tt uint64 +func (_e *MockMutableMessage_Expecter) WithTimeTick(tt interface{}) *MockMutableMessage_WithTimeTick_Call { + return &MockMutableMessage_WithTimeTick_Call{Call: _e.mock.On("WithTimeTick", tt)} +} + +func (_c *MockMutableMessage_WithTimeTick_Call) Run(run func(tt uint64)) *MockMutableMessage_WithTimeTick_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(uint64)) + }) + return _c +} + +func (_c *MockMutableMessage_WithTimeTick_Call) Return(_a0 message.MutableMessage) *MockMutableMessage_WithTimeTick_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMutableMessage_WithTimeTick_Call) RunAndReturn(run func(uint64) message.MutableMessage) *MockMutableMessage_WithTimeTick_Call { + _c.Call.Return(run) + return _c +} + +// NewMockMutableMessage creates a new instance of MockMutableMessage. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockMutableMessage(t interface { + mock.TestingT + Cleanup(func()) +}) *MockMutableMessage { + mock := &MockMutableMessage{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/util/logserviceutil/mock_message/mock_RProperties.go b/internal/mocks/util/logserviceutil/mock_message/mock_RProperties.go new file mode 100644 index 0000000000000..5df87240b12fd --- /dev/null +++ b/internal/mocks/util/logserviceutil/mock_message/mock_RProperties.go @@ -0,0 +1,169 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_message + +import mock "github.com/stretchr/testify/mock" + +// MockRProperties is an autogenerated mock type for the RProperties type +type MockRProperties struct { + mock.Mock +} + +type MockRProperties_Expecter struct { + mock *mock.Mock +} + +func (_m *MockRProperties) EXPECT() *MockRProperties_Expecter { + return &MockRProperties_Expecter{mock: &_m.Mock} +} + +// Exist provides a mock function with given fields: key +func (_m *MockRProperties) Exist(key string) bool { + ret := _m.Called(key) + + var r0 bool + if rf, ok := ret.Get(0).(func(string) bool); ok { + r0 = rf(key) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockRProperties_Exist_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Exist' +type MockRProperties_Exist_Call struct { + *mock.Call +} + +// Exist is a helper method to define mock.On call +// - key string +func (_e *MockRProperties_Expecter) Exist(key interface{}) *MockRProperties_Exist_Call { + return &MockRProperties_Exist_Call{Call: _e.mock.On("Exist", key)} +} + +func (_c *MockRProperties_Exist_Call) Run(run func(key string)) *MockRProperties_Exist_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MockRProperties_Exist_Call) Return(_a0 bool) *MockRProperties_Exist_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRProperties_Exist_Call) RunAndReturn(run func(string) bool) *MockRProperties_Exist_Call { + _c.Call.Return(run) + return _c +} + +// Get provides a mock function with given fields: key +func (_m *MockRProperties) Get(key string) (string, bool) { + ret := _m.Called(key) + + var r0 string + var r1 bool + if rf, ok := ret.Get(0).(func(string) (string, bool)); ok { + return rf(key) + } + if rf, ok := ret.Get(0).(func(string) string); ok { + r0 = rf(key) + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func(string) bool); ok { + r1 = rf(key) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 +} + +// MockRProperties_Get_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Get' +type MockRProperties_Get_Call struct { + *mock.Call +} + +// Get is a helper method to define mock.On call +// - key string +func (_e *MockRProperties_Expecter) Get(key interface{}) *MockRProperties_Get_Call { + return &MockRProperties_Get_Call{Call: _e.mock.On("Get", key)} +} + +func (_c *MockRProperties_Get_Call) Run(run func(key string)) *MockRProperties_Get_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MockRProperties_Get_Call) Return(value string, ok bool) *MockRProperties_Get_Call { + _c.Call.Return(value, ok) + return _c +} + +func (_c *MockRProperties_Get_Call) RunAndReturn(run func(string) (string, bool)) *MockRProperties_Get_Call { + _c.Call.Return(run) + return _c +} + +// ToRawMap provides a mock function with given fields: +func (_m *MockRProperties) ToRawMap() map[string]string { + ret := _m.Called() + + var r0 map[string]string + if rf, ok := ret.Get(0).(func() map[string]string); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]string) + } + } + + return r0 +} + +// MockRProperties_ToRawMap_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ToRawMap' +type MockRProperties_ToRawMap_Call struct { + *mock.Call +} + +// ToRawMap is a helper method to define mock.On call +func (_e *MockRProperties_Expecter) ToRawMap() *MockRProperties_ToRawMap_Call { + return &MockRProperties_ToRawMap_Call{Call: _e.mock.On("ToRawMap")} +} + +func (_c *MockRProperties_ToRawMap_Call) Run(run func()) *MockRProperties_ToRawMap_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockRProperties_ToRawMap_Call) Return(_a0 map[string]string) *MockRProperties_ToRawMap_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRProperties_ToRawMap_Call) RunAndReturn(run func() map[string]string) *MockRProperties_ToRawMap_Call { + _c.Call.Return(run) + return _c +} + +// NewMockRProperties creates a new instance of MockRProperties. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockRProperties(t interface { + mock.TestingT + Cleanup(func()) +}) *MockRProperties { + mock := &MockRProperties{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/proto/log.proto b/internal/proto/log.proto new file mode 100644 index 0000000000000..fb9da3a4c7416 --- /dev/null +++ b/internal/proto/log.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +package milvus.proto.log; + +option go_package = "github.com/milvus-io/milvus/internal/proto/logpb"; + +import "milvus.proto"; +import "google/protobuf/empty.proto"; + +// +// Common +// + +// Message is the basic unit of communication between publisher and consumer. +message Message { + bytes payload = 1; // message body + map properties = 2; // message properties +} diff --git a/internal/util/logserviceutil/message/builder.go b/internal/util/logserviceutil/message/builder.go new file mode 100644 index 0000000000000..00f064c619f89 --- /dev/null +++ b/internal/util/logserviceutil/message/builder.go @@ -0,0 +1,80 @@ +package message + +// NewBuilder creates a new builder. +func NewBuilder() *Builder { + return &Builder{ + id: nil, + payload: nil, + properties: make(propertiesImpl), + } +} + +// Builder is the builder for message. +type Builder struct { + id MessageID + payload []byte + properties propertiesImpl +} + +// WithMessageID creates a new builder with message id. +func (b *Builder) WithMessageID(id MessageID) *Builder { + b.id = id + return b +} + +// WithMessageType creates a new builder with message type. +func (b *Builder) WithMessageType(t MessageType) *Builder { + b.properties.Set(messageTypeKey, t.marshal()) + return b +} + +// WithProperty creates a new builder with message property. +// A key started with '_' is reserved for log system, should never used at user of client. +func (b *Builder) WithProperty(key string, val string) *Builder { + b.properties.Set(key, val) + return b +} + +// WithProperties creates a new builder with message properties. +// A key started with '_' is reserved for log system, should never used at user of client. +func (b *Builder) WithProperties(kvs map[string]string) *Builder { + for key, val := range kvs { + b.properties.Set(key, val) + } + return b +} + +// WithPayload creates a new builder with message payload. +func (b *Builder) WithPayload(payload []byte) *Builder { + b.payload = payload + return b +} + +// BuildMutable builds a mutable message. +// Panic if set the message id. +func (b *Builder) BuildMutable() MutableMessage { + if b.id != nil { + panic("build a mutable message, message id should be nil") + } + // Set message version. + b.properties.Set(messageVersion, VersionV1.String()) + return &messageImpl{ + payload: b.payload, + properties: b.properties, + } +} + +// BuildImmutable builds a immutable message. +// Panic if not set the message id. +func (b *Builder) BuildImmutable() ImmutableMessage { + if b.id == nil { + panic("build a immutable message, message id should not be nil") + } + return &immutableMessageImpl{ + id: b.id, + messageImpl: messageImpl{ + payload: b.payload, + properties: b.properties, + }, + } +} diff --git a/internal/util/logserviceutil/message/message.go b/internal/util/logserviceutil/message/message.go new file mode 100644 index 0000000000000..776dd81073299 --- /dev/null +++ b/internal/util/logserviceutil/message/message.go @@ -0,0 +1,69 @@ +package message + +var ( + _ BasicMessage = (*messageImpl)(nil) + _ MutableMessage = (*messageImpl)(nil) + _ ImmutableMessage = (*immutableMessageImpl)(nil) +) + +// BasicMessage is the basic interface of message. +type BasicMessage interface { + // MessageType returns the type of message. + MessageType() MessageType + + // Message payload. + Payload() []byte + + // EstimateSize returns the estimated size of message. + EstimateSize() int +} + +// MutableMessage is the mutable message interface. +// Message can be modified before it is persistent by wal. +type MutableMessage interface { + BasicMessage + + // WithLastConfirmed sets the last confirmed message id of current message. + // !!! preserved for log system internal usage, don't call it outside of log system. + WithLastConfirmed(id MessageID) MutableMessage + + // WithTimeTick sets the time tick of current message. + // !!! preserved for log system internal usage, don't call it outside of log system. + WithTimeTick(tt uint64) MutableMessage + + // Properties returns the message properties. + Properties() Properties +} + +// ImmutableMessage is the read-only message interface. +// Once a message is persistent by wal, it will be immutable. +// And the message id will be assigned. +type ImmutableMessage interface { + BasicMessage + + // WALName returns the name of message related wal. + WALName() string + + // TimeTick returns the time tick of current message. + // Available only when the message's version greater than 0. + // Otherwise, it will panic. + TimeTick() uint64 + + // LastConfirmedMessageID returns the last confirmed message id of current message. + // last confirmed message is always a timetick message. + // Read from this message id will guarantee the time tick greater than this message is consumed. + // Available only when the message's version greater than 0. + // Otherwise, it will panic. + LastConfirmedMessageID() MessageID + + // MessageID returns the message id of current message. + MessageID() MessageID + + // Properties returns the message read only properties. + Properties() RProperties + + // Version returns the message format version. + // 0: old version before lognode. + // from 1: new version after lognode. + Version() Version +} diff --git a/internal/util/logserviceutil/message/message_builder_test.go b/internal/util/logserviceutil/message/message_builder_test.go new file mode 100644 index 0000000000000..c937c3d5d26bc --- /dev/null +++ b/internal/util/logserviceutil/message/message_builder_test.go @@ -0,0 +1,105 @@ +package message_test + +import ( + "fmt" + "testing" + + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/mocks/util/logserviceutil/mock_message" + "github.com/milvus-io/milvus/internal/util/logserviceutil/message" +) + +func TestMessage(t *testing.T) { + b := message.NewBuilder() + mutableMessage := b.WithMessageType(message.MessageTypeTimeTick). + WithPayload([]byte("payload")). + WithProperties(map[string]string{"key": "value"}). + BuildMutable() + + assert.Equal(t, "payload", string(mutableMessage.Payload())) + assert.True(t, mutableMessage.Properties().Exist("key")) + v, ok := mutableMessage.Properties().Get("key") + assert.Equal(t, "value", v) + assert.True(t, ok) + assert.Equal(t, message.MessageTypeTimeTick, mutableMessage.MessageType()) + assert.Equal(t, 21, mutableMessage.EstimateSize()) + mutableMessage.WithTimeTick(123) + v, ok = mutableMessage.Properties().Get("_tt") + assert.True(t, ok) + tt, n := proto.DecodeVarint([]byte(v)) + assert.Equal(t, uint64(123), tt) + assert.Equal(t, len([]byte(v)), n) + + lcMsgID := mock_message.NewMockMessageID(t) + lcMsgID.EXPECT().Marshal().Return([]byte("lcMsgID")) + mutableMessage.WithLastConfirmed(lcMsgID) + v, ok = mutableMessage.Properties().Get("_lc") + assert.True(t, ok) + assert.Equal(t, v, "lcMsgID") + + msgID := mock_message.NewMockMessageID(t) + msgID.EXPECT().EQ(msgID).Return(true) + msgID.EXPECT().WALName().Return("testMsgID") + message.RegisterMessageIDUnmsarshaler("testMsgID", func(data []byte) (message.MessageID, error) { + if string(data) == "lcMsgID" { + return msgID, nil + } + panic(fmt.Sprintf("unexpected data: %s", data)) + }) + + b = message.NewBuilder() + immutableMessage := b.WithMessageID(msgID). + WithPayload([]byte("payload")). + WithProperties(map[string]string{ + "key": "value", + "_t": "1", + "_tt": string(proto.EncodeVarint(456)), + "_v": "1", + "_lc": "lcMsgID", + }). + BuildImmutable() + + assert.True(t, immutableMessage.MessageID().EQ(msgID)) + assert.Equal(t, "payload", string(immutableMessage.Payload())) + assert.True(t, immutableMessage.Properties().Exist("key")) + v, ok = immutableMessage.Properties().Get("key") + assert.Equal(t, "value", v) + assert.True(t, ok) + assert.Equal(t, message.MessageTypeTimeTick, immutableMessage.MessageType()) + assert.Equal(t, 36, immutableMessage.EstimateSize()) + assert.Equal(t, message.Version(1), immutableMessage.Version()) + assert.Equal(t, uint64(456), immutableMessage.TimeTick()) + assert.NotNil(t, immutableMessage.LastConfirmedMessageID()) + + b = message.NewBuilder() + immutableMessage = b.WithMessageID(msgID). + WithPayload([]byte("payload")). + WithProperty("key", "value"). + WithProperty("_t", "1"). + BuildImmutable() + + assert.True(t, immutableMessage.MessageID().EQ(msgID)) + assert.Equal(t, "payload", string(immutableMessage.Payload())) + assert.True(t, immutableMessage.Properties().Exist("key")) + v, ok = immutableMessage.Properties().Get("key") + assert.Equal(t, "value", v) + assert.True(t, ok) + assert.Equal(t, message.MessageTypeTimeTick, immutableMessage.MessageType()) + assert.Equal(t, 18, immutableMessage.EstimateSize()) + assert.Equal(t, message.Version(0), immutableMessage.Version()) + assert.Panics(t, func() { + immutableMessage.TimeTick() + }) + assert.Panics(t, func() { + immutableMessage.LastConfirmedMessageID() + }) + + assert.Panics(t, func() { + message.NewBuilder().WithMessageID(msgID).BuildMutable() + }) + assert.Panics(t, func() { + message.NewBuilder().BuildImmutable() + }) +} diff --git a/internal/util/logserviceutil/message/message_handler.go b/internal/util/logserviceutil/message/message_handler.go new file mode 100644 index 0000000000000..2bb7c92e8d5fe --- /dev/null +++ b/internal/util/logserviceutil/message/message_handler.go @@ -0,0 +1,34 @@ +package message + +// Handler is used to handle message read from log. +type Handler interface { + // Handle is the callback for handling message. + Handle(msg ImmutableMessage) + + // Close is called after all messages are handled or handling is interrupted. + Close() +} + +var _ Handler = ChanMessageHandler(nil) + +// ChanMessageHandler is a handler just forward the message into a channel. +type ChanMessageHandler chan ImmutableMessage + +// Handle is the callback for handling message. +func (cmh ChanMessageHandler) Handle(msg ImmutableMessage) { + cmh <- msg +} + +// Close is called after all messages are handled or handling is interrupted. +func (cmh ChanMessageHandler) Close() { + close(cmh) +} + +// NopCloseHandler is a handler that do nothing when close. +type NopCloseHandler struct { + Handler +} + +// Close is called after all messages are handled or handling is interrupted. +func (nch NopCloseHandler) Close() { +} diff --git a/internal/util/logserviceutil/message/message_handler_test.go b/internal/util/logserviceutil/message/message_handler_test.go new file mode 100644 index 0000000000000..0165823b37714 --- /dev/null +++ b/internal/util/logserviceutil/message/message_handler_test.go @@ -0,0 +1,30 @@ +package message + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMessageHandler(t *testing.T) { + ch := make(chan ImmutableMessage, 100) + h := ChanMessageHandler(ch) + h.Handle(nil) + assert.Nil(t, <-ch) + h.Close() + _, ok := <-ch + assert.False(t, ok) + + ch = make(chan ImmutableMessage, 100) + hNop := NopCloseHandler{ + Handler: ChanMessageHandler(ch), + } + hNop.Handle(nil) + assert.Nil(t, <-ch) + hNop.Close() + select { + case <-ch: + panic("should not be closed") + default: + } +} diff --git a/internal/util/logserviceutil/message/message_id.go b/internal/util/logserviceutil/message/message_id.go new file mode 100644 index 0000000000000..910338ea88cb6 --- /dev/null +++ b/internal/util/logserviceutil/message/message_id.go @@ -0,0 +1,46 @@ +package message + +import ( + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// messageIDUnmarshaler is the map for message id unmarshaler. +var messageIDUnmarshaler typeutil.ConcurrentMap[string, MessageIDUnmarshaler] + +// RegisterMessageIDUnmsarshaler register the message id unmarshaler. +func RegisterMessageIDUnmsarshaler(name string, unmarshaler MessageIDUnmarshaler) { + _, loaded := messageIDUnmarshaler.GetOrInsert(name, unmarshaler) + if loaded { + panic("MessageID Unmarshaler already registered: " + name) + } +} + +// MessageIDUnmarshaler is the unmarshaler for message id. +type MessageIDUnmarshaler = func(b []byte) (MessageID, error) + +// UnmsarshalMessageID unmarshal the message id. +func UnmarshalMessageID(name string, b []byte) (MessageID, error) { + unmarshaler, ok := messageIDUnmarshaler.Get(name) + if !ok { + panic("MessageID Unmarshaler not registered: " + name) + } + return unmarshaler(b) +} + +// MessageID is the interface for message id. +type MessageID interface { + // WALName returns the name of message id related wal. + WALName() string + + // LT less than. + LT(MessageID) bool + + // LTE less than or equal to. + LTE(MessageID) bool + + // EQ Equal to. + EQ(MessageID) bool + + // Marshal marshal the message id. + Marshal() []byte +} diff --git a/internal/util/logserviceutil/message/message_id_test.go b/internal/util/logserviceutil/message/message_id_test.go new file mode 100644 index 0000000000000..5aef58bddd8c8 --- /dev/null +++ b/internal/util/logserviceutil/message/message_id_test.go @@ -0,0 +1,44 @@ +package message_test + +import ( + "bytes" + "testing" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/mocks/util/logserviceutil/mock_message" + "github.com/milvus-io/milvus/internal/util/logserviceutil/message" +) + +func TestRegisterMessageIDUnmarshaler(t *testing.T) { + msgID := mock_message.NewMockMessageID(t) + + message.RegisterMessageIDUnmsarshaler("test", func(b []byte) (message.MessageID, error) { + if bytes.Equal(b, []byte("123")) { + return msgID, nil + } + return nil, errors.New("invalid") + }) + + id, err := message.UnmarshalMessageID("test", []byte("123")) + assert.NotNil(t, id) + assert.NoError(t, err) + + id, err = message.UnmarshalMessageID("test", []byte("1234")) + assert.Nil(t, id) + assert.Error(t, err) + + assert.Panics(t, func() { + message.UnmarshalMessageID("test1", []byte("123")) + }) + + assert.Panics(t, func() { + message.RegisterMessageIDUnmsarshaler("test", func(b []byte) (message.MessageID, error) { + if bytes.Equal(b, []byte("123")) { + return msgID, nil + } + return nil, errors.New("invalid") + }) + }) +} diff --git a/internal/util/logserviceutil/message/message_impl.go b/internal/util/logserviceutil/message/message_impl.go new file mode 100644 index 0000000000000..f1f9de5bdaa04 --- /dev/null +++ b/internal/util/logserviceutil/message/message_impl.go @@ -0,0 +1,105 @@ +package message + +import ( + "fmt" + + "github.com/golang/protobuf/proto" +) + +type messageImpl struct { + payload []byte + properties propertiesImpl +} + +// MessageType returns the type of message. +func (m *messageImpl) MessageType() MessageType { + val, ok := m.properties.Get(messageTypeKey) + if !ok { + return MessageTypeUnknown + } + return unmarshalMessageType(val) +} + +// Payload returns payload of current message. +func (m *messageImpl) Payload() []byte { + return m.payload +} + +// Properties returns the message properties. +func (m *messageImpl) Properties() Properties { + return m.properties +} + +// EstimateSize returns the estimated size of current message. +func (m *messageImpl) EstimateSize() int { + // TODO: more accurate size estimation. + return len(m.payload) + m.properties.EstimateSize() +} + +// WithTimeTick sets the time tick of current message. +func (m *messageImpl) WithTimeTick(tt uint64) MutableMessage { + t := proto.EncodeVarint(tt) + m.properties.Set(messageTimeTick, string(t)) + return m +} + +// WithLastConfirmed sets the last confirmed message id of current message. +func (m *messageImpl) WithLastConfirmed(id MessageID) MutableMessage { + m.properties.Set(messageLastConfirmed, string(id.Marshal())) + return m +} + +type immutableMessageImpl struct { + messageImpl + id MessageID +} + +// WALName returns the name of message related wal. +func (m *immutableMessageImpl) WALName() string { + return m.id.WALName() +} + +// TimeTick returns the time tick of current message. +func (m *immutableMessageImpl) TimeTick() uint64 { + value, ok := m.properties.Get(messageTimeTick) + if !ok { + panic(fmt.Sprintf("there's a bug in the message codes, timetick lost in properties of message, id: %+v", m.id)) + } + v := []byte(value) + tt, n := proto.DecodeVarint(v) + if n != len(v) { + panic(fmt.Sprintf("there's a bug in the message codes, dirty timetick in properties of message, id: %+v", m.id)) + } + return tt +} + +func (m *immutableMessageImpl) LastConfirmedMessageID() MessageID { + value, ok := m.properties.Get(messageLastConfirmed) + if !ok { + panic(fmt.Sprintf("there's a bug in the message codes, last confirmed message lost in properties of message, id: %+v", m.id)) + } + id, err := UnmarshalMessageID(m.id.WALName(), []byte(value)) + if err != nil { + panic(fmt.Sprintf("there's a bug in the message codes, dirty last confirmed message in properties of message, id: %+v", m.id)) + } + return id +} + +// MessageID returns the message id. +func (m *immutableMessageImpl) MessageID() MessageID { + return m.id +} + +// Properties returns the message read only properties. +func (m *immutableMessageImpl) Properties() RProperties { + return m.properties +} + +// Version returns the message format version. +func (m *immutableMessageImpl) Version() Version { + value, ok := m.properties.Get(messageVersion) + if !ok { + return VersionOld + } + return newMessageVersionFromString(value) +} diff --git a/internal/util/logserviceutil/message/message_test.go b/internal/util/logserviceutil/message/message_test.go new file mode 100644 index 0000000000000..f35094e08fcac --- /dev/null +++ b/internal/util/logserviceutil/message/message_test.go @@ -0,0 +1,27 @@ +package message + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMessageType(t *testing.T) { + s := MessageTypeUnknown.marshal() + assert.Equal(t, "0", s) + typ := unmarshalMessageType("0") + assert.Equal(t, MessageTypeUnknown, typ) + + typ = unmarshalMessageType("882s9") + assert.Equal(t, MessageTypeUnknown, typ) +} + +func TestVersion(t *testing.T) { + v := newMessageVersionFromString("") + assert.Equal(t, VersionOld, v) + assert.Panics(t, func() { + newMessageVersionFromString("s1") + }) + v = newMessageVersionFromString("1") + assert.Equal(t, VersionV1, v) +} diff --git a/internal/util/logserviceutil/message/message_type.go b/internal/util/logserviceutil/message/message_type.go new file mode 100644 index 0000000000000..9353a2cf11c81 --- /dev/null +++ b/internal/util/logserviceutil/message/message_type.go @@ -0,0 +1,34 @@ +package message + +import "strconv" + +type MessageType int32 + +const ( + MessageTypeUnknown MessageType = 0 + MessageTypeTimeTick MessageType = 1 +) + +var messageTypeName = map[MessageType]string{ + MessageTypeUnknown: "MESSAGE_TYPE_UNKNOWN", + MessageTypeTimeTick: "MESSAGE_TYPE_TIME_TICK", +} + +// String implements fmt.Stringer interface. +func (t MessageType) String() string { + return messageTypeName[t] +} + +// marshal marshal MessageType to string. +func (t MessageType) marshal() string { + return strconv.FormatInt(int64(t), 10) +} + +// unmarshalMessageType unmarshal MessageType from string. +func unmarshalMessageType(s string) MessageType { + i, err := strconv.ParseInt(s, 10, 32) + if err != nil { + return MessageTypeUnknown + } + return MessageType(i) +} diff --git a/internal/util/logserviceutil/message/properties.go b/internal/util/logserviceutil/message/properties.go new file mode 100644 index 0000000000000..aa007cd57ab21 --- /dev/null +++ b/internal/util/logserviceutil/message/properties.go @@ -0,0 +1,64 @@ +package message + +const ( + // preserved properties + messageVersion = "_v" // message version for compatibility. + messageTypeKey = "_t" // message type key. + messageTimeTick = "_tt" // message time tick. + messageLastConfirmed = "_lc" // message last confirmed message id. +) + +var ( + _ RProperties = propertiesImpl{} + _ Properties = propertiesImpl{} +) + +// RProperties is the read-only properties for message. +type RProperties interface { + // Get find a value by key. + Get(key string) (value string, ok bool) + + // Exist check if a key exists. + Exist(key string) bool + + // ToRawMap returns the raw map of properties. + ToRawMap() map[string]string +} + +// Properties is the write and readable properties for message. +type Properties interface { + RProperties + + // Set a key-value pair in Properties. + Set(key, value string) +} + +// propertiesImpl is the implementation of Properties. +type propertiesImpl map[string]string + +func (prop propertiesImpl) Get(key string) (value string, ok bool) { + value, ok = prop[key] + return +} + +func (prop propertiesImpl) Exist(key string) bool { + _, ok := prop[key] + return ok +} + +func (prop propertiesImpl) Set(key, value string) { + prop[key] = value +} + +func (prop propertiesImpl) ToRawMap() map[string]string { + return map[string]string(prop) +} + +// EstimateSize returns the estimated size of properties. +func (prop propertiesImpl) EstimateSize() int { + size := 0 + for k, v := range prop { + size += len(k) + len(v) + } + return size +} diff --git a/internal/util/logserviceutil/message/version.go b/internal/util/logserviceutil/message/version.go new file mode 100644 index 0000000000000..1e99e51f33de9 --- /dev/null +++ b/internal/util/logserviceutil/message/version.go @@ -0,0 +1,25 @@ +package message + +import "strconv" + +var ( + VersionOld Version = 0 // old version before lognode. + VersionV1 Version = 1 +) + +type Version int // message version for compatibility. + +func newMessageVersionFromString(s string) Version { + if s == "" { + return VersionOld + } + v, err := strconv.ParseInt(s, 10, 64) + if err != nil { + panic("unexpected message version") + } + return Version(v) +} + +func (v Version) String() string { + return strconv.FormatInt(int64(v), 10) +} diff --git a/scripts/generate_proto.sh b/scripts/generate_proto.sh index 5b92bef12e5c5..03c4e9f687b02 100755 --- a/scripts/generate_proto.sh +++ b/scripts/generate_proto.sh @@ -57,6 +57,7 @@ mkdir -p indexpb mkdir -p datapb mkdir -p querypb mkdir -p planpb +mkdir -p logpb mkdir -p $ROOT_DIR/cmd/tools/migration/legacy/legacypb @@ -74,6 +75,7 @@ ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./querypb query_coord. ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./planpb plan.proto|| { echo 'generate plan.proto failed'; exit 1; } ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./segcorepb segcore.proto|| { echo 'generate segcore.proto failed'; exit 1; } ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./clusteringpb clustering.proto|| { echo 'generate clustering.proto failed'; exit 1; } +${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./logpb log.proto|| { echo 'generate logpb.proto failed'; exit 1; } ${protoc_opt} --proto_path=$ROOT_DIR/cmd/tools/migration/legacy/ \ --go_out=plugins=grpc,paths=source_relative:../../cmd/tools/migration/legacy/legacypb legacy.proto || { echo 'generate legacy.proto failed'; exit 1; }