Skip to content

Commit

Permalink
enhance: enable rmq for streaming (milvus-io#38669)
Browse files Browse the repository at this point in the history
issue: milvus-io#38399

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored and NicoYuan1986 committed Dec 26, 2024
1 parent 9cbfd40 commit 487ddb1
Show file tree
Hide file tree
Showing 15 changed files with 207 additions and 37 deletions.
2 changes: 2 additions & 0 deletions internal/distributed/streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type Scanner interface {

// WALAccesser is the interface used to interact with the milvus write-ahead log.
type WALAccesser interface {
WALName() string

// Txn returns a transaction for writing records to the log.
// Once the txn is returned, Commit or Rollback must be called exactly once; otherwise resources leak on the wal.
Txn(ctx context.Context, opts TxnOption) (Txn, error)
Expand Down
5 changes: 5 additions & 0 deletions internal/distributed/streaming/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingcoord/client"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/conc"
Expand Down Expand Up @@ -54,6 +55,10 @@ type walAccesserImpl struct {
dispatchExecutionPool *conc.Pool[struct{}]
}

// WALName returns the name of the write ahead log implementation in use.
// It delegates to util.MustSelectWALName; as a Must-style helper that call
// presumably panics when no valid WAL can be selected — confirm in wal_selector.go.
func (w *walAccesserImpl) WALName() string {
return util.MustSelectWALName()
}

// RawAppend writes a record to the log.
func (w *walAccesserImpl) RawAppend(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) (*types.AppendResult, error) {
assertValidMessage(msg)
Expand Down
45 changes: 45 additions & 0 deletions internal/mocks/distributed/mock_streaming/mock_WALAccesser.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/querynodev2/delegator/delegator_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ func (sd *shardDelegator) createDeleteStreamFromStreamingService(ctx context.Con
s := streaming.WAL().Read(ctx, streaming.ReadOption{
VChannel: position.GetChannelName(),
DeliverPolicy: options.DeliverPolicyStartFrom(
adaptor.MustGetMessageIDFromMQWrapperIDBytes("pulsar", position.GetMsgID()),
adaptor.MustGetMessageIDFromMQWrapperIDBytes(streaming.WAL().WALName(), position.GetMsgID()),
),
DeliverFilters: []options.DeliverFilter{
// only deliver message which timestamp >= position.Timestamp
Expand Down
2 changes: 1 addition & 1 deletion internal/util/pipeline/stream_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (p *streamPipeline) ConsumeMsgStream(ctx context.Context, position *msgpb.M
}

if streamingutil.IsStreamingServiceEnabled() {
startFrom := adaptor.MustGetMessageIDFromMQWrapperIDBytes("pulsar", position.GetMsgID())
startFrom := adaptor.MustGetMessageIDFromMQWrapperIDBytes(streaming.WAL().WALName(), position.GetMsgID())
log.Info(
"stream pipeline seeks from position with scanner",
zap.String("channel", position.GetChannelName()),
Expand Down
18 changes: 4 additions & 14 deletions internal/util/streamingutil/util/wal_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,30 @@ package util

import (
"github.com/cockroachdb/errors"
"go.uber.org/atomic"

"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

const (
walTypeDefault = "default"
walTypeNatsmq = "natsmq"
walTypeRocksmq = "rocksmq"
walTypeKafka = "kafka"
walTypePulsar = "pulsar"
)

type walEnable struct {
Rocksmq bool
Natsmq bool
Pulsar bool
Kafka bool
}

var isStandAlone = atomic.NewBool(false)

// EnableStandAlone enable standalone mode.
func EnableStandAlone(standalone bool) {
isStandAlone.Store(standalone)
}

// MustSelectWALName select wal name.
func MustSelectWALName() string {
standalone := isStandAlone.Load()
params := paramtable.Get()
standalone := params.RuntimeConfig.Role.GetAsString() == typeutil.StandaloneRole
return mustSelectWALName(standalone, params.MQCfg.Type.GetValue(), walEnable{
params.RocksmqEnable(),
params.NatsmqEnable(),
params.PulsarEnable(),
params.KafkaEnable(),
})
Expand Down Expand Up @@ -68,8 +58,8 @@ func validateWALName(standalone bool, mqType string) error {
// we may register more mq type by plugin.
// so we should not check all mq type here.
// only check standalone type.
if !standalone && (mqType == walTypeRocksmq || mqType == walTypeNatsmq) {
return errors.Newf("mq %s is only valid in standalone mode")
if !standalone && mqType == walTypeRocksmq {
return errors.Newf("mq %s is only valid in standalone mode", mqType)
}
return nil
}
35 changes: 16 additions & 19 deletions internal/util/streamingutil/util/wal_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,24 @@ import (
)

func TestValidateWALType(t *testing.T) {
assert.Error(t, validateWALName(false, walTypeNatsmq))
assert.Error(t, validateWALName(false, walTypeRocksmq))
}

func TestSelectWALType(t *testing.T) {
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{true, true, true, true}), walTypeRocksmq)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, false, true}), walTypeKafka)
assert.Panics(t, func() { mustSelectWALName(true, walTypeDefault, walEnable{false, false, false, false}) })
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{true, true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, false, true}), walTypeKafka)
assert.Panics(t, func() { mustSelectWALName(false, walTypeDefault, walEnable{false, false, false, false}) })
assert.Equal(t, mustSelectWALName(true, walTypeRocksmq, walEnable{true, true, true, true}), walTypeRocksmq)
assert.Equal(t, mustSelectWALName(true, walTypeNatsmq, walEnable{true, true, true, true}), walTypeNatsmq)
assert.Equal(t, mustSelectWALName(true, walTypePulsar, walEnable{true, true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(true, walTypeKafka, walEnable{true, true, true, true}), walTypeKafka)
assert.Panics(t, func() { mustSelectWALName(false, walTypeRocksmq, walEnable{true, true, true, true}) })
assert.Panics(t, func() { mustSelectWALName(false, walTypeNatsmq, walEnable{true, true, true, true}) })
assert.Equal(t, mustSelectWALName(false, walTypePulsar, walEnable{true, true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeKafka, walEnable{true, true, true, true}), walTypeKafka)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{true, true, true}), walTypeRocksmq)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, true}), walTypeKafka)
assert.Panics(t, func() { mustSelectWALName(true, walTypeDefault, walEnable{false, false, false}) })
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, true}), walTypeKafka)
assert.Panics(t, func() { mustSelectWALName(false, walTypeDefault, walEnable{false, false, false}) })
assert.Equal(t, mustSelectWALName(true, walTypeRocksmq, walEnable{true, true, true}), walTypeRocksmq)
assert.Equal(t, mustSelectWALName(true, walTypePulsar, walEnable{true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(true, walTypeKafka, walEnable{true, true, true}), walTypeKafka)
assert.Panics(t, func() { mustSelectWALName(false, walTypeRocksmq, walEnable{true, true, true}) })
assert.Equal(t, mustSelectWALName(false, walTypePulsar, walEnable{true, true, true}), walTypePulsar)
assert.Equal(t, mustSelectWALName(false, walTypeKafka, walEnable{true, true, true}), walTypeKafka)
}
18 changes: 18 additions & 0 deletions pkg/mq/mqimpl/rocksmq/client/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"context"
"reflect"
"sync"
"time"

"github.com/cockroachdb/errors"
"go.uber.org/zap"
Expand Down Expand Up @@ -139,6 +140,7 @@ func (c *client) consume(consumer *consumer) {
var consumerCh chan<- common.Message
var waitForSent *RmqMessage
var newIncomingMsgCh <-chan struct{}
var timerNotify <-chan time.Time
if len(pendingMsgs) > 0 {
// If there's pending sent messages, we can try to deliver them first.
consumerCh = consumer.messageCh
Expand All @@ -148,6 +150,9 @@ func (c *client) consume(consumer *consumer) {
// !!! TODO: the MsgMutex notification may be lost (it is not kept in sync with the consumer),
// so trailing messages cannot be consumed unless a new message is produced.
newIncomingMsgCh = consumer.MsgMutex()
// This is a crude workaround to quickly fix the problem above:
// wake up every 100ms and check whether the consumer has new incoming data.
timerNotify = time.After(100 * time.Millisecond)
}

select {
Expand All @@ -162,6 +167,8 @@ func (c *client) consume(consumer *consumer) {
log.Info("Consumer MsgMutex closed")
return
}
case <-timerNotify:
continue
}
}
}
Expand Down Expand Up @@ -191,6 +198,17 @@ func (c *client) tryToConsume(consumer *consumer) []*RmqMessage {
}
rmqMsgs := make([]*RmqMessage, 0, len(msgs))
for _, msg := range msgs {
rmqMsg, err := unmarshalStreamingMessage(consumer.topic, msg)
if err == nil {
rmqMsgs = append(rmqMsgs, rmqMsg)
continue
}
if !errors.Is(err, errNotStreamingServiceMessage) {
log.Warn("Consumer's goroutine cannot unmarshal streaming message: ", zap.Error(err))
continue
}
// then fallback to the legacy message format.

// This is the hack, we put property into pl
properties := make(map[string]string, 0)
pl, err := UnmarshalHeader(msg.Payload)
Expand Down
3 changes: 3 additions & 0 deletions pkg/mq/mqimpl/rocksmq/client/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type Producer interface {
// publish a message
Send(message *common.ProducerMessage) (UniqueID, error)

// publish a message for new streaming service.
SendForStreamingService(message *common.ProducerMessage) (UniqueID, error)

// Close a producer
Close()
}
14 changes: 14 additions & 0 deletions pkg/mq/mqimpl/rocksmq/client/producer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ func (p *producer) Send(message *common.ProducerMessage) (UniqueID, error) {
return ids[0], nil
}

// SendForStreamingService publishes a message for the new streaming service.
// The message is first encoded with marshalStreamingMessage, then produced as
// a single-element batch to the producer's topic. The id of that one produced
// message is returned.
func (p *producer) SendForStreamingService(message *common.ProducerMessage) (UniqueID, error) {
	encoded, err := marshalStreamingMessage(message)
	if err != nil {
		return 0, err
	}

	// Exactly one message is produced, so exactly one id is read back below.
	batch := []server.ProducerMessage{{Payload: encoded}}
	msgIDs, produceErr := p.c.server.Produce(p.topic, batch)
	if produceErr != nil {
		return 0, produceErr
	}
	return msgIDs[0], nil
}

// Close destroy the topic of this producer in rocksmq
func (p *producer) Close() {
err := p.c.server.DestroyTopic(p.topic)
Expand Down
53 changes: 53 additions & 0 deletions pkg/mq/mqimpl/rocksmq/client/streaming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package client

import (
"bytes"

"github.com/cockroachdb/errors"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/pkg/streaming/proto/messagespb"
)

var (
// magicPrefix is prepended to every payload written for the streaming service,
// so that a consumer can tell a streaming service message apart from a legacy
// rocksmq message.
// The leading non-text bytes followed by "STREAM" are chosen to make a collision
// with a legacy proto message unlikely.
magicPrefix = append([]byte{0xFF, 0xFE, 0xFD, 0xFC}, []byte("STREAM")...)
// errNotStreamingServiceMessage is returned by unmarshalStreamingMessage when the
// payload does not start with magicPrefix.
errNotStreamingServiceMessage = errors.New("not a streaming service message")
)

// marshalStreamingMessage encodes a producer message for the streaming service.
// The payload and properties are wrapped into an RMQMessageLayout, serialized
// with protobuf, and the result is prefixed with magicPrefix.
// Any protobuf marshal error is returned unchanged.
func marshalStreamingMessage(message *common.ProducerMessage) ([]byte, error) {
	body, err := proto.Marshal(&messagespb.RMQMessageLayout{
		Payload:    message.Payload,
		Properties: message.Properties,
	})
	if err != nil {
		return nil, err
	}

	// Frame layout: magicPrefix followed by the serialized RMQMessageLayout.
	framed := make([]byte, 0, len(magicPrefix)+len(body))
	framed = append(framed, magicPrefix...)
	framed = append(framed, body...)
	return framed, nil
}

// unmarshalStreamingMessage decodes a consumed message that was written in the
// streaming service format (magicPrefix followed by a serialized RMQMessageLayout).
// It returns errNotStreamingServiceMessage when the payload does not start with
// magicPrefix, and the protobuf error when the remainder cannot be decoded.
// On success the returned RmqMessage carries the consumed message id, the given
// topic, and the decoded payload and properties.
func unmarshalStreamingMessage(topic string, msg server.ConsumerMessage) (*RmqMessage, error) {
	if !bytes.HasPrefix(msg.Payload, magicPrefix) {
		return nil, errNotStreamingServiceMessage
	}

	// Everything after the prefix is the serialized layout.
	encoded := msg.Payload[len(magicPrefix):]
	layout := &messagespb.RMQMessageLayout{}
	if err := proto.Unmarshal(encoded, layout); err != nil {
		return nil, err
	}

	decoded := &RmqMessage{
		msgID:      msg.MsgID,
		payload:    layout.Payload,
		properties: layout.Properties,
		topic:      topic,
	}
	return decoded, nil
}
36 changes: 36 additions & 0 deletions pkg/mq/mqimpl/rocksmq/client/streaming_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package client

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server"
)

// TestStreaming verifies the streaming service payload framing: a marshaled
// message round-trips through unmarshalStreamingMessage, and a payload whose
// magic prefix is damaged is rejected with errNotStreamingServiceMessage.
func TestStreaming(t *testing.T) {
	payload, err := marshalStreamingMessage(&common.ProducerMessage{
		Payload: []byte("payload"),
		Properties: map[string]string{
			"key": "value",
		},
	})
	// Stop on failure: the steps below slice and decode payload.
	if !assert.NoError(t, err) || !assert.NotNil(t, payload) {
		return
	}

	msg, err := unmarshalStreamingMessage("topic", server.ConsumerMessage{
		MsgID:   1,
		Payload: payload,
	})
	// Stop on failure: msg would be nil and must not be dereferenced.
	if !assert.NoError(t, err) || !assert.NotNil(t, msg) {
		return
	}
	// assert.Equal takes (expected, actual); keep that order so failure output is labelled correctly.
	assert.Equal(t, "payload", string(msg.Payload()))
	assert.Equal(t, "value", msg.Properties()["key"])

	// Dropping the first byte breaks the magic prefix, so the payload must be
	// reported as not being a streaming service message.
	msg, err = unmarshalStreamingMessage("topic", server.ConsumerMessage{
		MsgID:   1,
		Payload: payload[1:],
	})
	assert.Error(t, err)
	assert.ErrorIs(t, err, errNotStreamingServiceMessage)
	assert.Nil(t, msg)
}
6 changes: 6 additions & 0 deletions pkg/streaming/proto/messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,9 @@ enum TxnState {
// the transaction is rollbacked.
TxnRollbacked = 6;
}

// RMQMessageLayout is the layout of a message stored in RMQ:
// the message body together with its string key/value properties.
message RMQMessageLayout {
bytes payload = 1; // message body
map<string, string> properties = 2; // message properties
}
3 changes: 2 additions & 1 deletion pkg/streaming/walimpls/impls/rmq/rmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/registry"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
Expand Down Expand Up @@ -35,5 +36,5 @@ func TestRegistry(t *testing.T) {
}

func TestWAL(t *testing.T) {
// walimpls.NewWALImplsTestFramework(t, 100, &builderImpl{}).Run()
walimpls.NewWALImplsTestFramework(t, 1000, &builderImpl{}).Run()
}
Loading

0 comments on commit 487ddb1

Please sign in to comment.