diff --git a/internal/.mockery.yaml b/internal/.mockery.yaml index 8dba9081c2a14..e9c2cdca12ad7 100644 --- a/internal/.mockery.yaml +++ b/internal/.mockery.yaml @@ -40,6 +40,9 @@ packages: github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector: interfaces: SealOperator: + github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector: + interfaces: + TimeTickSyncOperator: google.golang.org/grpc: interfaces: ClientStream: @@ -67,4 +70,3 @@ packages: google.golang.org/grpc/balancer: interfaces: SubConn: - diff --git a/internal/distributed/streaming/append.go b/internal/distributed/streaming/append.go index 41ddd9f5ea4ac..530b9448d4305 100644 --- a/internal/distributed/streaming/append.go +++ b/internal/distributed/streaming/append.go @@ -6,6 +6,7 @@ import ( "github.com/milvus-io/milvus/internal/distributed/streaming/internal/producer" "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/util/funcutil" ) @@ -18,8 +19,8 @@ func newAppendResponseN(n int) AppendResponses { // AppendResponse is the response of one append operation. type AppendResponse struct { - MessageID message.MessageID - Error error + AppendResult *types.AppendResult + Error error } // AppendResponses is the response of append operation. @@ -27,8 +28,8 @@ type AppendResponses struct { Responses []AppendResponse } -// IsAnyError returns the first error in the responses. -func (a AppendResponses) IsAnyError() error { +// UnwrapFirstError returns the first error in the responses. +func (a AppendResponses) UnwrapFirstError() error { for _, r := range a.Responses { if r.Error != nil { return r.Error @@ -37,6 +38,17 @@ func (a AppendResponses) IsAnyError() error { return nil } +// MaxTimeTick returns the max time tick in the responses. +func (a AppendResponses) MaxTimeTick() uint64 { + maxTimeTick := uint64(0) + for _, r := range a.Responses { + if r.AppendResult.TimeTick > maxTimeTick { + maxTimeTick = r.AppendResult.TimeTick + } + } + return maxTimeTick +} + // fillAllError fills all the responses with the same error. func (a *AppendResponses) fillAllError(err error) { for i := range a.Responses { @@ -122,10 +134,10 @@ func (w *walAccesserImpl) appendToPChannel(ctx context.Context, pchannel string, // TODO: only the partition-key with high partition will generate many message in one time on the same pchannel, // we should optimize the message-format, make it into one; but not the goroutine count. if len(msgs) == 1 { - msgID, err := p.Produce(ctx, msgs[0]) + produceResult, err := p.Produce(ctx, msgs[0]) resp.fillResponseAtIdx(AppendResponse{ - MessageID: msgID, - Error: err, + AppendResult: produceResult, + Error: err, }, 0) return resp } @@ -144,8 +156,8 @@ func (w *walAccesserImpl) appendToPChannel(ctx context.Context, pchannel string, mu.Lock() resp.fillResponseAtIdx(AppendResponse{ - MessageID: msgID, - Error: err, + AppendResult: msgID, + Error: err, }, i) mu.Unlock() return struct{}{}, nil diff --git a/internal/distributed/streaming/internal/producer/producer.go b/internal/distributed/streaming/internal/producer/producer.go index a0a7ab1d76a71..d53452ff42ade 100644 --- a/internal/distributed/streaming/internal/producer/producer.go +++ b/internal/distributed/streaming/internal/producer/producer.go @@ -76,7 +76,7 @@ type ResumableProducer struct { } // Produce produce a new message to log service. -func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (msgID message.MessageID, err error) { +func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (*producer.ProduceResult, error) { if p.lifetime.Add(lifetime.IsWorking) != nil { return nil, errors.Wrapf(errs.ErrClosed, "produce on closed producer") } @@ -89,9 +89,9 @@ func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMess return nil, err } - msgID, err := producerHandler.Produce(ctx, msg) + produceResult, err := producerHandler.Produce(ctx, msg) if err == nil { - return msgID, nil + return produceResult, nil } // It's ok to stop retry if the error is canceled or deadline exceed. if status.IsCanceled(err) { diff --git a/internal/distributed/streaming/internal/producer/producer_test.go b/internal/distributed/streaming/internal/producer/producer_test.go index 768ff5086c74f..f85134a7fd42f 100644 --- a/internal/distributed/streaming/internal/producer/producer_test.go +++ b/internal/distributed/streaming/internal/producer/producer_test.go @@ -19,7 +19,10 @@ import ( func TestResumableProducer(t *testing.T) { p := mock_producer.NewMockProducer(t) msgID := mock_message.NewMockMessageID(t) - p.EXPECT().Produce(mock.Anything, mock.Anything).Return(msgID, nil) + p.EXPECT().Produce(mock.Anything, mock.Anything).Return(&producer.ProduceResult{ + MessageID: msgID, + TimeTick: 100, + }, nil) p.EXPECT().Close().Return() ch := make(chan struct{}) p.EXPECT().Available().Return(ch) @@ -44,11 +47,14 @@ func TestResumableProducer(t *testing.T) { } else if i == 2 { p := mock_producer.NewMockProducer(t) msgID := mock_message.NewMockMessageID(t) - p.EXPECT().Produce(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) { + p.EXPECT().Produce(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*producer.ProduceResult, error) { if ctx.Err() != nil { return nil, ctx.Err() } - return msgID, nil + return &producer.ProduceResult{ + MessageID: msgID, + TimeTick: 100, + }, nil }) p.EXPECT().Close().Return() p.EXPECT().Available().Return(ch2) diff --git a/internal/distributed/streaming/streaming.go b/internal/distributed/streaming/streaming.go index 879475f4e3e17..e08ee103a1176 100644 --- a/internal/distributed/streaming/streaming.go +++ b/internal/distributed/streaming/streaming.go @@ -3,8 +3,7 @@ package streaming import ( "context" - clientv3 "go.etcd.io/etcd/client/v3" - + kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/options" ) @@ -17,7 +16,8 @@ func SetWAL(w WALAccesser) { // Init initializes the wal accesser with the given etcd client. // should be called before any other operations. -func Init(c *clientv3.Client) { +func Init() { + c, _ := kvfactory.GetEtcdAndPath() singleton = newWALAccesser(c) } diff --git a/internal/distributed/streaming/streaming_test.go b/internal/distributed/streaming/streaming_test.go index 9039aedc5e407..945d465dc6ada 100644 --- a/internal/distributed/streaming/streaming_test.go +++ b/internal/distributed/streaming/streaming_test.go @@ -8,7 +8,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/distributed/streaming" - kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/options" _ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar" @@ -19,8 +18,7 @@ const vChannel = "by-dev-rootcoord-dml_4" func TestMain(m *testing.M) { paramtable.Init() - etcd, _ := kvfactory.GetEtcdAndPath() - streaming.Init(etcd) + streaming.Init() defer streaming.Release() m.Run() } diff --git a/internal/mocks/streamingnode/client/handler/mock_producer/mock_Producer.go b/internal/mocks/streamingnode/client/handler/mock_producer/mock_Producer.go index 61dfda479cfdb..7f639bf1ea38a 100644 --- a/internal/mocks/streamingnode/client/handler/mock_producer/mock_Producer.go +++ b/internal/mocks/streamingnode/client/handler/mock_producer/mock_Producer.go @@ -182,19 +182,19 @@ func (_c *MockProducer_IsAvailable_Call) RunAndReturn(run func() bool) *MockProd } // Produce provides a mock function with given fields: ctx, msg -func (_m *MockProducer) Produce(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { +func (_m *MockProducer) Produce(ctx context.Context, msg message.MutableMessage) (*types.AppendResult, error) { ret := _m.Called(ctx, msg) - var r0 message.MessageID + var r0 *types.AppendResult var r1 error - if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) (message.MessageID, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) (*types.AppendResult, error)); ok { return rf(ctx, msg) } - if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) message.MessageID); ok { + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) *types.AppendResult); ok { r0 = rf(ctx, msg) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(message.MessageID) + r0 = ret.Get(0).(*types.AppendResult) } } @@ -226,12 +226,12 @@ func (_c *MockProducer_Produce_Call) Run(run func(ctx context.Context, msg messa return _c } -func (_c *MockProducer_Produce_Call) Return(_a0 message.MessageID, _a1 error) *MockProducer_Produce_Call { +func (_c *MockProducer_Produce_Call) Return(_a0 *types.AppendResult, _a1 error) *MockProducer_Produce_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *MockProducer_Produce_Call) RunAndReturn(run func(context.Context, message.MutableMessage) (message.MessageID, error)) *MockProducer_Produce_Call { +func (_c *MockProducer_Produce_Call) RunAndReturn(run func(context.Context, message.MutableMessage) (*types.AppendResult, error)) *MockProducer_Produce_Call { _c.Call.Return(run) return _c } diff --git a/internal/mocks/streamingnode/server/mock_wal/mock_WAL.go b/internal/mocks/streamingnode/server/mock_wal/mock_WAL.go index 043730cfc2a9d..0a650a3d9c186 100644 --- a/internal/mocks/streamingnode/server/mock_wal/mock_WAL.go +++ b/internal/mocks/streamingnode/server/mock_wal/mock_WAL.go @@ -27,19 +27,19 @@ func (_m *MockWAL) EXPECT() *MockWAL_Expecter { } // Append provides a mock function with given fields: ctx, msg -func (_m *MockWAL) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { +func (_m *MockWAL) Append(ctx context.Context, msg message.MutableMessage) (*types.AppendResult, error) { ret := _m.Called(ctx, msg) - var r0 message.MessageID + var r0 *types.AppendResult var r1 error - if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) (message.MessageID, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) (*types.AppendResult, error)); ok { return rf(ctx, msg) } - if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) message.MessageID); ok { + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) *types.AppendResult); ok { r0 = rf(ctx, msg) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(message.MessageID) + r0 = ret.Get(0).(*types.AppendResult) } } @@ -71,18 +71,18 @@ func (_c *MockWAL_Append_Call) Run(run func(ctx context.Context, msg message.Mut return _c } -func (_c *MockWAL_Append_Call) Return(_a0 message.MessageID, _a1 error) *MockWAL_Append_Call { +func (_c *MockWAL_Append_Call) Return(_a0 *types.AppendResult, _a1 error) *MockWAL_Append_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *MockWAL_Append_Call) RunAndReturn(run func(context.Context, message.MutableMessage) (message.MessageID, error)) *MockWAL_Append_Call { +func (_c *MockWAL_Append_Call) RunAndReturn(run func(context.Context, message.MutableMessage) (*types.AppendResult, error)) *MockWAL_Append_Call { _c.Call.Return(run) return _c } // AppendAsync provides a mock function with given fields: ctx, msg, cb -func (_m *MockWAL) AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(message.MessageID, error)) { +func (_m *MockWAL) AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(*types.AppendResult, error)) { _m.Called(ctx, msg, cb) } @@ -94,14 +94,14 @@ type MockWAL_AppendAsync_Call struct { // AppendAsync is a helper method to define mock.On call // - ctx context.Context // - msg message.MutableMessage -// - cb func(message.MessageID , error) +// - cb func(*types.AppendResult , error) func (_e *MockWAL_Expecter) AppendAsync(ctx interface{}, msg interface{}, cb interface{}) *MockWAL_AppendAsync_Call { return &MockWAL_AppendAsync_Call{Call: _e.mock.On("AppendAsync", ctx, msg, cb)} } -func (_c *MockWAL_AppendAsync_Call) Run(run func(ctx context.Context, msg message.MutableMessage, cb func(message.MessageID, error))) *MockWAL_AppendAsync_Call { +func (_c *MockWAL_AppendAsync_Call) Run(run func(ctx context.Context, msg message.MutableMessage, cb func(*types.AppendResult, error))) *MockWAL_AppendAsync_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(message.MutableMessage), args[2].(func(message.MessageID, error))) + run(args[0].(context.Context), args[1].(message.MutableMessage), args[2].(func(*types.AppendResult, error))) }) return _c } @@ -111,7 +111,7 @@ func (_c *MockWAL_AppendAsync_Call) Return() *MockWAL_AppendAsync_Call { return _c } -func (_c *MockWAL_AppendAsync_Call) RunAndReturn(run func(context.Context, message.MutableMessage, func(message.MessageID, error))) *MockWAL_AppendAsync_Call { +func (_c *MockWAL_AppendAsync_Call) RunAndReturn(run func(context.Context, message.MutableMessage, func(*types.AppendResult, error))) *MockWAL_AppendAsync_Call { _c.Call.Return(run) return _c } diff --git a/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector/mock_TimeTickSyncOperator.go b/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector/mock_TimeTickSyncOperator.go new file mode 100644 index 0000000000000..22ed2dde74d19 --- /dev/null +++ b/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector/mock_TimeTickSyncOperator.go @@ -0,0 +1,156 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_inspector + +import ( + context "context" + + inspector "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector" + mock "github.com/stretchr/testify/mock" + + types "github.com/milvus-io/milvus/pkg/streaming/util/types" +) + +// MockTimeTickSyncOperator is an autogenerated mock type for the TimeTickSyncOperator type +type MockTimeTickSyncOperator struct { + mock.Mock +} + +type MockTimeTickSyncOperator_Expecter struct { + mock *mock.Mock +} + +func (_m *MockTimeTickSyncOperator) EXPECT() *MockTimeTickSyncOperator_Expecter { + return &MockTimeTickSyncOperator_Expecter{mock: &_m.Mock} +} + +// Channel provides a mock function with given fields: +func (_m *MockTimeTickSyncOperator) Channel() types.PChannelInfo { + ret := _m.Called() + + var r0 types.PChannelInfo + if rf, ok := ret.Get(0).(func() types.PChannelInfo); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(types.PChannelInfo) + } + + return r0 +} + +// MockTimeTickSyncOperator_Channel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Channel' +type MockTimeTickSyncOperator_Channel_Call struct { + *mock.Call +} + +// Channel is a helper method to define mock.On call +func (_e *MockTimeTickSyncOperator_Expecter) Channel() *MockTimeTickSyncOperator_Channel_Call { + return &MockTimeTickSyncOperator_Channel_Call{Call: _e.mock.On("Channel")} +} + +func (_c *MockTimeTickSyncOperator_Channel_Call) Run(run func()) *MockTimeTickSyncOperator_Channel_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockTimeTickSyncOperator_Channel_Call) Return(_a0 types.PChannelInfo) *MockTimeTickSyncOperator_Channel_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTimeTickSyncOperator_Channel_Call) RunAndReturn(run func() types.PChannelInfo) *MockTimeTickSyncOperator_Channel_Call { + _c.Call.Return(run) + return _c +} + +// Sync provides a mock function with given fields: ctx +func (_m *MockTimeTickSyncOperator) Sync(ctx context.Context) { + _m.Called(ctx) +} + +// MockTimeTickSyncOperator_Sync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Sync' +type MockTimeTickSyncOperator_Sync_Call struct { + *mock.Call +} + +// Sync is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockTimeTickSyncOperator_Expecter) Sync(ctx interface{}) *MockTimeTickSyncOperator_Sync_Call { + return &MockTimeTickSyncOperator_Sync_Call{Call: _e.mock.On("Sync", ctx)} +} + +func (_c *MockTimeTickSyncOperator_Sync_Call) Run(run func(ctx context.Context)) *MockTimeTickSyncOperator_Sync_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockTimeTickSyncOperator_Sync_Call) Return() *MockTimeTickSyncOperator_Sync_Call { + _c.Call.Return() + return _c +} + +func (_c *MockTimeTickSyncOperator_Sync_Call) RunAndReturn(run func(context.Context)) *MockTimeTickSyncOperator_Sync_Call { + _c.Call.Return(run) + return _c +} + +// TimeTickNotifier provides a mock function with given fields: +func (_m *MockTimeTickSyncOperator) TimeTickNotifier() *inspector.TimeTickNotifier { + ret := _m.Called() + + var r0 *inspector.TimeTickNotifier + if rf, ok := ret.Get(0).(func() *inspector.TimeTickNotifier); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*inspector.TimeTickNotifier) + } + } + + return r0 +} + +// MockTimeTickSyncOperator_TimeTickNotifier_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TimeTickNotifier' +type MockTimeTickSyncOperator_TimeTickNotifier_Call struct { + *mock.Call +} + +// TimeTickNotifier is a helper method to define mock.On call +func (_e *MockTimeTickSyncOperator_Expecter) TimeTickNotifier() *MockTimeTickSyncOperator_TimeTickNotifier_Call { + return &MockTimeTickSyncOperator_TimeTickNotifier_Call{Call: _e.mock.On("TimeTickNotifier")} +} + +func (_c *MockTimeTickSyncOperator_TimeTickNotifier_Call) Run(run func()) *MockTimeTickSyncOperator_TimeTickNotifier_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockTimeTickSyncOperator_TimeTickNotifier_Call) Return(_a0 *inspector.TimeTickNotifier) *MockTimeTickSyncOperator_TimeTickNotifier_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTimeTickSyncOperator_TimeTickNotifier_Call) RunAndReturn(run func() *inspector.TimeTickNotifier) *MockTimeTickSyncOperator_TimeTickNotifier_Call { + _c.Call.Return(run) + return _c +} + +// NewMockTimeTickSyncOperator creates a new instance of MockTimeTickSyncOperator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockTimeTickSyncOperator(t interface { + mock.TestingT + Cleanup(func()) +}) *MockTimeTickSyncOperator { + mock := &MockTimeTickSyncOperator{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/streamingnode/server/wal/mock_interceptors/mock_InterceptorBuilder.go b/internal/mocks/streamingnode/server/wal/mock_interceptors/mock_InterceptorBuilder.go index 556ba6d9f38b3..95bdc5c1be24a 100644 --- a/internal/mocks/streamingnode/server/wal/mock_interceptors/mock_InterceptorBuilder.go +++ b/internal/mocks/streamingnode/server/wal/mock_interceptors/mock_InterceptorBuilder.go @@ -21,15 +21,15 @@ func (_m *MockInterceptorBuilder) EXPECT() *MockInterceptorBuilder_Expecter { } // Build provides a mock function with given fields: param -func (_m *MockInterceptorBuilder) Build(param interceptors.InterceptorBuildParam) interceptors.BasicInterceptor { +func (_m *MockInterceptorBuilder) Build(param interceptors.InterceptorBuildParam) interceptors.Interceptor { ret := _m.Called(param) - var r0 interceptors.BasicInterceptor - if rf, ok := ret.Get(0).(func(interceptors.InterceptorBuildParam) interceptors.BasicInterceptor); ok { + var r0 interceptors.Interceptor + if rf, ok := ret.Get(0).(func(interceptors.InterceptorBuildParam) interceptors.Interceptor); ok { r0 = rf(param) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(interceptors.BasicInterceptor) + r0 = ret.Get(0).(interceptors.Interceptor) } } @@ -54,12 +54,12 @@ func (_c *MockInterceptorBuilder_Build_Call) Run(run func(param interceptors.Int return _c } -func (_c *MockInterceptorBuilder_Build_Call) Return(_a0 interceptors.BasicInterceptor) *MockInterceptorBuilder_Build_Call { +func (_c *MockInterceptorBuilder_Build_Call) Return(_a0 interceptors.Interceptor) *MockInterceptorBuilder_Build_Call { _c.Call.Return(_a0) return _c } -func (_c *MockInterceptorBuilder_Build_Call) RunAndReturn(run func(interceptors.InterceptorBuildParam) interceptors.BasicInterceptor) *MockInterceptorBuilder_Build_Call { +func (_c *MockInterceptorBuilder_Build_Call) RunAndReturn(run func(interceptors.InterceptorBuildParam) interceptors.Interceptor) *MockInterceptorBuilder_Build_Call { _c.Call.Return(run) return _c } diff --git a/internal/rootcoord/garbage_collector.go b/internal/rootcoord/garbage_collector.go index 2721fef77220f..e637d4b29a4b2 100644 --- a/internal/rootcoord/garbage_collector.go +++ b/internal/rootcoord/garbage_collector.go @@ -259,7 +259,7 @@ func (c *bgGarbageCollector) notifyPartitionGcByStreamingService(ctx context.Con msgs = append(msgs, msg) } resp := streaming.WAL().Append(ctx, msgs...) - if err := resp.IsAnyError(); err != nil { + if err := resp.UnwrapFirstError(); err != nil { return 0, err } // TODO: sheep, return resp.MaxTimeTick(), nil diff --git a/internal/streamingnode/client/handler/producer/producer.go b/internal/streamingnode/client/handler/producer/producer.go index b611931c86ac0..8190f4110c3c6 100644 --- a/internal/streamingnode/client/handler/producer/producer.go +++ b/internal/streamingnode/client/handler/producer/producer.go @@ -9,6 +9,8 @@ import ( var _ Producer = (*producerImpl)(nil) +type ProduceResult = types.AppendResult + // Producer is the interface that wraps the basic produce method on grpc stream. // Producer is work on a single stream on grpc, // so Producer cannot recover from failure because of the stream is broken. @@ -17,7 +19,7 @@ type Producer interface { Assignment() types.PChannelInfoAssigned // Produce sends the produce message to server. - Produce(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) + Produce(ctx context.Context, msg message.MutableMessage) (*ProduceResult, error) // Check if a producer is available. IsAvailable() bool diff --git a/internal/streamingnode/client/handler/producer/producer_impl.go b/internal/streamingnode/client/handler/producer/producer_impl.go index 45bfdfcf729f6..9e690721b94ed 100644 --- a/internal/streamingnode/client/handler/producer/producer_impl.go +++ b/internal/streamingnode/client/handler/producer/producer_impl.go @@ -115,8 +115,8 @@ type produceRequest struct { } type produceResponse struct { - id message.MessageID - err error + result *ProduceResult + err error } // Assignment returns the assignment of the producer. @@ -125,7 +125,7 @@ func (p *producerImpl) Assignment() types.PChannelInfoAssigned { } // Produce sends the produce message to server. -func (p *producerImpl) Produce(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { +func (p *producerImpl) Produce(ctx context.Context, msg message.MutableMessage) (*ProduceResult, error) { if p.lifetime.Add(lifetime.IsWorking) != nil { return nil, status.NewOnShutdownError("producer client is shutting down") } @@ -154,7 +154,7 @@ func (p *producerImpl) Produce(ctx context.Context, msg message.MutableMessage) case <-ctx.Done(): return nil, ctx.Err() case resp := <-respCh: - return resp.id, resp.err + return resp.result, resp.err } } @@ -294,7 +294,10 @@ func (p *producerImpl) recvLoop() (err error) { return err } result = produceResponse{ - id: msgID, + result: &ProduceResult{ + MessageID: msgID, + TimeTick: produceResp.Result.GetTimetick(), + }, } case *streamingpb.ProduceMessageResponse_Error: result = produceResponse{ diff --git a/internal/streamingnode/server/resource/resource.go b/internal/streamingnode/server/resource/resource.go index 5cff55f2170b7..f53a9a65cfb20 100644 --- a/internal/streamingnode/server/resource/resource.go +++ b/internal/streamingnode/server/resource/resource.go @@ -11,8 +11,9 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/streamingnode/server/flusher" "github.com/milvus-io/milvus/internal/streamingnode/server/resource/idalloc" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector" + sinspector "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats" + tinspector "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector" "github.com/milvus-io/milvus/internal/types" ) @@ -87,7 +88,8 @@ func Init(opts ...optResourceInit) { r.timestampAllocator = idalloc.NewTSOAllocator(r.rootCoordClient) r.idAllocator = idalloc.NewIDAllocator(r.rootCoordClient) r.segmentAssignStatsManager = stats.NewStatsManager() - r.segmentSealedInspector = inspector.NewSealedInspector(r.segmentAssignStatsManager.SealNotifier()) + r.segmentSealedInspector = sinspector.NewSealedInspector(r.segmentAssignStatsManager.SealNotifier()) + r.timeTickInspector = tinspector.NewTimeTickSyncInspector() assertNotNil(r.TSOAllocator()) assertNotNil(r.RootCoordClient()) @@ -95,6 +97,7 @@ func Init(opts ...optResourceInit) { assertNotNil(r.StreamingNodeCatalog()) assertNotNil(r.SegmentAssignStatsManager()) assertNotNil(r.SegmentSealedInspector()) + assertNotNil(r.TimeTickInspector()) } // Resource access the underlying singleton of resources. @@ -117,7 +120,8 @@ type resourceImpl struct { dataCoordClient types.DataCoordClient streamingNodeCatalog metastore.StreamingNodeCataLog segmentAssignStatsManager *stats.StatsManager - segmentSealedInspector inspector.SealOperationInspector + segmentSealedInspector sinspector.SealOperationInspector + timeTickInspector tinspector.TimeTickSyncInspector } // Flusher returns the flusher. @@ -176,10 +180,14 @@ func (r *resourceImpl) SegmentAssignStatsManager() *stats.StatsManager { } // SegmentSealedInspector returns the segment sealed inspector. -func (r *resourceImpl) SegmentSealedInspector() inspector.SealOperationInspector { +func (r *resourceImpl) SegmentSealedInspector() sinspector.SealOperationInspector { return r.segmentSealedInspector } +func (r *resourceImpl) TimeTickInspector() tinspector.TimeTickSyncInspector { + return r.timeTickInspector +} + // assertNotNil panics if the resource is nil. func assertNotNil(v interface{}) { iv := reflect.ValueOf(v) diff --git a/internal/streamingnode/server/resource/resource_test.go b/internal/streamingnode/server/resource/resource_test.go index 27dde00197657..ad114ebc827b3 100644 --- a/internal/streamingnode/server/resource/resource_test.go +++ b/internal/streamingnode/server/resource/resource_test.go @@ -8,9 +8,12 @@ import ( "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/mocks/mock_metastore" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestInit(t *testing.T) { + paramtable.Init() + assert.Panics(t, func() { Init() }) diff --git a/internal/streamingnode/server/resource/test_utility.go b/internal/streamingnode/server/resource/test_utility.go index 547db9595e3e9..da3b220404e10 100644 --- a/internal/streamingnode/server/resource/test_utility.go +++ b/internal/streamingnode/server/resource/test_utility.go @@ -7,8 +7,9 @@ import ( "testing" "github.com/milvus-io/milvus/internal/streamingnode/server/resource/idalloc" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector" + sinspector "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats" + tinspector "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector" ) // InitForTest initializes the singleton of resources for test. @@ -26,5 +27,6 @@ func InitForTest(t *testing.T, opts ...optResourceInit) { r.idAllocator = idalloc.NewIDAllocator(r.rootCoordClient) } r.segmentAssignStatsManager = stats.NewStatsManager() - r.segmentSealedInspector = inspector.NewSealedInspector(r.segmentAssignStatsManager.SealNotifier()) + r.segmentSealedInspector = sinspector.NewSealedInspector(r.segmentAssignStatsManager.SealNotifier()) + r.timeTickInspector = tinspector.NewTimeTickSyncInspector() } diff --git a/internal/streamingnode/server/service/handler/producer/produce_server.go b/internal/streamingnode/server/service/handler/producer/produce_server.go index 4e87a0a8f3e65..5bbf53c39de82 100644 --- a/internal/streamingnode/server/service/handler/producer/produce_server.go +++ b/internal/streamingnode/server/service/handler/producer/produce_server.go @@ -187,12 +187,12 @@ func (p *ProduceServer) handleProduce(req *streamingpb.ProduceMessageRequest) { // Concurrent append request can be executed concurrently. messageSize := msg.EstimateSize() now := time.Now() - p.wal.AppendAsync(p.produceServer.Context(), msg, func(id message.MessageID, err error) { + p.wal.AppendAsync(p.produceServer.Context(), msg, func(appendResult *wal.AppendResult, err error) { defer func() { p.appendWG.Done() p.updateMetrics(messageSize, time.Since(now).Seconds(), err) }() - p.sendProduceResult(req.RequestId, id, err) + p.sendProduceResult(req.RequestId, appendResult, err) }) } @@ -212,7 +212,7 @@ func (p *ProduceServer) validateMessage(msg message.MutableMessage) error { } // sendProduceResult sends the produce result to client. -func (p *ProduceServer) sendProduceResult(reqID int64, id message.MessageID, err error) { +func (p *ProduceServer) sendProduceResult(reqID int64, appendResult *wal.AppendResult, err error) { resp := &streamingpb.ProduceMessageResponse{ RequestId: reqID, } @@ -225,8 +225,9 @@ func (p *ProduceServer) sendProduceResult(reqID int64, id message.MessageID, err resp.Response = &streamingpb.ProduceMessageResponse_Result{ Result: &streamingpb.ProduceMessageResponseResult{ Id: &messagespb.MessageID{ - Id: id.Marshal(), + Id: appendResult.MessageID.Marshal(), }, + Timetick: appendResult.TimeTick, }, } } @@ -235,9 +236,9 @@ func (p *ProduceServer) sendProduceResult(reqID int64, id message.MessageID, err // all pending response message should be dropped, client side will handle it. select { case p.produceMessageCh <- resp: - p.logger.Debug("send produce message response to client", zap.Int64("requestID", reqID), zap.Any("messageID", id), zap.Error(err)) + p.logger.Debug("send produce message response to client", zap.Int64("requestID", reqID), zap.Any("appendResult", appendResult), zap.Error(err)) case <-p.produceServer.Context().Done(): - p.logger.Warn("stream closed before produce message response sent", zap.Int64("requestID", reqID), zap.Any("messageID", id)) + p.logger.Warn("stream closed before produce message response sent", zap.Int64("requestID", reqID), zap.Any("appendResult", appendResult), zap.Error(err)) return } } diff --git a/internal/streamingnode/server/service/handler/producer/produce_server_test.go b/internal/streamingnode/server/service/handler/producer/produce_server_test.go index 0b20dbc07bcb6..2737b40e1216b 100644 --- a/internal/streamingnode/server/service/handler/producer/produce_server_test.go +++ b/internal/streamingnode/server/service/handler/producer/produce_server_test.go @@ -16,6 +16,7 @@ import ( "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal" "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_walmanager" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/walmanager" "github.com/milvus-io/milvus/internal/util/streamingutil/service/contextutil" "github.com/milvus-io/milvus/pkg/log" @@ -194,9 +195,12 @@ func TestProduceServerRecvArm(t *testing.T) { Name: "test", Term: 1, }) - l.EXPECT().AppendAsync(mock.Anything, mock.Anything, mock.Anything).Run(func(ctx context.Context, mm message.MutableMessage, f func(message.MessageID, error)) { + l.EXPECT().AppendAsync(mock.Anything, mock.Anything, mock.Anything).Run(func(ctx context.Context, mm message.MutableMessage, f func(*wal.AppendResult, error)) { msgID := walimplstest.NewTestMessageID(1) - f(msgID, nil) + f(&wal.AppendResult{ + MessageID: msgID, + TimeTick: 100, + }, nil) }) l.EXPECT().IsAvailable().Return(true) @@ -238,7 +242,7 @@ func TestProduceServerRecvArm(t *testing.T) { // Test send error. l.EXPECT().AppendAsync(mock.Anything, mock.Anything, mock.Anything).Unset() - l.EXPECT().AppendAsync(mock.Anything, mock.Anything, mock.Anything).Run(func(ctx context.Context, mm message.MutableMessage, f func(message.MessageID, error)) { + l.EXPECT().AppendAsync(mock.Anything, mock.Anything, mock.Anything).Run(func(ctx context.Context, mm message.MutableMessage, f func(*wal.AppendResult, error)) { f(nil, errors.New("append error")) }) diff --git a/internal/streamingnode/server/wal/adaptor/message_handler.go b/internal/streamingnode/server/wal/adaptor/message_handler.go index 5be230c4f01d7..8ec28014a623b 100644 --- a/internal/streamingnode/server/wal/adaptor/message_handler.go +++ b/internal/streamingnode/server/wal/adaptor/message_handler.go @@ -1,35 +1,36 @@ package adaptor import ( - "context" - - "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal" - "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor" - "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +var ( + _ wal.MessageHandler = defaultMessageHandler(nil) + _ wal.MessageHandler = (*MsgPackAdaptorHandler)(nil) ) type defaultMessageHandler chan message.ImmutableMessage -func (d defaultMessageHandler) Handle(ctx context.Context, upstream <-chan message.ImmutableMessage, msg message.ImmutableMessage) (incoming message.ImmutableMessage, ok bool, err error) { +func (h defaultMessageHandler) Handle(param wal.HandleParam) wal.HandleResult { var sendingCh chan message.ImmutableMessage - if msg != nil { - sendingCh = d + if param.Message != nil { + sendingCh = h } select { - case <-ctx.Done(): - return nil, false, ctx.Err() - case msg, ok := <-upstream: + case <-param.Ctx.Done(): + return wal.HandleResult{Error: param.Ctx.Err()} + case msg, ok := <-param.Upstream: if !ok { - return nil, false, wal.ErrUpstreamClosed + return wal.HandleResult{Error: wal.ErrUpstreamClosed} } - return msg, false, nil - case sendingCh <- msg: - return nil, true, nil + return wal.HandleResult{Incoming: msg} + case sendingCh <- param.Message: + return wal.HandleResult{MessageHandled: true} + case <-param.TimeTickChan: + return wal.HandleResult{TimeTickUpdated: true} } } @@ -40,92 +41,67 @@ func (d defaultMessageHandler) Close() { // NewMsgPackAdaptorHandler create a new message pack adaptor handler. func NewMsgPackAdaptorHandler() *MsgPackAdaptorHandler { return &MsgPackAdaptorHandler{ - logger: log.With(), - channel: make(chan *msgstream.MsgPack), - pendings: make([]message.ImmutableMessage, 0), - pendingMsgPack: typeutil.NewMultipartQueue[*msgstream.MsgPack](), + base: adaptor.NewBaseMsgPackAdaptorHandler(), } } -// MsgPackAdaptorHandler is the handler for message pack. type MsgPackAdaptorHandler struct { - logger *log.MLogger - channel chan *msgstream.MsgPack - pendings []message.ImmutableMessage // pendings hold the vOld message which has same time tick. - pendingMsgPack *typeutil.MultipartQueue[*msgstream.MsgPack] // pendingMsgPack hold unsent msgPack. + base *adaptor.BaseMsgPackAdaptorHandler } // Chan is the channel for message. func (m *MsgPackAdaptorHandler) Chan() <-chan *msgstream.MsgPack { - return m.channel + return m.base.Channel } // Handle is the callback for handling message. -func (m *MsgPackAdaptorHandler) Handle(ctx context.Context, upstream <-chan message.ImmutableMessage, msg message.ImmutableMessage) (incoming message.ImmutableMessage, ok bool, err error) { +func (m *MsgPackAdaptorHandler) Handle(param wal.HandleParam) wal.HandleResult { + messageHandled := false // not handle new message if there are pending msgPack. - if msg != nil && m.pendingMsgPack.Len() == 0 { - m.generateMsgPack(msg) - ok = true + if param.Message != nil && m.base.PendingMsgPack.Len() == 0 { + m.base.GenerateMsgPack(param.Message) + messageHandled = true } for { var sendCh chan<- *msgstream.MsgPack - if m.pendingMsgPack.Len() != 0 { - sendCh = m.channel + if m.base.PendingMsgPack.Len() != 0 { + sendCh = m.base.Channel } select { - case <-ctx.Done(): - return nil, ok, ctx.Err() - case msg, notClose := <-upstream: + case <-param.Ctx.Done(): + return wal.HandleResult{ + MessageHandled: messageHandled, + Error: param.Ctx.Err(), + } + case msg, notClose := <-param.Upstream: if !notClose { - return nil, ok, wal.ErrUpstreamClosed + return wal.HandleResult{ + MessageHandled: messageHandled, + Error: wal.ErrUpstreamClosed, + } + } + return wal.HandleResult{ + Incoming: msg, + MessageHandled: messageHandled, } - return msg, ok, nil - case sendCh <- m.pendingMsgPack.Next(): - m.pendingMsgPack.UnsafeAdvance() - if m.pendingMsgPack.Len() > 0 { + case sendCh <- m.base.PendingMsgPack.Next(): + m.base.PendingMsgPack.UnsafeAdvance() + if m.base.PendingMsgPack.Len() > 0 { continue } - return nil, ok, nil - } - } -} - -// generateMsgPack generate msgPack from message. -func (m *MsgPackAdaptorHandler) generateMsgPack(msg message.ImmutableMessage) { - switch msg.Version() { - case message.VersionOld: - if len(m.pendings) != 0 { - if msg.TimeTick() > m.pendings[0].TimeTick() { - m.addMsgPackIntoPending(m.pendings...) - m.pendings = nil + return wal.HandleResult{MessageHandled: messageHandled} + case <-param.TimeTickChan: + return wal.HandleResult{ + MessageHandled: messageHandled, + TimeTickUpdated: true, } } - m.pendings = append(m.pendings, msg) - case message.VersionV1: - if len(m.pendings) != 0 { // all previous message should be vOld. - m.addMsgPackIntoPending(m.pendings...) - m.pendings = nil - } - m.addMsgPackIntoPending(msg) - default: - panic("unsupported message version") - } -} - -// addMsgPackIntoPending add message into pending msgPack. -func (m *MsgPackAdaptorHandler) addMsgPackIntoPending(msgs ...message.ImmutableMessage) { - newPack, err := adaptor.NewMsgPackFromMessage(msgs...) - if err != nil { - m.logger.Warn("failed to convert message to msgpack", zap.Error(err)) - } - if newPack != nil { - m.pendingMsgPack.AddOne(newPack) } } // Close closes the handler. func (m *MsgPackAdaptorHandler) Close() { - close(m.channel) + close(m.base.Channel) } diff --git a/internal/streamingnode/server/wal/adaptor/message_handler_test.go b/internal/streamingnode/server/wal/adaptor/message_handler_test.go index c84fb225b8421..b3c7dedafddda 100644 --- a/internal/streamingnode/server/wal/adaptor/message_handler_test.go +++ b/internal/streamingnode/server/wal/adaptor/message_handler_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message" "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq" @@ -34,15 +35,23 @@ func TestMsgPackAdaptorHandler(t *testing.T) { close(done) }() upstream <- immutableMsg - newMsg, ok, err := h.Handle(ctx, upstream, nil) - assert.Equal(t, newMsg, immutableMsg) - assert.False(t, ok) - assert.NoError(t, err) + resp := h.Handle(wal.HandleParam{ + Ctx: ctx, + Upstream: upstream, + Message: nil, + }) + assert.Equal(t, resp.Incoming, immutableMsg) + assert.False(t, resp.MessageHandled) + assert.NoError(t, resp.Error) - newMsg, ok, err = h.Handle(ctx, upstream, newMsg) - assert.NoError(t, err) - assert.Nil(t, newMsg) - assert.True(t, ok) + resp = h.Handle(wal.HandleParam{ + Ctx: ctx, + Upstream: upstream, + Message: resp.Incoming, + }) + assert.NoError(t, resp.Error) + assert.Nil(t, resp.Incoming) + assert.True(t, resp.MessageHandled) h.Close() <-done @@ -60,16 +69,24 @@ func TestDefaultHandler(t *testing.T) { upstream := make(chan message.ImmutableMessage, 1) msg := mock_message.NewMockImmutableMessage(t) upstream <- msg - newMsg, ok, err := h.Handle(context.Background(), upstream, nil) - assert.NotNil(t, newMsg) - assert.NoError(t, err) - assert.False(t, ok) - assert.Equal(t, newMsg, msg) + resp := h.Handle(wal.HandleParam{ + Ctx: context.Background(), + Upstream: upstream, + Message: nil, + }) + assert.NotNil(t, resp.Incoming) + assert.NoError(t, resp.Error) + assert.False(t, resp.MessageHandled) + assert.Equal(t, resp.Incoming, msg) - newMsg, ok, err = h.Handle(context.Background(), upstream, newMsg) - assert.NoError(t, err) - assert.Nil(t, newMsg) - assert.True(t, ok) + resp = h.Handle(wal.HandleParam{ + Ctx: context.Background(), + Upstream: upstream, + Message: resp.Incoming, + }) + assert.NoError(t, resp.Error) + assert.Nil(t, resp.Incoming) + assert.True(t, resp.MessageHandled) h.Close() <-done diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go index f48b6f6b7be72..1276b0fd5c1e8 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go @@ -3,13 +3,17 @@ package adaptor import ( "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/streaming/walimpls" "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -26,13 +30,14 @@ func newScannerAdaptor( readOption.MesasgeHandler = defaultMessageHandler(make(chan message.ImmutableMessage)) } s := &scannerAdaptorImpl{ - logger: log.With(zap.String("name", name), zap.String("channel", l.Channel().Name)), - innerWAL: l, - readOption: readOption, - reorderBuffer: utility.NewReOrderBuffer(), - pendingQueue: typeutil.NewMultipartQueue[message.ImmutableMessage](), - cleanup: cleanup, - ScannerHelper: helper.NewScannerHelper(name), + logger: log.With(zap.String("name", name), zap.String("channel", l.Channel().Name)), + innerWAL: l, + readOption: readOption, + reorderBuffer: utility.NewReOrderBuffer(), + pendingQueue: typeutil.NewMultipartQueue[message.ImmutableMessage](), + cleanup: cleanup, + ScannerHelper: helper.NewScannerHelper(name), + lastTimeTickInfo: inspector.TimeTickInfo{}, } go s.executeConsume() return s @@ -41,12 +46,13 @@ func newScannerAdaptor( // scannerAdaptorImpl is a wrapper of ScannerImpls to extend it into a Scanner interface. type scannerAdaptorImpl struct { *helper.ScannerHelper - logger *log.MLogger - innerWAL walimpls.WALImpls - readOption wal.ReadOption - reorderBuffer *utility.ReOrderByTimeTickBuffer // only support time tick reorder now. - pendingQueue *typeutil.MultipartQueue[message.ImmutableMessage] // - cleanup func() + logger *log.MLogger + innerWAL walimpls.WALImpls + readOption wal.ReadOption + reorderBuffer *utility.ReOrderByTimeTickBuffer // only support time tick reorder now. + pendingQueue *typeutil.MultipartQueue[message.ImmutableMessage] // + cleanup func() + lastTimeTickInfo inspector.TimeTickInfo } // Channel returns the channel assignment info of the wal. @@ -82,25 +88,42 @@ func (s *scannerAdaptorImpl) executeConsume() { } defer innerScanner.Close() + timeTickNotifier := resource.Resource().TimeTickInspector().MustGetOperator(s.Channel()).TimeTickNotifier() + for { // generate the event channel and do the event loop. // TODO: Consume from local cache. - upstream := s.getUpstream(innerScanner) - - msg, ok, err := s.readOption.MesasgeHandler.Handle(s.Context(), upstream, s.pendingQueue.Next()) - if err != nil { + handleResult := s.readOption.MesasgeHandler.Handle(wal.HandleParam{ + Ctx: s.Context(), + Upstream: s.getUpstream(innerScanner), + TimeTickChan: s.getTimeTickUpdateChan(timeTickNotifier), + Message: s.pendingQueue.Next(), + }) + if handleResult.Error != nil { s.Finish(err) return } - if ok { + if handleResult.MessageHandled { s.pendingQueue.UnsafeAdvance() } - if msg != nil { - s.handleUpstream(msg) + if handleResult.Incoming != nil { + s.handleUpstream(handleResult.Incoming) + } + // If the timetick just updated with a non persist operation, + // we just make a fake message to keep timetick sync if there are no more pending message. + if handleResult.TimeTickUpdated { + s.handleTimeTickUpdated(timeTickNotifier) } } } +func (s *scannerAdaptorImpl) getTimeTickUpdateChan(timeTickNotifier *inspector.TimeTickNotifier) <-chan struct{} { + if s.pendingQueue.Len() == 0 && s.reorderBuffer.Len() == 0 && !s.lastTimeTickInfo.IsZero() { + return timeTickNotifier.WatchAtMessageID(s.lastTimeTickInfo.MessageID, s.lastTimeTickInfo.TimeTick) + } + return nil +} + func (s *scannerAdaptorImpl) getUpstream(scanner walimpls.ScannerImpls) <-chan message.ImmutableMessage { // TODO: configurable pending buffer count. // If the pending queue is full, we need to wait until it's consumed to avoid scanner overloading. @@ -115,6 +138,11 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) { // If the time tick message incoming, // the reorder buffer can be consumed into a pending queue with latest timetick. s.pendingQueue.Add(s.reorderBuffer.PopUtilTimeTick(msg.TimeTick())) + s.lastTimeTickInfo = inspector.TimeTickInfo{ + MessageID: msg.MessageID(), + TimeTick: msg.TimeTick(), + LastConfirmedMessageID: msg.LastConfirmedMessageID(), + } return } // Filtering the message if needed. @@ -130,3 +158,20 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) { zap.Error(err)) } } + +func (s *scannerAdaptorImpl) handleTimeTickUpdated(timeTickNotifier *inspector.TimeTickNotifier) { + timeTickInfo := timeTickNotifier.Get() + if timeTickInfo.MessageID.EQ(s.lastTimeTickInfo.MessageID) && timeTickInfo.TimeTick > s.lastTimeTickInfo.TimeTick { + s.lastTimeTickInfo.TimeTick = timeTickInfo.TimeTick + msg, err := timetick.NewTimeTickMsg( + s.lastTimeTickInfo.TimeTick, + s.lastTimeTickInfo.LastConfirmedMessageID, + paramtable.GetNodeID(), + ) + if err != nil { + s.logger.Warn("unreachable: a marshal timetick operation must be success") + return + } + s.pendingQueue.AddOne(msg.IntoImmutableMessage(s.lastTimeTickInfo.MessageID)) + } +} diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go index 319f8a2345d88..cf337e560800c 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go @@ -19,10 +19,13 @@ func TestScannerAdaptorReadError(t *testing.T) { l.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, err) l.EXPECT().Channel().Return(types.PChannelInfo{}) - s := newScannerAdaptor("scanner", l, wal.ReadOption{ - DeliverPolicy: options.DeliverPolicyAll(), - MessageFilter: nil, - }, func() {}) + s := newScannerAdaptor("scanner", + l, + wal.ReadOption{ + DeliverPolicy: options.DeliverPolicyAll(), + MessageFilter: nil, + }, + func() {}) defer s.Close() <-s.Chan() diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go index 9da8d2e96b2dd..76892aaab8813 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go @@ -20,6 +20,8 @@ import ( var _ wal.WAL = (*walAdaptorImpl)(nil) +type unwrapMessageIDFunc func(*wal.AppendResult) + // adaptImplsToWAL creates a new wal from wal impls. func adaptImplsToWAL( basicWAL walimpls.WALImpls, @@ -30,15 +32,13 @@ func adaptImplsToWAL( WALImpls: basicWAL, WAL: syncutil.NewFuture[wal.WAL](), } - interceptor := buildInterceptor(builders, param) - wal := &walAdaptorImpl{ lifetime: lifetime.NewLifetime(lifetime.Working), idAllocator: typeutil.NewIDAllocator(), inner: basicWAL, // TODO: make the pool size configurable. - appendExecutionPool: conc.NewPool[struct{}](10), - interceptor: interceptor, + appendExecutionPool: conc.NewPool[struct{}](10), + interceptorBuildResult: buildInterceptor(builders, param), scannerRegistry: scannerRegistry{ channel: basicWAL.Channel(), idAllocator: typeutil.NewIDAllocator(), @@ -52,14 +52,14 @@ func adaptImplsToWAL( // walAdaptorImpl is a wrapper of WALImpls to extend it into a WAL interface. type walAdaptorImpl struct { - lifetime lifetime.Lifetime[lifetime.State] - idAllocator *typeutil.IDAllocator - inner walimpls.WALImpls - appendExecutionPool *conc.Pool[struct{}] - interceptor interceptors.InterceptorWithReady - scannerRegistry scannerRegistry - scanners *typeutil.ConcurrentMap[int64, wal.Scanner] - cleanup func() + lifetime lifetime.Lifetime[lifetime.State] + idAllocator *typeutil.IDAllocator + inner walimpls.WALImpls + appendExecutionPool *conc.Pool[struct{}] + interceptorBuildResult interceptorBuildResult + scannerRegistry scannerRegistry + scanners *typeutil.ConcurrentMap[int64, wal.Scanner] + cleanup func() } func (w *walAdaptorImpl) WALName() string { @@ -72,7 +72,7 @@ func (w *walAdaptorImpl) Channel() types.PChannelInfo { } // Append writes a record to the log. -func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { +func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) (*wal.AppendResult, error) { if w.lifetime.Add(lifetime.IsWorking) != nil { return nil, status.NewOnShutdownError("wal is on shutdown") } @@ -82,15 +82,23 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) select { case <-ctx.Done(): return nil, ctx.Err() - case <-w.interceptor.Ready(): + case <-w.interceptorBuildResult.Interceptor.Ready(): } // Execute the interceptor and wal append. - return w.interceptor.DoAppend(ctx, msg, w.inner.Append) + messageID, err := w.interceptorBuildResult.Interceptor.DoAppend(ctx, msg, w.inner.Append) + if err != nil { + return nil, err + } + + // unwrap the messageID if needed. + r := &wal.AppendResult{MessageID: messageID} + w.interceptorBuildResult.UnwrapMessageIDFunc(r) + return r, nil } // AppendAsync writes a record to the log asynchronously. -func (w *walAdaptorImpl) AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(message.MessageID, error)) { +func (w *walAdaptorImpl) AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(*wal.AppendResult, error)) { if w.lifetime.Add(lifetime.IsWorking) != nil { cb(nil, status.NewOnShutdownError("wal is on shutdown")) return @@ -119,9 +127,13 @@ func (w *walAdaptorImpl) Read(ctx context.Context, opts wal.ReadOption) (wal.Sca } // wrap the scanner with cleanup function. id := w.idAllocator.Allocate() - s := newScannerAdaptor(name, w.inner, opts, func() { - w.scanners.Remove(id) - }) + s := newScannerAdaptor( + name, + w.inner, + opts, + func() { + w.scanners.Remove(id) + }) w.scanners.Insert(id, s) return s, nil } @@ -149,17 +161,41 @@ func (w *walAdaptorImpl) Close() { return true }) w.inner.Close() - w.interceptor.Close() + w.interceptorBuildResult.Close() w.appendExecutionPool.Free() w.cleanup() } +type interceptorBuildResult struct { + Interceptor interceptors.InterceptorWithReady + UnwrapMessageIDFunc unwrapMessageIDFunc +} + +func (r interceptorBuildResult) Close() { + r.Interceptor.Close() +} + // newWALWithInterceptors creates a new wal with interceptors. -func buildInterceptor(builders []interceptors.InterceptorBuilder, param interceptors.InterceptorBuildParam) interceptors.InterceptorWithReady { +func buildInterceptor(builders []interceptors.InterceptorBuilder, param interceptors.InterceptorBuildParam) interceptorBuildResult { // Build all interceptors. - builtIterceptors := make([]interceptors.BasicInterceptor, 0, len(builders)) + builtIterceptors := make([]interceptors.Interceptor, 0, len(builders)) for _, b := range builders { builtIterceptors = append(builtIterceptors, b.Build(param)) } - return interceptors.NewChainedInterceptor(builtIterceptors...) + + unwrapMessageIDFuncs := make([]func(*wal.AppendResult), 0) + for _, i := range builtIterceptors { + if r, ok := i.(interceptors.InterceptorWithUnwrapMessageID); ok { + unwrapMessageIDFuncs = append(unwrapMessageIDFuncs, r.UnwrapMessageID) + } + } + + return interceptorBuildResult{ + Interceptor: interceptors.NewChainedInterceptor(builtIterceptors...), + UnwrapMessageIDFunc: func(result *wal.AppendResult) { + for _, f := range unwrapMessageIDFuncs { + f(result) + } + }, + } } diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go index 7863fb23df52f..aa327fb263c42 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go @@ -10,9 +10,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector" "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/mock_interceptors" + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector" "github.com/milvus-io/milvus/internal/util/streamingutil/status" "github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" @@ -38,6 +41,14 @@ func TestWalAdaptorReadFail(t *testing.T) { } func TestWALAdaptor(t *testing.T) { + resource.InitForTest(t) + + operator := mock_inspector.NewMockTimeTickSyncOperator(t) + operator.EXPECT().TimeTickNotifier().Return(inspector.NewTimeTickNotifier()) + operator.EXPECT().Channel().Return(types.PChannelInfo{}) + operator.EXPECT().Sync(mock.Anything).Return() + resource.Resource().TimeTickInspector().RegisterSyncOperator(operator) + // Create a mock WAL implementation l := mock_walimpls.NewMockWALImpls(t) l.EXPECT().Channel().Return(types.PChannelInfo{}) @@ -61,7 +72,7 @@ func TestWALAdaptor(t *testing.T) { assert.NotNil(t, lAdapted.Channel()) _, err := lAdapted.Append(context.Background(), nil) assert.NoError(t, err) - lAdapted.AppendAsync(context.Background(), nil, func(mi message.MessageID, err error) { + lAdapted.AppendAsync(context.Background(), nil, func(mi *wal.AppendResult, err error) { assert.Nil(t, err) }) @@ -99,7 +110,7 @@ func TestWALAdaptor(t *testing.T) { _, err = lAdapted.Append(context.Background(), nil) assertShutdownError(t, err) - lAdapted.AppendAsync(context.Background(), nil, func(mi message.MessageID, err error) { + lAdapted.AppendAsync(context.Background(), nil, func(mi *wal.AppendResult, err error) { assertShutdownError(t, err) }) _, err = lAdapted.Read(context.Background(), wal.ReadOption{}) @@ -136,7 +147,7 @@ func TestWALWithInterceptor(t *testing.T) { b := mock_interceptors.NewMockInterceptorBuilder(t) readyCh := make(chan struct{}) - b.EXPECT().Build(mock.Anything).RunAndReturn(func(ibp interceptors.InterceptorBuildParam) interceptors.BasicInterceptor { + b.EXPECT().Build(mock.Anything).RunAndReturn(func(ibp interceptors.InterceptorBuildParam) interceptors.Interceptor { interceptor := mock_interceptors.NewMockInterceptorWithReady(t) interceptor.EXPECT().Ready().Return(readyCh) interceptor.EXPECT().DoAppend(mock.Anything, mock.Anything, mock.Anything).RunAndReturn( diff --git a/internal/streamingnode/server/wal/adaptor/wal_test.go b/internal/streamingnode/server/wal/adaptor/wal_test.go index bde613fb0773b..d030bab00d24d 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_test.go +++ b/internal/streamingnode/server/wal/adaptor/wal_test.go @@ -230,10 +230,10 @@ func (f *testOneWALFramework) testAppend(ctx context.Context, w wal.WAL) ([]mess "id": fmt.Sprintf("%d", i), "const": "t", }) - id, err := w.Append(ctx, msg) + appendResult, err := w.Append(ctx, msg) assert.NoError(f.t, err) - assert.NotNil(f.t, id) - messages[i] = msg.IntoImmutableMessage(id) + assert.NotNil(f.t, appendResult) + messages[i] = msg.IntoImmutableMessage(appendResult.MessageID) }(i) } swg.Wait() @@ -243,9 +243,9 @@ func (f *testOneWALFramework) testAppend(ctx context.Context, w wal.WAL) ([]mess "const": "t", "term": strconv.FormatInt(int64(f.term), 10), }) - id, err := w.Append(ctx, msg) + appendResult, err := w.Append(ctx, msg) assert.NoError(f.t, err) - messages[f.messageCount-1] = msg.IntoImmutableMessage(id) + messages[f.messageCount-1] = msg.IntoImmutableMessage(appendResult.MessageID) return messages, nil } @@ -263,6 +263,9 @@ func (f *testOneWALFramework) testRead(ctx context.Context, w wal.WAL) ([]messag msgs := make([]message.ImmutableMessage, 0, expectedCnt) for { msg, ok := <-s.Chan() + if msg.MessageType() != message.MessageTypeInsert { + continue + } assert.NotNil(f.t, msg) assert.True(f.t, ok) msgs = append(msgs, msg) @@ -304,6 +307,9 @@ func (f *testOneWALFramework) testReadWithOption(ctx context.Context, w wal.WAL) lastTimeTick := readFromMsg.TimeTick() - 1 for { msg, ok := <-s.Chan() + if msg.MessageType() != message.MessageTypeInsert { + continue + } msgCount++ assert.NotNil(f.t, msg) assert.True(f.t, ok) diff --git a/internal/streamingnode/server/wal/interceptors/chain_interceptor.go b/internal/streamingnode/server/wal/interceptors/chain_interceptor.go index b8b066d5378c1..96216aca89419 100644 --- a/internal/streamingnode/server/wal/interceptors/chain_interceptor.go +++ b/internal/streamingnode/server/wal/interceptors/chain_interceptor.go @@ -14,12 +14,10 @@ type ( ) // NewChainedInterceptor creates a new chained interceptor. -func NewChainedInterceptor(interceptors ...BasicInterceptor) InterceptorWithReady { +func NewChainedInterceptor(interceptors ...Interceptor) InterceptorWithReady { appendCalls := make([]appendInterceptorCall, 0, len(interceptors)) for _, i := range interceptors { - if r, ok := i.(AppendInterceptor); ok { - appendCalls = append(appendCalls, r.DoAppend) - } + appendCalls = append(appendCalls, i.DoAppend) } return &chainedInterceptor{ closed: make(chan struct{}), @@ -31,7 +29,7 @@ func NewChainedInterceptor(interceptors ...BasicInterceptor) InterceptorWithRead // chainedInterceptor chains all interceptors into one. type chainedInterceptor struct { closed chan struct{} - interceptors []BasicInterceptor + interceptors []Interceptor appendCall appendInterceptorCall } @@ -41,7 +39,7 @@ func (c *chainedInterceptor) Ready() <-chan struct{} { go func() { for _, i := range c.interceptors { // check if ready is implemented - if r, ok := i.(InterceptorReady); ok { + if r, ok := i.(InterceptorWithReady); ok { select { case <-r.Ready(): case <-c.closed: diff --git a/internal/streamingnode/server/wal/interceptors/chain_interceptor_test.go b/internal/streamingnode/server/wal/interceptors/chain_interceptor_test.go index fc27c268ccc43..8f756281e1045 100644 --- a/internal/streamingnode/server/wal/interceptors/chain_interceptor_test.go +++ b/internal/streamingnode/server/wal/interceptors/chain_interceptor_test.go @@ -22,7 +22,7 @@ func TestChainInterceptor(t *testing.T) { func TestChainReady(t *testing.T) { count := 5 channels := make([]chan struct{}, 0, count) - ips := make([]interceptors.BasicInterceptor, 0, count) + ips := make([]interceptors.Interceptor, 0, count) for i := 0; i < count; i++ { ch := make(chan struct{}) channels = append(channels, ch) @@ -79,7 +79,7 @@ func testChainInterceptor(t *testing.T, count int) { } appendInterceptorRecords := make([]record, 0, count) - ips := make([]interceptors.BasicInterceptor, 0, count) + ips := make([]interceptors.Interceptor, 0, count) for i := 0; i < count; i++ { j := i appendInterceptorRecords = append(appendInterceptorRecords, record{}) diff --git a/internal/streamingnode/server/wal/interceptors/ddl/builder.go b/internal/streamingnode/server/wal/interceptors/ddl/builder.go index 1164796313abb..d07ed3aed3abc 100644 --- a/internal/streamingnode/server/wal/interceptors/ddl/builder.go +++ b/internal/streamingnode/server/wal/interceptors/ddl/builder.go @@ -31,7 +31,7 @@ func NewInterceptorBuilder() interceptors.InterceptorBuilder { type interceptorBuilder struct{} // Build implements Builder. -func (b *interceptorBuilder) Build(param interceptors.InterceptorBuildParam) interceptors.BasicInterceptor { +func (b *interceptorBuilder) Build(param interceptors.InterceptorBuildParam) interceptors.Interceptor { interceptor := &ddlAppendInterceptor{ wal: param.WAL, } diff --git a/internal/streamingnode/server/wal/interceptors/ddl/ddl_interceptor.go b/internal/streamingnode/server/wal/interceptors/ddl/ddl_interceptor.go index 10d989438caff..8f2a17b6277ec 100644 --- a/internal/streamingnode/server/wal/interceptors/ddl/ddl_interceptor.go +++ b/internal/streamingnode/server/wal/interceptors/ddl/ddl_interceptor.go @@ -26,7 +26,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/syncutil" ) -var _ interceptors.AppendInterceptor = (*ddlAppendInterceptor)(nil) +var _ interceptors.Interceptor = (*ddlAppendInterceptor)(nil) // ddlAppendInterceptor is an append interceptor. type ddlAppendInterceptor struct { diff --git a/internal/streamingnode/server/wal/interceptors/interceptor.go b/internal/streamingnode/server/wal/interceptors/interceptor.go index 4f9bcbb714c5b..089189026fd28 100644 --- a/internal/streamingnode/server/wal/interceptors/interceptor.go +++ b/internal/streamingnode/server/wal/interceptors/interceptor.go @@ -25,33 +25,28 @@ type InterceptorBuildParam struct { type InterceptorBuilder interface { // Build build a interceptor with wal that interceptor will work on. // the wal object will be sent to the interceptor builder when the wal is constructed with all interceptors. - Build(param InterceptorBuildParam) BasicInterceptor -} - -type BasicInterceptor interface { - // Close the interceptor release the resources. - Close() + Build(param InterceptorBuildParam) Interceptor } type Interceptor interface { - AppendInterceptor - - BasicInterceptor -} - -// AppendInterceptor is the interceptor for Append functions. -// All wal extra operations should be done by these function, such as -// 1. time tick setup. -// 2. unique primary key filter and build. -// 3. index builder. -// 4. cache sync up. -// AppendInterceptor should be lazy initialized and fast execution. -type AppendInterceptor interface { + // AppendInterceptor is the interceptor for Append functions. + // All wal extra operations should be done by these function, such as + // 1. time tick setup. + // 2. unique primary key filter and build. + // 3. index builder. + // 4. cache sync up. + // AppendInterceptor should be lazy initialized and fast execution. // Execute the append operation with interceptor. DoAppend(ctx context.Context, msg message.MutableMessage, append Append) (message.MessageID, error) + + // Close the interceptor release the resources. + Close() } -type InterceptorReady interface { +// Some interceptor may need to wait for some resource to be ready or recovery process. +type InterceptorWithReady interface { + Interceptor + // Ready check if interceptor is ready. // Close of Interceptor would not notify the ready (closed interceptor is not ready). // So always apply timeout when waiting for ready. @@ -62,9 +57,9 @@ type InterceptorReady interface { Ready() <-chan struct{} } -// Some interceptor may need to wait for some resource to be ready or recovery process. -type InterceptorWithReady interface { +type InterceptorWithUnwrapMessageID interface { Interceptor - InterceptorReady + // UnwrapMessageID the message id from the append result. + UnwrapMessageID(*wal.AppendResult) } diff --git a/internal/streamingnode/server/wal/interceptors/segment/builder.go b/internal/streamingnode/server/wal/interceptors/segment/builder.go index 3b4f62dc2a26f..b2ef3cafa8407 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/builder.go +++ b/internal/streamingnode/server/wal/interceptors/segment/builder.go @@ -17,7 +17,7 @@ func NewInterceptorBuilder() interceptors.InterceptorBuilder { type interceptorBuilder struct{} -func (b *interceptorBuilder) Build(param interceptors.InterceptorBuildParam) interceptors.BasicInterceptor { +func (b *interceptorBuilder) Build(param interceptors.InterceptorBuildParam) interceptors.Interceptor { assignManager := syncutil.NewFuture[*manager.PChannelSegmentAllocManager]() ctx, cancel := context.WithCancel(context.Background()) segmentInterceptor := &segmentInterceptor{ diff --git a/internal/streamingnode/server/wal/interceptors/segment/segment_assign_interceptor.go b/internal/streamingnode/server/wal/interceptors/segment/segment_assign_interceptor.go index f544912db59b3..d0220e20359e4 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/segment_assign_interceptor.go +++ b/internal/streamingnode/server/wal/interceptors/segment/segment_assign_interceptor.go @@ -17,7 +17,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -var _ interceptors.AppendInterceptor = (*segmentInterceptor)(nil) +var _ interceptors.InterceptorWithReady = (*segmentInterceptor)(nil) // segmentInterceptor is the implementation of segment assignment interceptor. type segmentInterceptor struct { @@ -161,9 +161,12 @@ func (impl *segmentInterceptor) handleInsertMessage(ctx context.Context, msg mes // Close closes the segment interceptor. func (impl *segmentInterceptor) Close() { - // unregister the pchannels - resource.Resource().SegmentSealedInspector().UnregisterPChannelManager(impl.assignManager.Get()) - impl.assignManager.Get().Close(context.Background()) + assignManager := impl.assignManager.Get() + if assignManager != nil { + // unregister the pchannels + resource.Resource().SegmentSealedInspector().UnregisterPChannelManager(assignManager) + assignManager.Close(context.Background()) + } } // recoverPChannelManager recovers PChannel Assignment Manager. @@ -185,6 +188,7 @@ func (impl *segmentInterceptor) recoverPChannelManager(param interceptors.Interc select { case <-impl.ctx.Done(): impl.logger.Info("segment interceptor has been closed", zap.Error(impl.ctx.Err())) + impl.assignManager.Set(nil) return case <-ch: continue diff --git a/internal/streamingnode/server/wal/interceptors/timetick/ack/ack_details.go b/internal/streamingnode/server/wal/interceptors/timetick/ack/ack_details.go new file mode 100644 index 0000000000000..7151f443c0137 --- /dev/null +++ b/internal/streamingnode/server/wal/interceptors/timetick/ack/ack_details.go @@ -0,0 +1,87 @@ +package ack + +import ( + "github.com/milvus-io/milvus/pkg/streaming/util/message" +) + +// details that sorted by timestamp. +type sortedDetails []*AckDetail + +// NewAckDetails creates a new AckDetails. +func NewAckDetails() *AckDetails { + return &AckDetails{ + detail: make([]*AckDetail, 0), + } +} + +// AckDetails records the information of AckDetail. +// Used to analyze the all acknowledged details. +// TODO: add more analysis methods. e.g. such as counter function with filter. +type AckDetails struct { + detail []*AckDetail +} + +// AddDetails adds details to AckDetails. +// The input details must be sorted by timestamp. +func (ad *AckDetails) AddDetails(details sortedDetails) { + if len(details) == 0 { + return + } + if len(ad.detail) == 0 { + ad.detail = details + return + } + if ad.detail[len(ad.detail)-1].Timestamp >= details[0].Timestamp { + panic("unreachable: the details must be sorted by timestamp") + } + ad.detail = append(ad.detail, details...) +} + +// Empty returns true if the AckDetails is empty. +func (ad *AckDetails) Empty() bool { + return len(ad.detail) == 0 +} + +// Len returns the count of AckDetail. +func (ad *AckDetails) Len() int { + return len(ad.detail) +} + +// IsNoPersistedMessage returns true if no persisted message. +func (ad *AckDetails) IsNoPersistedMessage() bool { + for _, detail := range ad.detail { + // only sync message do not persist. + // it just sync up the timetick with rootcoord + if !detail.IsSync { + return false + } + } + return true +} + +// LastAllAcknowledgedTimestamp returns the last timestamp which all timestamps before it have been acknowledged. +// panic if no timestamp has been acknowledged. +func (ad *AckDetails) LastAllAcknowledgedTimestamp() uint64 { + return ad.detail[len(ad.detail)-1].Timestamp +} + +// EarliestLastConfirmedMessageID returns the last confirmed message id. +func (ad *AckDetails) EarliestLastConfirmedMessageID() message.MessageID { + // use the earliest last confirmed message id. + var msgID message.MessageID + for _, detail := range ad.detail { + if msgID == nil { + msgID = detail.LastConfirmedMessageID + continue + } + if detail.LastConfirmedMessageID != nil && detail.LastConfirmedMessageID.LT(msgID) { + msgID = detail.LastConfirmedMessageID + } + } + return msgID +} + +// Clear clears the AckDetails. +func (ad *AckDetails) Clear() { + ad.detail = nil +} diff --git a/internal/streamingnode/server/wal/interceptors/timetick/ack/ack_details_test.go b/internal/streamingnode/server/wal/interceptors/timetick/ack/ack_details_test.go new file mode 100644 index 0000000000000..eb6f2d2fd2803 --- /dev/null +++ b/internal/streamingnode/server/wal/interceptors/timetick/ack/ack_details_test.go @@ -0,0 +1,37 @@ +package ack + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest" +) + +func TestAckDetails(t *testing.T) { + details := NewAckDetails() + assert.True(t, details.Empty()) + assert.Equal(t, 0, details.Len()) + details.AddDetails(sortedDetails{ + &AckDetail{Timestamp: 1, IsSync: true}, + }) + assert.True(t, details.IsNoPersistedMessage()) + assert.Equal(t, uint64(1), details.LastAllAcknowledgedTimestamp()) + details.AddDetails(sortedDetails{ + &AckDetail{Timestamp: 2, LastConfirmedMessageID: walimplstest.NewTestMessageID(2)}, + &AckDetail{Timestamp: 3, LastConfirmedMessageID: walimplstest.NewTestMessageID(1)}, + }) + assert.False(t, details.IsNoPersistedMessage()) + assert.Equal(t, uint64(3), details.LastAllAcknowledgedTimestamp()) + assert.True(t, details.EarliestLastConfirmedMessageID().EQ(walimplstest.NewTestMessageID(1))) + + assert.Panics(t, func() { + details.AddDetails(sortedDetails{ + &AckDetail{Timestamp: 1, IsSync: true}, + }) + }) + + details.Clear() + assert.True(t, details.Empty()) + assert.Equal(t, 0, details.Len()) +} diff --git a/internal/streamingnode/server/wal/interceptors/timetick/ack/manager.go b/internal/streamingnode/server/wal/interceptors/timetick/ack/manager.go index 93ac1842a42be..55583d286c59e 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/ack/manager.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/ack/manager.go @@ -62,12 +62,12 @@ func (ta *AckManager) SyncAndGetAcknowledged(ctx context.Context) ([]*AckDetail, } // popUntilLastAllAcknowledged pops the timestamps until the one that all timestamps before it have been acknowledged. -func (ta *AckManager) popUntilLastAllAcknowledged() []*AckDetail { +func (ta *AckManager) popUntilLastAllAcknowledged() sortedDetails { ta.mu.Lock() defer ta.mu.Unlock() // pop all acknowledged timestamps. - details := make([]*AckDetail, 0, 5) + details := make(sortedDetails, 0, 5) for ta.notAckHeap.Len() > 0 && ta.notAckHeap.Peek().acknowledged.Load() { ack := ta.notAckHeap.Pop() details = append(details, ack.ackDetail()) diff --git a/internal/streamingnode/server/wal/interceptors/timetick/ack_details.go b/internal/streamingnode/server/wal/interceptors/timetick/ack_details.go deleted file mode 100644 index 85ef5646440b8..0000000000000 --- a/internal/streamingnode/server/wal/interceptors/timetick/ack_details.go +++ /dev/null @@ -1,43 +0,0 @@ -package timetick - -import "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/ack" - -// ackDetails records the information of AckDetail. -// Used to analyze the ack details. -// TODO: add more analysis methods. e.g. such as counter function with filter. -type ackDetails struct { - detail []*ack.AckDetail -} - -// AddDetails adds details to AckDetails. -func (ad *ackDetails) AddDetails(details []*ack.AckDetail) { - if len(details) == 0 { - return - } - if len(ad.detail) == 0 { - ad.detail = details - return - } - ad.detail = append(ad.detail, details...) -} - -// Empty returns true if the AckDetails is empty. -func (ad *ackDetails) Empty() bool { - return len(ad.detail) == 0 -} - -// Len returns the count of AckDetail. -func (ad *ackDetails) Len() int { - return len(ad.detail) -} - -// LastAllAcknowledgedTimestamp returns the last timestamp which all timestamps before it have been acknowledged. -// panic if no timestamp has been acknowledged. -func (ad *ackDetails) LastAllAcknowledgedTimestamp() uint64 { - return ad.detail[len(ad.detail)-1].Timestamp -} - -// Clear clears the AckDetails. -func (ad *ackDetails) Clear() { - ad.detail = nil -} diff --git a/internal/streamingnode/server/wal/interceptors/timetick/builder.go b/internal/streamingnode/server/wal/interceptors/timetick/builder.go index 7f7ce3c41a284..0e5d060e61097 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/builder.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/builder.go @@ -1,12 +1,8 @@ package timetick import ( - "context" - "time" - + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/ack" - "github.com/milvus-io/milvus/pkg/util/paramtable" ) var _ interceptors.InterceptorBuilder = (*interceptorBuilder)(nil) @@ -22,20 +18,12 @@ func NewInterceptorBuilder() interceptors.InterceptorBuilder { type interceptorBuilder struct{} // Build implements Builder. -func (b *interceptorBuilder) Build(param interceptors.InterceptorBuildParam) interceptors.BasicInterceptor { - ctx, cancel := context.WithCancel(context.Background()) - interceptor := &timeTickAppendInterceptor{ - ctx: ctx, - cancel: cancel, - ready: make(chan struct{}), - ackManager: ack.NewAckManager(), - ackDetails: &ackDetails{}, - sourceID: paramtable.GetNodeID(), +func (b *interceptorBuilder) Build(param interceptors.InterceptorBuildParam) interceptors.Interceptor { + operator := newTimeTickSyncOperator(param) + // initialize operation can be async to avoid block the build operation. + go operator.initialize() + resource.Resource().TimeTickInspector().RegisterSyncOperator(operator) + return &timeTickAppendInterceptor{ + operator: operator, } - go interceptor.executeSyncTimeTick( - // TODO: move the configuration to streamingnode. - paramtable.Get().ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond), - param, - ) - return interceptor } diff --git a/internal/streamingnode/server/wal/interceptors/timetick/inspector/impls.go b/internal/streamingnode/server/wal/interceptors/timetick/inspector/impls.go new file mode 100644 index 0000000000000..ee7463934b30a --- /dev/null +++ b/internal/streamingnode/server/wal/interceptors/timetick/inspector/impls.go @@ -0,0 +1,87 @@ +package inspector + +import ( + "time" + + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/syncutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// NewTimeTickSyncInspector creates a new time tick sync inspector. +func NewTimeTickSyncInspector() TimeTickSyncInspector { + inspector := &timeTickSyncInspectorImpl{ + taskNotifier: syncutil.NewAsyncTaskNotifier[struct{}](), + syncNotifier: newSyncNotifier(), + operators: typeutil.NewConcurrentMap[string, TimeTickSyncOperator](), + } + go inspector.background() + return inspector +} + +type timeTickSyncInspectorImpl struct { + taskNotifier *syncutil.AsyncTaskNotifier[struct{}] + syncNotifier *syncNotifier + operators *typeutil.ConcurrentMap[string, TimeTickSyncOperator] +} + +func (s *timeTickSyncInspectorImpl) TriggerSync(pChannelInfo types.PChannelInfo) { + s.syncNotifier.AddAndNotify(pChannelInfo) +} + +// GetOperator gets the operator by pchannel info. +func (s *timeTickSyncInspectorImpl) MustGetOperator(pChannelInfo types.PChannelInfo) TimeTickSyncOperator { + operator, ok := s.operators.Get(pChannelInfo.Name) + if !ok { + panic("sync operator not found, critical bug in code") + } + return operator +} + +// RegisterSyncOperator registers a sync operator. +func (s *timeTickSyncInspectorImpl) RegisterSyncOperator(operator TimeTickSyncOperator) { + _, loaded := s.operators.GetOrInsert(operator.Channel().Name, operator) + if loaded { + panic("sync operator already exists, critical bug in code") + } +} + +// UnregisterSyncOperator unregisters a sync operator. +func (s *timeTickSyncInspectorImpl) UnregisterSyncOperator(operator TimeTickSyncOperator) { + _, loaded := s.operators.GetAndRemove(operator.Channel().Name) + if !loaded { + panic("sync operator not found, critical bug in code") + } +} + +// background executes the time tick sync inspector. +func (s *timeTickSyncInspectorImpl) background() { + defer s.taskNotifier.Finish(struct{}{}) + + interval := paramtable.Get().ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond) + ticker := time.NewTicker(interval) + for { + select { + case <-s.taskNotifier.Context().Done(): + return + case <-ticker.C: + s.operators.Range(func(_ string, operator TimeTickSyncOperator) bool { + operator.Sync(s.taskNotifier.Context()) + return true + }) + case <-s.syncNotifier.WaitChan(): + s.syncNotifier.Get().Range(func(pchannel types.PChannelInfo) bool { + if operator, ok := s.operators.Get(pchannel.Name); ok { + operator.Sync(s.taskNotifier.Context()) + } + return true + }) + } + } +} + +func (s *timeTickSyncInspectorImpl) Close() { + s.taskNotifier.Cancel() + s.taskNotifier.BlockUntilFinish() +} diff --git a/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector.go b/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector.go new file mode 100644 index 0000000000000..b726748092b00 --- /dev/null +++ b/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector.go @@ -0,0 +1,37 @@ +package inspector + +import ( + "context" + + "github.com/milvus-io/milvus/pkg/streaming/util/types" +) + +type TimeTickSyncOperator interface { + TimeTickNotifier() *TimeTickNotifier + + // Channel returns the pchannel info. + Channel() types.PChannelInfo + + // Sync trigger a sync operation, try to send the timetick message into wal. + // Sync operation is a blocking operation, and not thread-safe, will only call in one goroutine. + Sync(ctx context.Context) +} + +// TimeTickSyncInspector is the inspector to sync time tick. +type TimeTickSyncInspector interface { + // TriggerSync adds a pchannel info and notify the sync operation. + // manually trigger the sync operation of pchannel. + TriggerSync(pChannelInfo types.PChannelInfo) + + // RegisterSyncOperator registers a sync operator. + RegisterSyncOperator(operator TimeTickSyncOperator) + + // MustGetOperator gets the operator by pchannel info, otherwise panic. + MustGetOperator(types.PChannelInfo) TimeTickSyncOperator + + // UnregisterSyncOperator unregisters a sync operator. + UnregisterSyncOperator(operator TimeTickSyncOperator) + + // Close closes the inspector. + Close() +} diff --git a/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector_test.go b/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector_test.go new file mode 100644 index 0000000000000..75176ab3c4ec4 --- /dev/null +++ b/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector_test.go @@ -0,0 +1,46 @@ +package inspector_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func TestInsepctor(t *testing.T) { + paramtable.Init() + + i := inspector.NewTimeTickSyncInspector() + operator := mock_inspector.NewMockTimeTickSyncOperator(t) + pchannel := types.PChannelInfo{ + Name: "test", + Term: 1, + } + operator.EXPECT().Channel().Return(pchannel) + operator.EXPECT().Sync(mock.Anything).Run(func(ctx context.Context) {}) + + i.RegisterSyncOperator(operator) + assert.Panics(t, func() { + i.RegisterSyncOperator(operator) + }) + i.TriggerSync(pchannel) + o := i.MustGetOperator(pchannel) + assert.NotNil(t, o) + time.Sleep(250 * time.Millisecond) + i.UnregisterSyncOperator(operator) + + assert.Panics(t, func() { + i.UnregisterSyncOperator(operator) + }) + assert.Panics(t, func() { + i.MustGetOperator(pchannel) + }) + i.Close() +} diff --git a/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier.go b/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier.go new file mode 100644 index 0000000000000..66c1b66ff5d93 --- /dev/null +++ b/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier.go @@ -0,0 +1,123 @@ +package inspector + +import ( + "sync" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/syncutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// newSyncNotifier creates a new sync notifier. +func newSyncNotifier() *syncNotifier { + return &syncNotifier{ + cond: syncutil.NewContextCond(&sync.Mutex{}), + signal: typeutil.NewSet[types.PChannelInfo](), + } +} + +// syncNotifier is a notifier for sync signal. +type syncNotifier struct { + cond *syncutil.ContextCond + signal typeutil.Set[types.PChannelInfo] +} + +// AddAndNotify adds a signal and notifies the waiter. +func (n *syncNotifier) AddAndNotify(pChannelInfo types.PChannelInfo) { + n.cond.LockAndBroadcast() + n.signal.Insert(pChannelInfo) + n.cond.L.Unlock() +} + +// WaitChan returns the wait channel. +func (n *syncNotifier) WaitChan() <-chan struct{} { + n.cond.L.Lock() + if n.signal.Len() > 0 { + n.cond.L.Unlock() + ch := make(chan struct{}) + close(ch) + return ch + } + return n.cond.WaitChan() +} + +// Get gets the signal. +func (n *syncNotifier) Get() typeutil.Set[types.PChannelInfo] { + n.cond.L.Lock() + signal := n.signal + n.signal = typeutil.NewSet[types.PChannelInfo]() + n.cond.L.Unlock() + return signal +} + +// TimeTickInfo records the information of time tick. +type TimeTickInfo struct { + MessageID message.MessageID // the message id. + TimeTick uint64 // the time tick. + LastConfirmedMessageID message.MessageID // the last confirmed message id. + // The time tick may be updated, without last timetickMessage +} + +// IsZero returns true if the time tick info is zero. +func (t *TimeTickInfo) IsZero() bool { + return t.TimeTick == 0 +} + +// NewTimeTickNotifier creates a new time tick info listener. +func NewTimeTickNotifier() *TimeTickNotifier { + return &TimeTickNotifier{ + cond: syncutil.NewContextCond(&sync.Mutex{}), + info: TimeTickInfo{}, + } +} + +// TimeTickNotifier is a listener for time tick info. +type TimeTickNotifier struct { + cond *syncutil.ContextCond + info TimeTickInfo +} + +// Update only update the time tick info, but not notify the waiter. +func (l *TimeTickNotifier) Update(info TimeTickInfo) { + l.cond.L.Lock() + if l.info.IsZero() || l.info.MessageID.LT(info.MessageID) { + l.info = info + } + l.cond.L.Unlock() +} + +// OnlyUpdateTs only updates the time tick, and notify the waiter. +func (l *TimeTickNotifier) OnlyUpdateTs(timetick uint64) { + l.cond.LockAndBroadcast() + if !l.info.IsZero() && l.info.TimeTick < timetick { + l.info.TimeTick = timetick + } + l.cond.L.Unlock() +} + +// WatchAtMessageID watch the message id. +// If the message id is not equal to the last message id, return nil channel. +// Or if the time tick is less than the last time tick, return channel. +func (l *TimeTickNotifier) WatchAtMessageID(messageID message.MessageID, ts uint64) <-chan struct{} { + l.cond.L.Lock() + if l.info.IsZero() || !l.info.MessageID.EQ(messageID) { + l.cond.L.Unlock() + return nil + } + if ts < l.info.TimeTick { + ch := make(chan struct{}) + close(ch) + l.cond.L.Unlock() + return ch + } + return l.cond.WaitChan() +} + +// Get gets the time tick info. +func (l *TimeTickNotifier) Get() TimeTickInfo { + l.cond.L.Lock() + info := l.info + l.cond.L.Unlock() + return info +} diff --git a/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier_test.go b/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier_test.go new file mode 100644 index 0000000000000..51a0ddc439bee --- /dev/null +++ b/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier_test.go @@ -0,0 +1,75 @@ +package inspector + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest" +) + +func TestSyncNotifier(t *testing.T) { + n := newSyncNotifier() + ch := n.WaitChan() + assert.True(t, n.Get().Len() == 0) + + shouldBeBlocked(ch) + + n.AddAndNotify(types.PChannelInfo{ + Name: "test", + Term: 1, + }) + // should not block + <-ch + assert.True(t, n.Get().Len() == 1) + assert.True(t, n.Get().Len() == 0) + + n.AddAndNotify(types.PChannelInfo{ + Name: "test", + Term: 1, + }) + ch = n.WaitChan() + <-ch +} + +func shouldBeBlocked(ch <-chan struct{}) { + select { + case <-ch: + panic("should block") + default: + } +} + +func TestTimeTickNotifier(t *testing.T) { + n := NewTimeTickNotifier() + info := n.Get() + assert.True(t, info.IsZero()) + msgID := walimplstest.NewTestMessageID(1) + assert.Nil(t, n.WatchAtMessageID(msgID, 0)) + n.Update(TimeTickInfo{ + MessageID: msgID, + TimeTick: 2, + LastConfirmedMessageID: walimplstest.NewTestMessageID(0), + }) + + ch := n.WatchAtMessageID(msgID, 0) + assert.NotNil(t, ch) + <-ch // should not block. + + ch = n.WatchAtMessageID(msgID, 2) + assert.NotNil(t, ch) + shouldBeBlocked(ch) // should block. + + n.OnlyUpdateTs(3) + <-ch // should not block. + info = n.Get() + assert.Equal(t, uint64(3), info.TimeTick) + + ch = n.WatchAtMessageID(msgID, 3) + n.Update(TimeTickInfo{ + MessageID: walimplstest.NewTestMessageID(3), + TimeTick: 4, + }) + shouldBeBlocked(ch) +} diff --git a/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go b/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go index e3a2cbccd0806..a081341e38ec4 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go @@ -2,169 +2,82 @@ package timetick import ( "context" - "time" "github.com/cockroachdb/errors" - "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/ack" - "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/util/message" - "github.com/milvus-io/milvus/pkg/streaming/walimpls" ) -var _ interceptors.AppendInterceptor = (*timeTickAppendInterceptor)(nil) +var ( + _ interceptors.InterceptorWithReady = (*timeTickAppendInterceptor)(nil) + _ interceptors.InterceptorWithUnwrapMessageID = (*timeTickAppendInterceptor)(nil) +) // timeTickAppendInterceptor is a append interceptor. type timeTickAppendInterceptor struct { - ctx context.Context - cancel context.CancelFunc - ready chan struct{} - - ackManager *ack.AckManager - ackDetails *ackDetails - sourceID int64 + operator *timeTickSyncOperator } // Ready implements AppendInterceptor. func (impl *timeTickAppendInterceptor) Ready() <-chan struct{} { - return impl.ready + return impl.operator.Ready() } // Do implements AppendInterceptor. -func (impl *timeTickAppendInterceptor) DoAppend(ctx context.Context, msg message.MutableMessage, append interceptors.Append) (msgID message.MessageID, err error) { +func (impl *timeTickAppendInterceptor) DoAppend(ctx context.Context, msg message.MutableMessage, append interceptors.Append) (message.MessageID, error) { + var timetick uint64 + var msgID message.MessageID + var err error if msg.MessageType() != message.MessageTypeTimeTick { // Allocate new acker for message. var acker *ack.Acker - if acker, err = impl.ackManager.Allocate(ctx); err != nil { + var err error + if acker, err = impl.operator.AckManager().Allocate(ctx); err != nil { return nil, errors.Wrap(err, "allocate timestamp failed") } defer func() { acker.Ack(ack.OptError(err)) - impl.ackManager.AdvanceLastConfirmedMessageID(msgID) + impl.operator.AckManager().AdvanceLastConfirmedMessageID(msgID) }() // Assign timestamp to message and call append method. msg = msg. WithTimeTick(acker.Timestamp()). // message assigned with these timetick. WithLastConfirmed(acker.LastConfirmedMessageID()) // start consuming from these message id, the message which timetick greater than current timetick will never be lost. + timetick = acker.Timestamp() + } else { + timetick = msg.TimeTick() } - return append(ctx, msg) -} - -// Close implements AppendInterceptor. -func (impl *timeTickAppendInterceptor) Close() { - impl.cancel() -} -// execute start a background task. -func (impl *timeTickAppendInterceptor) executeSyncTimeTick(interval time.Duration, param interceptors.InterceptorBuildParam) { - underlyingWALImpls := param.WALImpls - - logger := log.With(zap.Any("channel", underlyingWALImpls.Channel())) - logger.Info("start to sync time tick...") - defer logger.Info("sync time tick stopped") - - if err := impl.blockUntilSyncTimeTickReady(underlyingWALImpls); err != nil { - logger.Warn("sync first time tick failed", zap.Error(err)) - return + // append the message into wal. + if msgID, err = append(ctx, msg); err != nil { + return nil, err } - // interceptor is ready, wait for the final wal object is ready to use. - wal := param.WAL.Get() - - // TODO: sync time tick message to wal periodically. - // Add a trigger on `AckManager` to sync time tick message without periodically. - // `AckManager` gather detail information, time tick sync can check it and make the message between tt more smaller. - ticker := time.NewTicker(interval) - defer ticker.Stop() - for { - select { - case <-impl.ctx.Done(): - return - case <-ticker.C: - if err := impl.sendTsMsg(impl.ctx, wal.Append); err != nil { - log.Warn("send time tick sync message failed", zap.Error(err)) - } - } - } + // wrap message id with timetick. + return wrapMessageIDWithTimeTick{ + MessageID: msgID, + timetick: timetick, + }, nil } -// blockUntilSyncTimeTickReady blocks until the first time tick message is sent. -func (impl *timeTickAppendInterceptor) blockUntilSyncTimeTickReady(underlyingWALImpls walimpls.WALImpls) error { - logger := log.With(zap.Any("channel", underlyingWALImpls.Channel())) - logger.Info("start to sync first time tick") - defer logger.Info("sync first time tick done") - - // Send first timetick message to wal before interceptor is ready. - for count := 0; ; count++ { - // Sent first timetick message to wal before ready. - // New TT is always greater than all tt on previous streamingnode. - // A fencing operation of underlying WAL is needed to make exclusive produce of topic. - // Otherwise, the TT principle may be violated. - // And sendTsMsg must be done, to help ackManager to get first LastConfirmedMessageID - // !!! Send a timetick message into walimpls directly is safe. - select { - case <-impl.ctx.Done(): - return impl.ctx.Err() - default: - } - if err := impl.sendTsMsg(impl.ctx, underlyingWALImpls.Append); err != nil { - logger.Warn("send first timestamp message failed", zap.Error(err), zap.Int("retryCount", count)) - // TODO: exponential backoff. - time.Sleep(50 * time.Millisecond) - continue - } - break - } - // interceptor is ready now. - close(impl.ready) - return nil +// Close implements AppendInterceptor. +func (impl *timeTickAppendInterceptor) Close() { + resource.Resource().TimeTickInspector().UnregisterSyncOperator(impl.operator) + impl.operator.Close() } -// syncAcknowledgedDetails syncs the timestamp acknowledged details. -func (impl *timeTickAppendInterceptor) syncAcknowledgedDetails() { - // Sync up and get last confirmed timestamp. - ackDetails, err := impl.ackManager.SyncAndGetAcknowledged(impl.ctx) - if err != nil { - log.Warn("sync timestamp ack manager failed", zap.Error(err)) - } - - // Add ack details to ackDetails. - impl.ackDetails.AddDetails(ackDetails) +func (impl *timeTickAppendInterceptor) UnwrapMessageID(r *wal.AppendResult) { + m := r.MessageID.(wrapMessageIDWithTimeTick) + r.MessageID = m.MessageID + r.TimeTick = m.timetick } -// sendTsMsg sends first timestamp message to wal. -// TODO: TT lag warning. -func (impl *timeTickAppendInterceptor) sendTsMsg(_ context.Context, appender func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error)) error { - // Sync the timestamp acknowledged details. - impl.syncAcknowledgedDetails() - - if impl.ackDetails.Empty() { - // No acknowledged info can be sent. - // Some message sent operation is blocked, new TT cannot be pushed forward. - return nil - } - - // Construct time tick message. - msg, err := newTimeTickMsg(impl.ackDetails.LastAllAcknowledgedTimestamp(), impl.sourceID) - if err != nil { - return errors.Wrap(err, "at build time tick msg") - } - - // Append it to wal. - msgID, err := appender(impl.ctx, msg) - if err != nil { - return errors.Wrapf(err, - "append time tick msg to wal failed, timestamp: %d, previous message counter: %d", - impl.ackDetails.LastAllAcknowledgedTimestamp(), - impl.ackDetails.Len(), - ) - } - - // Ack details has been committed to wal, clear it. - impl.ackDetails.Clear() - impl.ackManager.AdvanceLastConfirmedMessageID(msgID) - return nil +type wrapMessageIDWithTimeTick struct { + message.MessageID + timetick uint64 } diff --git a/internal/streamingnode/server/wal/interceptors/timetick/timetick_message.go b/internal/streamingnode/server/wal/interceptors/timetick/timetick_message.go index bbdc62b7ab811..2ed586859fcb1 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/timetick_message.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/timetick_message.go @@ -7,7 +7,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/commonpbutil" ) -func newTimeTickMsg(ts uint64, sourceID int64) (message.MutableMessage, error) { +func NewTimeTickMsg(ts uint64, lastConfirmedMessageID message.MessageID, sourceID int64) (message.MutableMessage, error) { // TODO: time tick should be put on properties, for compatibility, we put it on message body now. // Common message's time tick is set on interceptor. // TimeTickMsg's time tick should be set here. @@ -26,5 +26,8 @@ func newTimeTickMsg(ts uint64, sourceID int64) (message.MutableMessage, error) { if err != nil { return nil, err } - return msg.WithTimeTick(ts), nil + if lastConfirmedMessageID != nil { + return msg.WithTimeTick(ts).WithLastConfirmed(lastConfirmedMessageID), nil + } + return msg.WithTimeTick(ts).WithLastConfirmedUseMessageID(), nil } diff --git a/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator.go b/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator.go new file mode 100644 index 0000000000000..d3e3032c01872 --- /dev/null +++ b/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator.go @@ -0,0 +1,250 @@ +package timetick + +import ( + "context" + "time" + + "github.com/cockroachdb/errors" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/ack" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// timeTickSyncOperator is a time tick sync operator. +var _ inspector.TimeTickSyncOperator = &timeTickSyncOperator{} + +// NewTimeTickSyncOperator creates a new time tick sync operator. +func newTimeTickSyncOperator(param interceptors.InterceptorBuildParam) *timeTickSyncOperator { + ctx, cancel := context.WithCancel(context.Background()) + return &timeTickSyncOperator{ + ctx: ctx, + cancel: cancel, + logger: log.With(zap.Any("pchannel", param.WALImpls.Channel())), + pchannel: param.WALImpls.Channel(), + ready: make(chan struct{}), + interceptorBuildParam: param, + ackManager: ack.NewAckManager(), + ackDetails: ack.NewAckDetails(), + sourceID: paramtable.GetNodeID(), + timeTickNotifier: inspector.NewTimeTickNotifier(), + } +} + +// timeTickSyncOperator is a time tick sync operator. +type timeTickSyncOperator struct { + ctx context.Context + cancel context.CancelFunc + + logger *log.MLogger + pchannel types.PChannelInfo // pchannel info belong to. + ready chan struct{} // hint the time tick operator is ready to use. + interceptorBuildParam interceptors.InterceptorBuildParam // interceptor build param. + ackManager *ack.AckManager // ack manager. + ackDetails *ack.AckDetails // all acknowledged details, all acked messages but not sent to wal will be kept here. + sourceID int64 // the current node id. + timeTickNotifier *inspector.TimeTickNotifier // used to notify the time tick change. +} + +// Channel returns the pchannel info. +func (impl *timeTickSyncOperator) Channel() types.PChannelInfo { + return impl.pchannel +} + +// TimeTickNotifier returns the time tick notifier. +func (impl *timeTickSyncOperator) TimeTickNotifier() *inspector.TimeTickNotifier { + return impl.timeTickNotifier +} + +// Sync trigger a sync operation. +// Sync operation is not thread safe, so call it in a single goroutine. +func (impl *timeTickSyncOperator) Sync(ctx context.Context) { + // Sync operation cannot trigger until isReady. + if !impl.isReady() { + return + } + + wal := impl.interceptorBuildParam.WAL.Get() + err := impl.sendTsMsg(ctx, func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { + appendResult, err := wal.Append(ctx, msg) + if err != nil { + return nil, err + } + return appendResult.MessageID, nil + }) + if err != nil { + impl.logger.Warn("send time tick sync message failed", zap.Error(err)) + } +} + +// initialize initializes the time tick sync operator. +func (impl *timeTickSyncOperator) initialize() { + impl.blockUntilSyncTimeTickReady() +} + +// blockUntilSyncTimeTickReady blocks until the first time tick message is sent. +func (impl *timeTickSyncOperator) blockUntilSyncTimeTickReady() error { + underlyingWALImpls := impl.interceptorBuildParam.WALImpls + + impl.logger.Info("start to sync first time tick") + defer impl.logger.Info("sync first time tick done") + + backoffTimer := typeutil.NewBackoffTimer(typeutil.BackoffTimerConfig{ + Default: 5 * time.Second, + Backoff: typeutil.BackoffConfig{ + InitialInterval: 20 * time.Millisecond, + Multiplier: 2.0, + MaxInterval: 5 * time.Second, + }, + }) + backoffTimer.EnableBackoff() + + var lastErr error + // Send first timetick message to wal before interceptor is ready. + for count := 0; ; count++ { + if count > 0 { + nextTimer, nextBalanceInterval := backoffTimer.NextTimer() + impl.logger.Warn( + "send first time tick failed", + zap.Duration("nextBalanceInterval", nextBalanceInterval), + zap.Int("retryCount", count), + zap.Error(lastErr), + ) + select { + case <-impl.ctx.Done(): + return impl.ctx.Err() + case <-nextTimer: + } + } + + // Sent first timetick message to wal before ready. + // New TT is always greater than all tt on previous streamingnode. + // A fencing operation of underlying WAL is needed to make exclusive produce of topic. + // Otherwise, the TT principle may be violated. + // And sendTsMsg must be done, to help ackManager to get first LastConfirmedMessageID + // !!! Send a timetick message into walimpls directly is safe. + resource.Resource().TSOAllocator().Sync() + ts, err := resource.Resource().TSOAllocator().Allocate(impl.ctx) + if err != nil { + lastErr = errors.Wrap(err, "allocate timestamp failed") + continue + } + if err := impl.sendPersistentTsMsg(impl.ctx, ts, nil, underlyingWALImpls.Append); err != nil { + lastErr = errors.Wrap(err, "send first timestamp message failed") + continue + } + break + } + // interceptor is ready now. + close(impl.ready) + return nil +} + +// Ready implements AppendInterceptor. +func (impl *timeTickSyncOperator) Ready() <-chan struct{} { + return impl.ready +} + +// isReady returns true if the operator is ready. +func (impl *timeTickSyncOperator) isReady() bool { + select { + case <-impl.ready: + return true + default: + return false + } +} + +// AckManager returns the ack manager. +func (impl *timeTickSyncOperator) AckManager() *ack.AckManager { + return impl.ackManager +} + +// Close close the time tick sync operator. +func (impl *timeTickSyncOperator) Close() { + impl.cancel() +} + +// sendTsMsg sends first timestamp message to wal. +// TODO: TT lag warning. +func (impl *timeTickSyncOperator) sendTsMsg(ctx context.Context, appender func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error)) error { + // Sync the timestamp acknowledged details. + impl.syncAcknowledgedDetails(ctx) + + if impl.ackDetails.Empty() { + // No acknowledged info can be sent. + // Some message sent operation is blocked, new TT cannot be pushed forward. + return nil + } + + // Construct time tick message. + ts := impl.ackDetails.LastAllAcknowledgedTimestamp() + lastConfirmedMessageID := impl.ackDetails.EarliestLastConfirmedMessageID() + + if impl.ackDetails.IsNoPersistedMessage() { + // there's no persisted message, so no need to send persistent time tick message. + // only update it to notify the scanner. + return impl.notifyNoPersistentTsMsg(ts) + } + // otherwise, send persistent time tick message. + return impl.sendPersistentTsMsg(ctx, ts, lastConfirmedMessageID, appender) +} + +// sendPersistentTsMsg sends persistent time tick message to wal. +func (impl *timeTickSyncOperator) sendPersistentTsMsg(ctx context.Context, + ts uint64, + lastConfirmedMessageID message.MessageID, + appender func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error), +) error { + msg, err := NewTimeTickMsg(ts, lastConfirmedMessageID, impl.sourceID) + if err != nil { + return errors.Wrap(err, "at build time tick msg") + } + + // Append it to wal. + msgID, err := appender(ctx, msg) + if err != nil { + return errors.Wrapf(err, + "append time tick msg to wal failed, timestamp: %d, previous message counter: %d", + impl.ackDetails.LastAllAcknowledgedTimestamp(), + impl.ackDetails.Len(), + ) + } + + // Ack details has been committed to wal, clear it. + impl.ackDetails.Clear() + // Update last confirmed message id, so that the ack manager can use it for next time tick ack allocation. + impl.ackManager.AdvanceLastConfirmedMessageID(msgID) + // Update last time tick message id and time tick. + impl.timeTickNotifier.Update(inspector.TimeTickInfo{ + MessageID: msgID, + TimeTick: ts, + }) + return nil +} + +// notifyNoPersistentTsMsg sends no persistent time tick message. +func (impl *timeTickSyncOperator) notifyNoPersistentTsMsg(ts uint64) error { + impl.ackDetails.Clear() + impl.timeTickNotifier.OnlyUpdateTs(ts) + return nil +} + +// syncAcknowledgedDetails syncs the timestamp acknowledged details. +func (impl *timeTickSyncOperator) syncAcknowledgedDetails(ctx context.Context) { + // Sync up and get last confirmed timestamp. + ackDetails, err := impl.ackManager.SyncAndGetAcknowledged(ctx) + if err != nil { + impl.logger.Warn("sync timestamp ack manager failed", zap.Error(err)) + } + + // Add ack details to ackDetails. + impl.ackDetails.AddDetails(ackDetails) +} diff --git a/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator_test.go b/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator_test.go new file mode 100644 index 0000000000000..a428717811aad --- /dev/null +++ b/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator_test.go @@ -0,0 +1,100 @@ +package timetick + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal" + "github.com/milvus-io/milvus/internal/streamingnode/server/resource" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" + "github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/syncutil" +) + +func TestTimeTickSyncOperator(t *testing.T) { + paramtable.Init() + resource.InitForTest(t) + + walFuture := syncutil.NewFuture[wal.WAL]() + msgID := walimplstest.NewTestMessageID(1) + wimpls := mock_walimpls.NewMockWALImpls(t) + wimpls.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) { + return msgID, nil + }) + wimpls.EXPECT().Channel().Return(types.PChannelInfo{ + Name: "test", + Term: 1, + }) + param := interceptors.InterceptorBuildParam{ + WALImpls: wimpls, + WAL: walFuture, + } + operator := newTimeTickSyncOperator(param) + + assert.Equal(t, "test", operator.Channel().Name) + + defer operator.Close() + + // Test the initialize. + shouldBlock(operator.Ready()) + // after initialize, the operator should be ready, and setup the walFuture. + operator.initialize() + <-operator.Ready() + l := mock_wal.NewMockWAL(t) + walFuture.Set(l) + + // Test the sync operation, but there is no message to sync. + ctx := context.Background() + ts, err := resource.Resource().TSOAllocator().Allocate(ctx) + assert.NoError(t, err) + ch := operator.TimeTickNotifier().WatchAtMessageID(msgID, ts) + shouldBlock(ch) + // should not trigger any wal operation, but only update the timetick. + operator.Sync(ctx) + // should not block because timetick updates. + <-ch + + // Test alloc a real message but not ack. + // because the timetick message id is updated, so the old watcher should be invalidated. + ch = operator.TimeTickNotifier().WatchAtMessageID(msgID, operator.TimeTickNotifier().Get().TimeTick) + shouldBlock(ch) + acker, err := operator.AckManager().Allocate(ctx) + assert.NoError(t, err) + // should block timetick notifier. + ts, _ = resource.Resource().TSOAllocator().Allocate(ctx) + ch = operator.TimeTickNotifier().WatchAtMessageID(walimplstest.NewTestMessageID(2), ts) + shouldBlock(ch) + // sync operation just do nothing, so there's no wal operation triggered. + operator.Sync(ctx) + + // After ack, a wal operation will be trigger. + acker.Ack() + l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*types.AppendResult, error) { + ts, _ := resource.Resource().TSOAllocator().Allocate(ctx) + return &types.AppendResult{ + MessageID: walimplstest.NewTestMessageID(2), + TimeTick: ts, + }, nil + }) + // should trigger a wal operation. + operator.Sync(ctx) + // ch should still be blocked, because the timetick message id is updated, old message id watch is not notified. + shouldBlock(ch) +} + +func shouldBlock(ch <-chan struct{}) { + select { + case <-ch: + panic("should block") + case <-time.After(10 * time.Millisecond): + } +} diff --git a/internal/streamingnode/server/wal/scanner.go b/internal/streamingnode/server/wal/scanner.go index 8a5ed86464123..db80346355e80 100644 --- a/internal/streamingnode/server/wal/scanner.go +++ b/internal/streamingnode/server/wal/scanner.go @@ -45,17 +45,25 @@ type Scanner interface { Close() error } +type HandleParam struct { + Ctx context.Context + Upstream <-chan message.ImmutableMessage + Message message.ImmutableMessage + TimeTickChan <-chan struct{} +} + +type HandleResult struct { + Incoming message.ImmutableMessage // Not nil if upstream return new message. + MessageHandled bool // True if Message is handled successfully. + TimeTickUpdated bool // True if TimeTickChan is triggered. + Error error // Error is context is canceled. +} + // MessageHandler is used to handle message read from log. // TODO: should be removed in future after msgstream is removed. type MessageHandler interface { // Handle is the callback for handling message. - // The message will be passed to the handler for processing. - // Handle operation can be blocked, but should listen to the context.Done() and upstream. - // If the context is canceled, the handler should return immediately with ctx.Err. - // If the upstream is closed, the handler should return immediately with ErrUpstreamClosed. - // If the upstream recv a message, the handler should return the incoming message. - // If the handler handle the message successfully, it should return the ok=true. - Handle(ctx context.Context, upstream <-chan message.ImmutableMessage, msg message.ImmutableMessage) (incoming message.ImmutableMessage, ok bool, err error) + Handle(param HandleParam) HandleResult // Close is called after all messages are handled or handling is interrupted. Close() diff --git a/internal/streamingnode/server/wal/wal.go b/internal/streamingnode/server/wal/wal.go index 7b63ea3250329..f878965fb57ad 100644 --- a/internal/streamingnode/server/wal/wal.go +++ b/internal/streamingnode/server/wal/wal.go @@ -7,6 +7,8 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/util/types" ) +type AppendResult = types.AppendResult + // WAL is the WAL framework interface. // !!! Don't implement it directly, implement walimpls.WAL instead. type WAL interface { @@ -16,10 +18,10 @@ type WAL interface { Channel() types.PChannelInfo // Append writes a record to the log. - Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) + Append(ctx context.Context, msg message.MutableMessage) (*AppendResult, error) // Append a record to the log asynchronously. - AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(message.MessageID, error)) + AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(*AppendResult, error)) // Read returns a scanner for reading records from the wal. Read(ctx context.Context, deliverPolicy ReadOption) (Scanner, error) diff --git a/pkg/mocks/streaming/util/mock_message/mock_MutableMessage.go b/pkg/mocks/streaming/util/mock_message/mock_MutableMessage.go index 5979b358e03cd..1006d9f33edae 100644 --- a/pkg/mocks/streaming/util/mock_message/mock_MutableMessage.go +++ b/pkg/mocks/streaming/util/mock_message/mock_MutableMessage.go @@ -399,6 +399,49 @@ func (_c *MockMutableMessage_WithLastConfirmed_Call) RunAndReturn(run func(messa return _c } +// WithLastConfirmedUseMessageID provides a mock function with given fields: +func (_m *MockMutableMessage) WithLastConfirmedUseMessageID() message.MutableMessage { + ret := _m.Called() + + var r0 message.MutableMessage + if rf, ok := ret.Get(0).(func() message.MutableMessage); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.MutableMessage) + } + } + + return r0 +} + +// MockMutableMessage_WithLastConfirmedUseMessageID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WithLastConfirmedUseMessageID' +type MockMutableMessage_WithLastConfirmedUseMessageID_Call struct { + *mock.Call +} + +// WithLastConfirmedUseMessageID is a helper method to define mock.On call +func (_e *MockMutableMessage_Expecter) WithLastConfirmedUseMessageID() *MockMutableMessage_WithLastConfirmedUseMessageID_Call { + return &MockMutableMessage_WithLastConfirmedUseMessageID_Call{Call: _e.mock.On("WithLastConfirmedUseMessageID")} +} + +func (_c *MockMutableMessage_WithLastConfirmedUseMessageID_Call) Run(run func()) *MockMutableMessage_WithLastConfirmedUseMessageID_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockMutableMessage_WithLastConfirmedUseMessageID_Call) Return(_a0 message.MutableMessage) *MockMutableMessage_WithLastConfirmedUseMessageID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMutableMessage_WithLastConfirmedUseMessageID_Call) RunAndReturn(run func() message.MutableMessage) *MockMutableMessage_WithLastConfirmedUseMessageID_Call { + _c.Call.Return(run) + return _c +} + // WithTimeTick provides a mock function with given fields: tt func (_m *MockMutableMessage) WithTimeTick(tt uint64) message.MutableMessage { ret := _m.Called(tt) diff --git a/pkg/streaming/proto/streaming.proto b/pkg/streaming/proto/streaming.proto index 5f4ae5087d6c5..ad9f9b25ae5d2 100644 --- a/pkg/streaming/proto/streaming.proto +++ b/pkg/streaming/proto/streaming.proto @@ -156,6 +156,7 @@ message DeliverFilter { DeliverFilterTimeTickGT time_tick_gt = 1; DeliverFilterTimeTickGTE time_tick_gte = 2; DeliverFilterVChannel vchannel = 3; + DeliverFilterMessageType message_type = 4; } } @@ -178,6 +179,11 @@ message DeliverFilterVChannel { string vchannel = 1; // deliver message with vchannel name. } +message DeliverFilterMessageType { + // deliver message with message type. + repeated messages.MessageType message_types = 1; +} + // StreamingCode is the error code for log internal component. enum StreamingCode { STREAMING_CODE_OK = 0; diff --git a/pkg/streaming/util/message/adaptor/handler.go b/pkg/streaming/util/message/adaptor/handler.go new file mode 100644 index 0000000000000..efcd93569b03a --- /dev/null +++ b/pkg/streaming/util/message/adaptor/handler.go @@ -0,0 +1,92 @@ +package adaptor + +import ( + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// NewMsgPackAdaptorHandler create a new message pack adaptor handler. +func NewMsgPackAdaptorHandler() *MsgPackAdaptorHandler { + return &MsgPackAdaptorHandler{ + base: NewBaseMsgPackAdaptorHandler(), + } +} + +// MsgPackAdaptorHandler is the handler for message pack. +type MsgPackAdaptorHandler struct { + base *BaseMsgPackAdaptorHandler +} + +// Chan is the channel for message. +func (m *MsgPackAdaptorHandler) Chan() <-chan *msgstream.MsgPack { + return m.base.Channel +} + +// Handle is the callback for handling message. +func (m *MsgPackAdaptorHandler) Handle(msg message.ImmutableMessage) { + m.base.GenerateMsgPack(msg) + for m.base.PendingMsgPack.Len() > 0 { + m.base.Channel <- m.base.PendingMsgPack.Next() + m.base.PendingMsgPack.UnsafeAdvance() + } +} + +// Close is the callback for closing message. +func (m *MsgPackAdaptorHandler) Close() { + close(m.base.Channel) +} + +// NewBaseMsgPackAdaptorHandler create a new base message pack adaptor handler. +func NewBaseMsgPackAdaptorHandler() *BaseMsgPackAdaptorHandler { + return &BaseMsgPackAdaptorHandler{ + Logger: log.With(), + Channel: make(chan *msgstream.MsgPack), + Pendings: make([]message.ImmutableMessage, 0), + PendingMsgPack: typeutil.NewMultipartQueue[*msgstream.MsgPack](), + } +} + +// BaseMsgPackAdaptorHandler is the handler for message pack. +type BaseMsgPackAdaptorHandler struct { + Logger *log.MLogger + Channel chan *msgstream.MsgPack + Pendings []message.ImmutableMessage // pendings hold the vOld message which has same time tick. + PendingMsgPack *typeutil.MultipartQueue[*msgstream.MsgPack] // pendingMsgPack hold unsent msgPack. +} + +// GenerateMsgPack generate msgPack from message. +func (m *BaseMsgPackAdaptorHandler) GenerateMsgPack(msg message.ImmutableMessage) { + switch msg.Version() { + case message.VersionOld: + if len(m.Pendings) != 0 { + if msg.TimeTick() > m.Pendings[0].TimeTick() { + m.addMsgPackIntoPending(m.Pendings...) + m.Pendings = nil + } + } + m.Pendings = append(m.Pendings, msg) + case message.VersionV1: + if len(m.Pendings) != 0 { // all previous message should be vOld. + m.addMsgPackIntoPending(m.Pendings...) + m.Pendings = nil + } + m.addMsgPackIntoPending(msg) + default: + panic("unsupported message version") + } +} + +// addMsgPackIntoPending add message into pending msgPack. +func (m *BaseMsgPackAdaptorHandler) addMsgPackIntoPending(msgs ...message.ImmutableMessage) { + newPack, err := NewMsgPackFromMessage(msgs...) + if err != nil { + m.Logger.Warn("failed to convert message to msgpack", zap.Error(err)) + } + if newPack != nil { + m.PendingMsgPack.AddOne(newPack) + } +} diff --git a/pkg/streaming/util/message/adaptor/message_id.go b/pkg/streaming/util/message/adaptor/message_id.go index 538e0ae95e4bf..b9bc6dc333375 100644 --- a/pkg/streaming/util/message/adaptor/message_id.go +++ b/pkg/streaming/util/message/adaptor/message_id.go @@ -52,3 +52,21 @@ func DeserializeToMQWrapperID(msgID []byte, walName string) (common.MessageID, e return nil, fmt.Errorf("unsupported mq type %s", walName) } } + +func MustGetMessageIDFromMQWrapperIDBytes(walName string, msgIDBytes []byte) message.MessageID { + var commonMsgID common.MessageID + switch walName { + case "rocksmq": + id := server.DeserializeRmqID(msgIDBytes) + commonMsgID = &server.RmqID{MessageID: id} + case "pulsar": + msgID, err := mqpulsar.DeserializePulsarMsgID(msgIDBytes) + if err != nil { + panic(err) + } + commonMsgID = mqpulsar.NewPulsarID(msgID) + default: + panic("unsupported now") + } + return MustGetMessageIDFromMQWrapperID(commonMsgID) +} diff --git a/pkg/streaming/util/message/message.go b/pkg/streaming/util/message/message.go index 53ab668efaf18..905ffcd4be7e4 100644 --- a/pkg/streaming/util/message/message.go +++ b/pkg/streaming/util/message/message.go @@ -30,7 +30,7 @@ type BasicMessage interface { // VChannel returns the virtual channel of current message. // Available only when the message's version greater than 0. - // Otherwise, it will panic. + // Return "" if message is broadcasted. VChannel() string // TimeTick returns the time tick of current message. @@ -48,6 +48,9 @@ type MutableMessage interface { // !!! preserved for streaming system internal usage, don't call it outside of streaming system. WithLastConfirmed(id MessageID) MutableMessage + // WithLastConfirmedUseMessageID sets the last confirmed message id of current message to be the same as message id. + WithLastConfirmedUseMessageID() MutableMessage + // WithTimeTick sets the time tick of current message. // !!! preserved for streaming system internal usage, don't call it outside of streaming system. WithTimeTick(tt uint64) MutableMessage diff --git a/pkg/streaming/util/message/message_impl.go b/pkg/streaming/util/message/message_impl.go index b10d4ddac2781..be237e680a3c0 100644 --- a/pkg/streaming/util/message/message_impl.go +++ b/pkg/streaming/util/message/message_impl.go @@ -70,6 +70,12 @@ func (m *messageImpl) WithLastConfirmed(id MessageID) MutableMessage { return m } +// WithLastConfirmedUseMessageID sets the last confirmed message id of current message to be the same as message id. +func (m *messageImpl) WithLastConfirmedUseMessageID() MutableMessage { + m.properties.Set(messageLastConfirmed, messageLastConfirmedValueUseMessageID) + return m +} + // IntoImmutableMessage converts current message to immutable message. func (m *messageImpl) IntoImmutableMessage(id MessageID) ImmutableMessage { return &immutableMessageImpl{ @@ -92,10 +98,11 @@ func (m *messageImpl) TimeTick() uint64 { } // VChannel returns the vchannel of current message. +// If the message is broadcasted, the vchannel will be empty. func (m *messageImpl) VChannel() string { value, ok := m.properties.Get(messageVChannel) if !ok { - panic("there's a bug in the message codes, vchannel lost in properties of message") + return "" } return value } @@ -120,6 +127,9 @@ func (m *immutableMessageImpl) LastConfirmedMessageID() MessageID { if !ok { panic(fmt.Sprintf("there's a bug in the message codes, last confirmed message lost in properties of message, id: %+v", m.id)) } + if value == messageLastConfirmedValueUseMessageID { + return m.MessageID() + } id, err := UnmarshalMessageID(m.id.WALName(), value) if err != nil { panic(fmt.Sprintf("there's a bug in the message codes, dirty last confirmed message in properties of message, id: %+v", m.id)) diff --git a/pkg/streaming/util/message/message_test.go b/pkg/streaming/util/message/message_test.go index 04be6f49134d6..5276d8f83061d 100644 --- a/pkg/streaming/util/message/message_test.go +++ b/pkg/streaming/util/message/message_test.go @@ -30,4 +30,7 @@ func TestVersion(t *testing.T) { }) v = newMessageVersionFromString("1") assert.Equal(t, VersionV1, v) + + assert.True(t, VersionV1.GT(VersionOld)) + assert.True(t, VersionV2.GT(VersionV1)) } diff --git a/pkg/streaming/util/message/properties.go b/pkg/streaming/util/message/properties.go index 551b417ea994e..10f02b81654e9 100644 --- a/pkg/streaming/util/message/properties.go +++ b/pkg/streaming/util/message/properties.go @@ -10,6 +10,11 @@ const ( messageSpecialiedHeader = "_sh" // specialized message header. ) +const ( + messageLastConfirmedValueUseMessageID = "use_message_id" // message last confirmed message id is same with message id. + // some message type can not set last confirmed message id, but can use the message id as last confirmed id. +) + var ( _ RProperties = propertiesImpl{} _ Properties = propertiesImpl{} diff --git a/pkg/streaming/util/options/deliver.go b/pkg/streaming/util/options/deliver.go index 31efc3a253a3e..4e02aff467f32 100644 --- a/pkg/streaming/util/options/deliver.go +++ b/pkg/streaming/util/options/deliver.go @@ -15,6 +15,7 @@ const ( DeliverFilterTypeTimeTickGT deliverFilterType = 1 DeliverFilterTypeTimeTickGTE deliverFilterType = 2 DeliverFilterTypeVChannel deliverFilterType = 3 + DeliverFilterTypeMessageType deliverFilterType = 4 ) type ( @@ -99,6 +100,21 @@ func DeliverFilterVChannel(vchannel string) DeliverFilter { } } +// DeliverFilterMessageType delivers messages filtered by message type. +func DeliverFilterMessageType(messageType ...message.MessageType) DeliverFilter { + messageTypes := make([]messagespb.MessageType, 0, len(messageType)) + for _, mt := range messageType { + messageTypes = append(messageTypes, messagespb.MessageType(mt)) + } + return &streamingpb.DeliverFilter{ + Filter: &streamingpb.DeliverFilter_MessageType{ + MessageType: &streamingpb.DeliverFilterMessageType{ + MessageTypes: messageTypes, + }, + }, + } +} + // IsDeliverFilterTimeTick checks if the filter is time tick filter. func IsDeliverFilterTimeTick(filter DeliverFilter) bool { switch filter.GetFilter().(type) { @@ -127,6 +143,15 @@ func GetFilterFunc(filters []DeliverFilter) (func(message.ImmutableMessage) bool filterFuncs = append(filterFuncs, func(im message.ImmutableMessage) bool { return im.VChannel() == filter.GetVchannel().Vchannel }) + case *streamingpb.DeliverFilter_MessageType: + filterFuncs = append(filterFuncs, func(im message.ImmutableMessage) bool { + for _, mt := range filter.GetMessageType().MessageTypes { + if im.MessageType() == message.MessageType(mt) { + return true + } + } + return false + }) default: panic("unimplemented") } diff --git a/pkg/streaming/util/types/streaming_node.go b/pkg/streaming/util/types/streaming_node.go index ae0e99e6c9e46..125891aee36bd 100644 --- a/pkg/streaming/util/types/streaming_node.go +++ b/pkg/streaming/util/types/streaming_node.go @@ -6,6 +6,7 @@ import ( "github.com/cockroachdb/errors" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" + "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -84,3 +85,12 @@ func (n *StreamingNodeStatus) ErrorOfNode() error { } return n.Err } + +// AppendResult is the result of append operation. +type AppendResult struct { + // Message is generated by underlying walimpls. + MessageID message.MessageID + // TimeTick is the time tick of the message. + // Set by timetick interceptor. + TimeTick uint64 +}