diff --git a/internal/distributed/streaming/streaming.go b/internal/distributed/streaming/streaming.go index 8ef6df73619d0..810b15065d6a3 100644 --- a/internal/distributed/streaming/streaming.go +++ b/internal/distributed/streaming/streaming.go @@ -78,6 +78,8 @@ type Scanner interface { // WALAccesser is the interfaces to interact with the milvus write ahead log. type WALAccesser interface { + WALName() string + // Txn returns a transaction for writing records to the log. // Once the txn is returned, the Commit or Rollback operation must be called once, otherwise resource leak on wal. Txn(ctx context.Context, opts TxnOption) (Txn, error) diff --git a/internal/distributed/streaming/wal.go b/internal/distributed/streaming/wal.go index 8caba0186bc05..b4d7fb5f90080 100644 --- a/internal/distributed/streaming/wal.go +++ b/internal/distributed/streaming/wal.go @@ -12,6 +12,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingcoord/client" "github.com/milvus-io/milvus/internal/streamingnode/client/handler" "github.com/milvus-io/milvus/internal/util/streamingutil/status" + "github.com/milvus-io/milvus/internal/util/streamingutil/util" "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/util/conc" @@ -54,6 +55,10 @@ type walAccesserImpl struct { dispatchExecutionPool *conc.Pool[struct{}] } +func (w *walAccesserImpl) WALName() string { + return util.MustSelectWALName() +} + // RawAppend writes a record to the log. func (w *walAccesserImpl) RawAppend(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) (*types.AppendResult, error) { assertValidMessage(msg) diff --git a/internal/mocks/distributed/mock_streaming/mock_WALAccesser.go b/internal/mocks/distributed/mock_streaming/mock_WALAccesser.go index 6b391ff89a062..e077e04030f15 100644 --- a/internal/mocks/distributed/mock_streaming/mock_WALAccesser.go +++ b/internal/mocks/distributed/mock_streaming/mock_WALAccesser.go @@ -331,6 +331,51 @@ func (_c *MockWALAccesser_Txn_Call) RunAndReturn(run func(context.Context, strea return _c } +// WALName provides a mock function with given fields: +func (_m *MockWALAccesser) WALName() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for WALName") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockWALAccesser_WALName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WALName' +type MockWALAccesser_WALName_Call struct { + *mock.Call +} + +// WALName is a helper method to define mock.On call +func (_e *MockWALAccesser_Expecter) WALName() *MockWALAccesser_WALName_Call { + return &MockWALAccesser_WALName_Call{Call: _e.mock.On("WALName")} +} + +func (_c *MockWALAccesser_WALName_Call) Run(run func()) *MockWALAccesser_WALName_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockWALAccesser_WALName_Call) Return(_a0 string) *MockWALAccesser_WALName_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWALAccesser_WALName_Call) RunAndReturn(run func() string) *MockWALAccesser_WALName_Call { + _c.Call.Return(run) + return _c +} + // NewMockWALAccesser creates a new instance of MockWALAccesser. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockWALAccesser(t interface { diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index a6ffc4f1b0d86..b21d7784ac616 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -735,7 +735,7 @@ func (sd *shardDelegator) createDeleteStreamFromStreamingService(ctx context.Con s := streaming.WAL().Read(ctx, streaming.ReadOption{ VChannel: position.GetChannelName(), DeliverPolicy: options.DeliverPolicyStartFrom( - adaptor.MustGetMessageIDFromMQWrapperIDBytes("pulsar", position.GetMsgID()), + adaptor.MustGetMessageIDFromMQWrapperIDBytes(streaming.WAL().WALName(), position.GetMsgID()), ), DeliverFilters: []options.DeliverFilter{ // only deliver message which timestamp >= position.Timestamp diff --git a/internal/util/pipeline/stream_pipeline.go b/internal/util/pipeline/stream_pipeline.go index 204b6cfb0019c..888f1a9da009a 100644 --- a/internal/util/pipeline/stream_pipeline.go +++ b/internal/util/pipeline/stream_pipeline.go @@ -95,7 +95,7 @@ func (p *streamPipeline) ConsumeMsgStream(ctx context.Context, position *msgpb.M } if streamingutil.IsStreamingServiceEnabled() { - startFrom := adaptor.MustGetMessageIDFromMQWrapperIDBytes("pulsar", position.GetMsgID()) + startFrom := adaptor.MustGetMessageIDFromMQWrapperIDBytes(streaming.WAL().WALName(), position.GetMsgID()) log.Info( "stream pipeline seeks from position with scanner", zap.String("channel", position.GetChannelName()), diff --git a/internal/util/streamingutil/util/wal_selector.go b/internal/util/streamingutil/util/wal_selector.go index cbb24db457485..ef6c7abfba914 100644 --- a/internal/util/streamingutil/util/wal_selector.go +++ b/internal/util/streamingutil/util/wal_selector.go @@ -2,14 +2,13 @@ package util import ( "github.com/cockroachdb/errors" - "go.uber.org/atomic" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) const ( walTypeDefault = "default" - walTypeNatsmq = "natsmq" walTypeRocksmq = "rocksmq" walTypeKafka = "kafka" walTypePulsar = "pulsar" @@ -17,25 +16,16 @@ const ( type walEnable struct { Rocksmq bool - Natsmq bool Pulsar bool Kafka bool } -var isStandAlone = atomic.NewBool(false) - -// EnableStandAlone enable standalone mode. -func EnableStandAlone(standalone bool) { - isStandAlone.Store(standalone) -} - // MustSelectWALName select wal name. func MustSelectWALName() string { - standalone := isStandAlone.Load() params := paramtable.Get() + standalone := params.RuntimeConfig.Role.GetAsString() == typeutil.StandaloneRole return mustSelectWALName(standalone, params.MQCfg.Type.GetValue(), walEnable{ params.RocksmqEnable(), - params.NatsmqEnable(), params.PulsarEnable(), params.KafkaEnable(), }) @@ -68,8 +58,8 @@ func validateWALName(standalone bool, mqType string) error { // we may register more mq type by plugin. // so we should not check all mq type here. // only check standalone type. - if !standalone && (mqType == walTypeRocksmq || mqType == walTypeNatsmq) { - return errors.Newf("mq %s is only valid in standalone mode") + if !standalone && mqType == walTypeRocksmq { + return errors.Newf("mq %s is only valid in standalone mode", mqType) } return nil } diff --git a/internal/util/streamingutil/util/wal_selector_test.go b/internal/util/streamingutil/util/wal_selector_test.go index 6343eaf1b3718..a3cc1804254e0 100644 --- a/internal/util/streamingutil/util/wal_selector_test.go +++ b/internal/util/streamingutil/util/wal_selector_test.go @@ -7,27 +7,24 @@ import ( ) func TestValidateWALType(t *testing.T) { - assert.Error(t, validateWALName(false, walTypeNatsmq)) assert.Error(t, validateWALName(false, walTypeRocksmq)) } func TestSelectWALType(t *testing.T) { - assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{true, true, true, true}), walTypeRocksmq) - assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, true, true, true}), walTypePulsar) - assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, true, true}), walTypePulsar) - assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, false, true}), walTypeKafka) - assert.Panics(t, func() { mustSelectWALName(true, walTypeDefault, walEnable{false, false, false, false}) }) - assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{true, true, true, true}), walTypePulsar) - assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true, true}), walTypePulsar) - assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, true, true}), walTypePulsar) - assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, false, true}), walTypeKafka) - assert.Panics(t, func() { mustSelectWALName(false, walTypeDefault, walEnable{false, false, false, false}) }) - assert.Equal(t, mustSelectWALName(true, walTypeRocksmq, walEnable{true, true, true, true}), walTypeRocksmq) - assert.Equal(t, mustSelectWALName(true, walTypeNatsmq, walEnable{true, true, true, true}), walTypeNatsmq) - assert.Equal(t, mustSelectWALName(true, walTypePulsar, walEnable{true, true, true, true}), walTypePulsar) - assert.Equal(t, mustSelectWALName(true, walTypeKafka, walEnable{true, true, true, true}), walTypeKafka) - assert.Panics(t, func() { mustSelectWALName(false, walTypeRocksmq, walEnable{true, true, true, true}) }) - assert.Panics(t, func() { mustSelectWALName(false, walTypeNatsmq, walEnable{true, true, true, true}) }) - assert.Equal(t, mustSelectWALName(false, walTypePulsar, walEnable{true, true, true, true}), walTypePulsar) - assert.Equal(t, mustSelectWALName(false, walTypeKafka, walEnable{true, true, true, true}), walTypeKafka) + assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{true, true, true}), walTypeRocksmq) + assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, true, true}), walTypePulsar) + assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, true, true}), walTypePulsar) + assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, true}), walTypeKafka) + assert.Panics(t, func() { mustSelectWALName(true, walTypeDefault, walEnable{false, false, false}) }) + assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{true, true, true}), walTypePulsar) + assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true}), walTypePulsar) + assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true}), walTypePulsar) + assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, true}), walTypeKafka) + assert.Panics(t, func() { mustSelectWALName(false, walTypeDefault, walEnable{false, false, false}) }) + assert.Equal(t, mustSelectWALName(true, walTypeRocksmq, walEnable{true, true, true}), walTypeRocksmq) + assert.Equal(t, mustSelectWALName(true, walTypePulsar, walEnable{true, true, true}), walTypePulsar) + assert.Equal(t, mustSelectWALName(true, walTypeKafka, walEnable{true, true, true}), walTypeKafka) + assert.Panics(t, func() { mustSelectWALName(false, walTypeRocksmq, walEnable{true, true, true}) }) + assert.Equal(t, mustSelectWALName(false, walTypePulsar, walEnable{true, true, true}), walTypePulsar) + assert.Equal(t, mustSelectWALName(false, walTypeKafka, walEnable{true, true, true}), walTypeKafka) } diff --git a/pkg/mq/mqimpl/rocksmq/client/client_impl.go b/pkg/mq/mqimpl/rocksmq/client/client_impl.go index d550468fedf67..f68e6c602f2e1 100644 --- a/pkg/mq/mqimpl/rocksmq/client/client_impl.go +++ b/pkg/mq/mqimpl/rocksmq/client/client_impl.go @@ -15,6 +15,7 @@ import ( "context" "reflect" "sync" + "time" "github.com/cockroachdb/errors" "go.uber.org/zap" @@ -139,6 +140,7 @@ func (c *client) consume(consumer *consumer) { var consumerCh chan<- common.Message var waitForSent *RmqMessage var newIncomingMsgCh <-chan struct{} + var timerNotify <-chan time.Time if len(pendingMsgs) > 0 { // If there's pending sent messages, we can try to deliver them first. consumerCh = consumer.messageCh @@ -148,6 +150,9 @@ func (c *client) consume(consumer *consumer) { // !!! TODO: MsgMutex may lost, not sync up with the consumer, // so the tailing message cannot be consumed if no new producing message. newIncomingMsgCh = consumer.MsgMutex() + // It's a bad implementation here, for quickly fixing the previous problem. + // Every 100ms, wake up and check if the consumer has new incoming data. + timerNotify = time.After(100 * time.Millisecond) } select { @@ -162,6 +167,8 @@ func (c *client) consume(consumer *consumer) { log.Info("Consumer MsgMutex closed") return } + case <-timerNotify: + continue } } } @@ -191,6 +198,17 @@ func (c *client) tryToConsume(consumer *consumer) []*RmqMessage { } rmqMsgs := make([]*RmqMessage, 0, len(msgs)) for _, msg := range msgs { + rmqMsg, err := unmarshalStreamingMessage(consumer.topic, msg) + if err == nil { + rmqMsgs = append(rmqMsgs, rmqMsg) + continue + } + if !errors.Is(err, errNotStreamingServiceMessage) { + log.Warn("Consumer's goroutine cannot unmarshal streaming message: ", zap.Error(err)) + continue + } + // then fallback to the legacy message format. + // This is the hack, we put property into pl properties := make(map[string]string, 0) pl, err := UnmarshalHeader(msg.Payload) diff --git a/pkg/mq/mqimpl/rocksmq/client/producer.go b/pkg/mq/mqimpl/rocksmq/client/producer.go index 65fc19a8dc4b4..50388478a44d5 100644 --- a/pkg/mq/mqimpl/rocksmq/client/producer.go +++ b/pkg/mq/mqimpl/rocksmq/client/producer.go @@ -28,6 +28,9 @@ type Producer interface { // publish a message Send(message *common.ProducerMessage) (UniqueID, error) + // publish a message for new streaming service. + SendForStreamingService(message *common.ProducerMessage) (UniqueID, error) + // Close a producer Close() } diff --git a/pkg/mq/mqimpl/rocksmq/client/producer_impl.go b/pkg/mq/mqimpl/rocksmq/client/producer_impl.go index f858bce63cc30..4f2aad064c293 100644 --- a/pkg/mq/mqimpl/rocksmq/client/producer_impl.go +++ b/pkg/mq/mqimpl/rocksmq/client/producer_impl.go @@ -76,6 +76,20 @@ func (p *producer) Send(message *common.ProducerMessage) (UniqueID, error) { return ids[0], nil } +func (p *producer) SendForStreamingService(message *common.ProducerMessage) (UniqueID, error) { + payload, err := marshalStreamingMessage(message) + if err != nil { + return 0, err + } + ids, err := p.c.server.Produce(p.topic, []server.ProducerMessage{{ + Payload: payload, + }}) + if err != nil { + return 0, err + } + return ids[0], nil +} + // Close destroy the topic of this producer in rocksmq func (p *producer) Close() { err := p.c.server.DestroyTopic(p.topic) diff --git a/pkg/mq/mqimpl/rocksmq/client/streaming.go b/pkg/mq/mqimpl/rocksmq/client/streaming.go new file mode 100644 index 0000000000000..c317bb26d9ed9 --- /dev/null +++ b/pkg/mq/mqimpl/rocksmq/client/streaming.go @@ -0,0 +1,53 @@ +package client + +import ( + "bytes" + + "github.com/cockroachdb/errors" + "google.golang.org/protobuf/proto" + + "github.com/milvus-io/milvus/pkg/mq/common" + "github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server" + "github.com/milvus-io/milvus/pkg/streaming/proto/messagespb" +) + +var ( + // magicPrefix is used to identify the rocksmq legacy message and new message for streaming service. + // Make a low probability of collision with the legacy proto message. + magicPrefix = append([]byte{0xFF, 0xFE, 0xFD, 0xFC}, []byte("STREAM")...) + errNotStreamingServiceMessage = errors.New("not a streaming service message") +) + +// marshalStreamingMessage marshals a streaming message to bytes. +func marshalStreamingMessage(message *common.ProducerMessage) ([]byte, error) { + rmqMessage := &messagespb.RMQMessageLayout{ + Payload: message.Payload, + Properties: message.Properties, + } + payload, err := proto.Marshal(rmqMessage) + if err != nil { + return nil, err + } + finalPayload := make([]byte, len(payload)+len(magicPrefix)) + copy(finalPayload, magicPrefix) + copy(finalPayload[len(magicPrefix):], payload) + return finalPayload, nil +} + +// unmarshalStreamingMessage unmarshals a streaming message from bytes. +func unmarshalStreamingMessage(topic string, msg server.ConsumerMessage) (*RmqMessage, error) { + if !bytes.HasPrefix(msg.Payload, magicPrefix) { + return nil, errNotStreamingServiceMessage + } + + var rmqMessage messagespb.RMQMessageLayout + if err := proto.Unmarshal(msg.Payload[len(magicPrefix):], &rmqMessage); err != nil { + return nil, err + } + return &RmqMessage{ + msgID: msg.MsgID, + payload: rmqMessage.Payload, + properties: rmqMessage.Properties, + topic: topic, + }, nil +} diff --git a/pkg/mq/mqimpl/rocksmq/client/streaming_test.go b/pkg/mq/mqimpl/rocksmq/client/streaming_test.go new file mode 100644 index 0000000000000..3028c98fb8c13 --- /dev/null +++ b/pkg/mq/mqimpl/rocksmq/client/streaming_test.go @@ -0,0 +1,36 @@ +package client + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/mq/common" + "github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server" +) + +func TestStreaming(t *testing.T) { + payload, err := marshalStreamingMessage(&common.ProducerMessage{ + Payload: []byte("payload"), + Properties: map[string]string{ + "key": "value", + }, + }) + assert.NoError(t, err) + assert.NotNil(t, payload) + + msg, err := unmarshalStreamingMessage("topic", server.ConsumerMessage{ + MsgID: 1, + Payload: payload, + }) + assert.NoError(t, err) + assert.Equal(t, string(msg.Payload()), "payload") + assert.Equal(t, msg.Properties()["key"], "value") + msg, err = unmarshalStreamingMessage("topic", server.ConsumerMessage{ + MsgID: 1, + Payload: payload[1:], + }) + assert.Error(t, err) + assert.ErrorIs(t, err, errNotStreamingServiceMessage) + assert.Nil(t, msg) +} diff --git a/pkg/streaming/proto/messages.proto b/pkg/streaming/proto/messages.proto index 62b84f98a2c80..091e59042729c 100644 --- a/pkg/streaming/proto/messages.proto +++ b/pkg/streaming/proto/messages.proto @@ -242,3 +242,9 @@ enum TxnState { // the transaction is rollbacked. TxnRollbacked = 6; } + +// RMQMessageLayout is the layout of message for RMQ. +message RMQMessageLayout { + bytes payload = 1; // message body + map properties = 2; // message properties +} diff --git a/pkg/streaming/walimpls/impls/rmq/rmq_test.go b/pkg/streaming/walimpls/impls/rmq/rmq_test.go index a8fc81d209c78..47e9f57043fc2 100644 --- a/pkg/streaming/walimpls/impls/rmq/rmq_test.go +++ b/pkg/streaming/walimpls/impls/rmq/rmq_test.go @@ -8,6 +8,7 @@ import ( "github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server" "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" "github.com/milvus-io/milvus/pkg/streaming/walimpls/registry" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -35,5 +36,5 @@ func TestRegistry(t *testing.T) { } func TestWAL(t *testing.T) { - // walimpls.NewWALImplsTestFramework(t, 100, &builderImpl{}).Run() + walimpls.NewWALImplsTestFramework(t, 1000, &builderImpl{}).Run() } diff --git a/pkg/streaming/walimpls/impls/rmq/wal.go b/pkg/streaming/walimpls/impls/rmq/wal.go index c2cf37eeaee27..6e22d23859249 100644 --- a/pkg/streaming/walimpls/impls/rmq/wal.go +++ b/pkg/streaming/walimpls/impls/rmq/wal.go @@ -30,7 +30,7 @@ func (w *walImpl) WALName() string { // Append appends a message to the wal. func (w *walImpl) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { - id, err := w.p.Send(&common.ProducerMessage{ + id, err := w.p.SendForStreamingService(&common.ProducerMessage{ Payload: msg.Payload(), Properties: msg.Properties().ToRawMap(), })