Skip to content

Commit

Permalink
unmashall ts msg in dispatcher instead in msgstream
Browse files Browse the repository at this point in the history
Signed-off-by: aoiasd <[email protected]>
  • Loading branch information
aoiasd committed Dec 24, 2024
1 parent fb0e689 commit b5a94cc
Show file tree
Hide file tree
Showing 25 changed files with 681 additions and 224 deletions.
9 changes: 5 additions & 4 deletions internal/flushcommon/pipeline/data_sync_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ type DataSyncServiceSuite struct {
channelCheckpointUpdater *util2.ChannelCheckpointUpdater
factory *dependency.MockFactory
ms *msgstream.MockMsgStream
msChan chan *msgstream.MsgPack
msChan chan *msgstream.ConsumeMsgPack
}

func (s *DataSyncServiceSuite) SetupSuite() {
Expand All @@ -330,14 +330,15 @@ func (s *DataSyncServiceSuite) SetupTest() {
s.channelCheckpointUpdater = util2.NewChannelCheckpointUpdater(s.broker)

go s.channelCheckpointUpdater.Start()
s.msChan = make(chan *msgstream.MsgPack, 1)
s.msChan = make(chan *msgstream.ConsumeMsgPack, 1)

s.factory = dependency.NewMockFactory(s.T())
s.ms = msgstream.NewMockMsgStream(s.T())
s.factory.EXPECT().NewTtMsgStream(mock.Anything).Return(s.ms, nil)
s.ms.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.ms.EXPECT().Chan().Return(s.msChan)
s.ms.EXPECT().Close().Return()
s.ms.EXPECT().GetUnmarshalDispatcher().Return(nil)

s.pipelineParams = &util2.PipelineParams{
Ctx: context.TODO(),
Expand Down Expand Up @@ -487,8 +488,8 @@ func (s *DataSyncServiceSuite) TestStartStop() {
close(ch)
return nil
})
s.msChan <- &msgPack
s.msChan <- &timeTickMsgPack
s.msChan <- msgstream.BuildConsumeMsgPack(&msgPack)
s.msChan <- msgstream.BuildConsumeMsgPack(&msgPack)
<-ch
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func (mtm *mockTtMsgStream) SetReplicate(config *msgstream.ReplicateConfig) {

func (mtm *mockTtMsgStream) Close() {}

func (mtm *mockTtMsgStream) Chan() <-chan *msgstream.MsgPack {
return make(chan *msgstream.MsgPack, 100)
func (mtm *mockTtMsgStream) Chan() <-chan *msgstream.ConsumeMsgPack {
return make(chan *msgstream.ConsumeMsgPack, 100)
}

func (mtm *mockTtMsgStream) AsProducer(ctx context.Context, channels []string) {}
Expand All @@ -77,6 +77,10 @@ func (mtm *mockTtMsgStream) AsConsumer(ctx context.Context, channels []string, s
return nil
}

func (mtm *mockTtMsgStream) GetUnmarshalDispatcher() msgstream.UnmarshalDispatcher {
return nil
}

func (mtm *mockTtMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) {}

func (mtm *mockTtMsgStream) GetProduceChannels() []string {
Expand Down
13 changes: 8 additions & 5 deletions internal/proxy/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func newDefaultMockDqlTask() *mockDqlTask {
}

type simpleMockMsgStream struct {
msgChan chan *msgstream.MsgPack
msgChan chan *msgstream.ConsumeMsgPack

msgCount int
msgCountMtx sync.RWMutex
Expand All @@ -244,7 +244,7 @@ type simpleMockMsgStream struct {
func (ms *simpleMockMsgStream) Close() {
}

func (ms *simpleMockMsgStream) Chan() <-chan *msgstream.MsgPack {
func (ms *simpleMockMsgStream) Chan() <-chan *msgstream.ConsumeMsgPack {
if ms.getMsgCount() <= 0 {
ms.msgChan <- nil
return ms.msgChan
Expand All @@ -255,6 +255,10 @@ func (ms *simpleMockMsgStream) Chan() <-chan *msgstream.MsgPack {
return ms.msgChan
}

func (ms *simpleMockMsgStream) GetUnmarshalDispatcher() msgstream.UnmarshalDispatcher {
return nil
}

func (ms *simpleMockMsgStream) AsProducer(ctx context.Context, channels []string) {
}

Expand Down Expand Up @@ -286,8 +290,7 @@ func (ms *simpleMockMsgStream) decreaseMsgCount(delta int) {
func (ms *simpleMockMsgStream) Produce(ctx context.Context, pack *msgstream.MsgPack) error {
defer ms.increaseMsgCount(1)

ms.msgChan <- pack

ms.msgChan <- msgstream.BuildConsumeMsgPack(pack)
return nil
}

Expand Down Expand Up @@ -319,7 +322,7 @@ func (ms *simpleMockMsgStream) SetReplicate(config *msgstream.ReplicateConfig) {

func newSimpleMockMsgStream() *simpleMockMsgStream {
return &simpleMockMsgStream{
msgChan: make(chan *msgstream.MsgPack, 1024),
msgChan: make(chan *msgstream.ConsumeMsgPack, 1024),
msgCount: 0,
}
}
Expand Down
10 changes: 9 additions & 1 deletion internal/querynodev2/delegator/delegator_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,15 @@ func (sd *shardDelegator) createStreamFromMsgStream(ctx context.Context, positio
if err != nil {
return nil, stream.Close, err
}
return stream.Chan(), stream.Close, nil

dispatcher := msgstream.NewSimpleMsgDispatcher(stream, func(pm msgstream.PackMsg) bool {
if pm.GetType() != commonpb.MsgType_Delete || pm.GetChannel() != vchannelName {
return false
}
return true
})

return dispatcher.Chan(), dispatcher.Close, nil
}

func (sd *shardDelegator) createDeleteStreamFromStreamingService(ctx context.Context, position *msgpb.MsgPosition) (ch <-chan *msgstream.MsgPack, closer func(), err error) {
Expand Down
9 changes: 6 additions & 3 deletions internal/querynodev2/delegator/delegator_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ func (s *DelegatorDataSuite) SetupTest() {
// init schema
s.genNormalCollection()
s.mq = &msgstream.MockMsgStream{}
s.mq.EXPECT().GetUnmarshalDispatcher().Return(nil)

s.rootPath = s.Suite.T().Name()
chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), s.rootPath)
s.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background())
Expand Down Expand Up @@ -916,8 +918,9 @@ func (s *DelegatorDataSuite) TestLoadSegments() {

s.mq.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mq.EXPECT().Seek(mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mq.EXPECT().GetUnmarshalDispatcher().Return(nil)
s.mq.EXPECT().Close()
ch := make(chan *msgstream.MsgPack, 10)
ch := make(chan *msgstream.ConsumeMsgPack, 10)
close(ch)

s.mq.EXPECT().Chan().Return(ch)
Expand Down Expand Up @@ -1584,7 +1587,7 @@ func (s *DelegatorDataSuite) TestReadDeleteFromMsgstream() {
s.mq.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mq.EXPECT().Seek(mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mq.EXPECT().Close()
ch := make(chan *msgstream.MsgPack, 10)
ch := make(chan *msgstream.ConsumeMsgPack, 10)
s.mq.EXPECT().Chan().Return(ch)

oracle := pkoracle.NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
Expand All @@ -1602,7 +1605,7 @@ func (s *DelegatorDataSuite) TestReadDeleteFromMsgstream() {
}

for _, data := range datas {
ch <- data
ch <- msgstream.BuildConsumeMsgPack(data)
}

result, err := s.delegator.readDeleteFromMsgstream(ctx, &msgpb.MsgPosition{Timestamp: 0}, 10, oracle)
Expand Down
2 changes: 1 addition & 1 deletion internal/querynodev2/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ import (
type ServiceSuite struct {
suite.Suite
// Data
msgChan chan *msgstream.MsgPack
msgChan chan *msgstream.ConsumeMsgPack
collectionID int64
collectionName string
schema *schemapb.CollectionSchema
Expand Down
12 changes: 9 additions & 3 deletions internal/rootcoord/alter_collection_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
Expand Down Expand Up @@ -250,7 +251,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error {
return nil
}
packChan := make(chan *msgstream.MsgPack, 10)
packChan := make(chan *msgstream.ConsumeMsgPack, 10)
ticker := newChanTimeTickSync(packChan)
ticker.addDmlChannels("by-dev-rootcoord-dml_1")

Expand All @@ -268,13 +269,18 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
},
}

unmarshalFactory := &msgstream.ProtoUDFactory{}
unmarshalDispatcher := unmarshalFactory.NewUnmarshalDispatcher()

err := task.Execute(context.Background())
assert.NoError(t, err)
time.Sleep(time.Second)
select {
case pack := <-packChan:
assert.Equal(t, commonpb.MsgType_Replicate, pack.Msgs[0].Type())
replicateMsg := pack.Msgs[0].(*msgstream.ReplicateMsg)
assert.Equal(t, commonpb.MsgType_Replicate, pack.Msgs[0].GetType())
tsMsg, err := pack.Msgs[0].Unmarshal(unmarshalDispatcher)
require.NoError(t, err)
replicateMsg := tsMsg.(*msgstream.ReplicateMsg)
assert.Equal(t, "foo", replicateMsg.ReplicateMsg.GetDatabase())
assert.Equal(t, "cn", replicateMsg.ReplicateMsg.GetCollection())
assert.True(t, replicateMsg.ReplicateMsg.GetIsEnd())
Expand Down
13 changes: 10 additions & 3 deletions internal/rootcoord/alter_database_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/metastore/model"
Expand Down Expand Up @@ -236,7 +237,7 @@ func Test_alterDatabaseTask_Execute(t *testing.T) {
mock.Anything,
).Return(nil)
// the chan length should larger than 4, because newChanTimeTickSync will send 4 ts messages when execute the `broadcast` step
packChan := make(chan *msgstream.MsgPack, 10)
packChan := make(chan *msgstream.ConsumeMsgPack, 10)
ticker := newChanTimeTickSync(packChan)
ticker.addDmlChannels("by-dev-rootcoord-dml_1")

Expand All @@ -252,13 +253,19 @@ func Test_alterDatabaseTask_Execute(t *testing.T) {
},
}

unmarshalFactory := &msgstream.ProtoUDFactory{}
unmarshalDispatcher := unmarshalFactory.NewUnmarshalDispatcher()

err := task.Execute(context.Background())
assert.NoError(t, err)
time.Sleep(time.Second)
select {
case pack := <-packChan:
assert.Equal(t, commonpb.MsgType_Replicate, pack.Msgs[0].Type())
replicateMsg := pack.Msgs[0].(*msgstream.ReplicateMsg)
assert.Equal(t, commonpb.MsgType_Replicate, pack.Msgs[0].GetType())

tsMsg, err := pack.Msgs[0].Unmarshal(unmarshalDispatcher)
require.NoError(t, err)
replicateMsg := tsMsg.(*msgstream.ReplicateMsg)
assert.Equal(t, "cn", replicateMsg.ReplicateMsg.GetDatabase())
assert.True(t, replicateMsg.ReplicateMsg.GetIsEnd())
default:
Expand Down
9 changes: 5 additions & 4 deletions internal/rootcoord/dml_channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,11 @@ type FailMsgStream struct {
errBroadcast bool
}

func (ms *FailMsgStream) Close() {}
func (ms *FailMsgStream) Chan() <-chan *msgstream.MsgPack { return nil }
func (ms *FailMsgStream) AsProducer(ctx context.Context, channels []string) {}
func (ms *FailMsgStream) AsReader(channels []string, subName string) {}
func (ms *FailMsgStream) Close() {}
func (ms *FailMsgStream) Chan() <-chan *msgstream.ConsumeMsgPack { return nil }
func (ms *FailMsgStream) GetUnmarshalDispatcher() msgstream.UnmarshalDispatcher { return nil }
func (ms *FailMsgStream) AsProducer(ctx context.Context, channels []string) {}
func (ms *FailMsgStream) AsReader(channels []string, subName string) {}
func (ms *FailMsgStream) AsConsumer(ctx context.Context, channels []string, subName string, position common.SubscriptionInitialPosition) error {
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions internal/rootcoord/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,23 +1059,23 @@ func newTickerWithFactory(factory msgstream.Factory) *timetickSync {
return ticker
}

func newChanTimeTickSync(packChan chan *msgstream.MsgPack) *timetickSync {
func newChanTimeTickSync(packChan chan *msgstream.ConsumeMsgPack) *timetickSync {
f := msgstream.NewMockMqFactory()
f.NewMsgStreamFunc = func(ctx context.Context) (msgstream.MsgStream, error) {
stream := msgstream.NewWastedMockMsgStream()
stream.BroadcastFunc = func(pack *msgstream.MsgPack) error {
log.Info("mock Broadcast")
packChan <- pack
packChan <- msgstream.BuildConsumeMsgPack(pack)
return nil
}
stream.BroadcastMarkFunc = func(pack *msgstream.MsgPack) (map[string][]msgstream.MessageID, error) {
log.Info("mock BroadcastMark")
packChan <- pack
packChan <- msgstream.BuildConsumeMsgPack(pack)
return map[string][]msgstream.MessageID{}, nil
}
stream.AsProducerFunc = func(channels []string) {
}
stream.ChanFunc = func() <-chan *msgstream.MsgPack {
stream.ChanFunc = func() <-chan *msgstream.ConsumeMsgPack {
return packChan
}
return stream, nil
Expand Down
7 changes: 5 additions & 2 deletions internal/util/flowgraph/input_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand All @@ -45,8 +46,9 @@ func TestInputNode(t *testing.T) {
produceStream.AsProducer(context.TODO(), channels)
produceStream.Produce(context.TODO(), &msgPack)

dispatcher := msgstream.NewSimpleMsgDispatcher(msgStream, func(pm msgstream.PackMsg) bool { return true })
nodeName := "input_node"
inputNode := NewInputNode(msgStream.Chan(), nodeName, 100, 100, "", 0, 0, "")
inputNode := NewInputNode(dispatcher.Chan(), nodeName, 100, 100, "", 0, 0, "")
defer inputNode.Close()

isInputNode := inputNode.IsInputNode()
Expand Down Expand Up @@ -89,7 +91,8 @@ func Test_InputNodeSkipMode(t *testing.T) {
outputCh := make(chan bool)

nodeName := "input_node"
inputNode := NewInputNode(msgStream.Chan(), nodeName, 100, 100, typeutil.DataNodeRole, 0, 0, "")
dispatcher := msgstream.NewSimpleMsgDispatcher(msgStream, func(pm msgstream.PackMsg) bool { return true })
inputNode := NewInputNode(dispatcher.Chan(), nodeName, 100, 100, typeutil.DataNodeRole, 0, 0, "")
defer inputNode.Close()

outputCount := 0
Expand Down
3 changes: 2 additions & 1 deletion internal/util/flowgraph/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ func TestNodeManager_Start(t *testing.T) {
produceStream.Produce(context.TODO(), &msgPack)

nodeName := "input_node"
inputNode := NewInputNode(msgStream.Chan(), nodeName, 100, 100, "", 0, 0, "")
dispatcher := msgstream.NewSimpleMsgDispatcher(msgStream, func(pm msgstream.PackMsg) bool { return true })
inputNode := NewInputNode(dispatcher.Chan(), nodeName, 100, 100, "", 0, 0, "")

ddNode := BaseNode{}

Expand Down
7 changes: 6 additions & 1 deletion pkg/mq/common/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ const (
SubscriptionPositionUnknown
)

const MsgTypeKey = "msg_type"
const (
MsgTypeKey = "msg_type"
TimestampTypeKey = "timestamp"
ChannelTypeKey = "vchannel"
ReplicateIDTypeKey = "replicate_id"
)

func GetMsgType(msg Message) (commonpb.MsgType, error) {
msgType := commonpb.MsgType_Undefined
Expand Down
Loading

0 comments on commit b5a94cc

Please sign in to comment.