Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: unmashall ts msg in dispatcher instead in msgstream #38656

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions internal/flushcommon/pipeline/data_sync_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ type DataSyncServiceSuite struct {
channelCheckpointUpdater *util2.ChannelCheckpointUpdater
factory *dependency.MockFactory
ms *msgstream.MockMsgStream
msChan chan *msgstream.MsgPack
msChan chan *msgstream.ConsumeMsgPack
}

func (s *DataSyncServiceSuite) SetupSuite() {
Expand All @@ -330,14 +330,15 @@ func (s *DataSyncServiceSuite) SetupTest() {
s.channelCheckpointUpdater = util2.NewChannelCheckpointUpdater(s.broker)

go s.channelCheckpointUpdater.Start()
s.msChan = make(chan *msgstream.MsgPack, 1)
s.msChan = make(chan *msgstream.ConsumeMsgPack, 1)

s.factory = dependency.NewMockFactory(s.T())
s.ms = msgstream.NewMockMsgStream(s.T())
s.factory.EXPECT().NewTtMsgStream(mock.Anything).Return(s.ms, nil)
s.ms.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.ms.EXPECT().Chan().Return(s.msChan)
s.ms.EXPECT().Close().Return()
s.ms.EXPECT().GetUnmarshalDispatcher().Return(nil)

s.pipelineParams = &util2.PipelineParams{
Ctx: context.TODO(),
Expand Down Expand Up @@ -487,8 +488,8 @@ func (s *DataSyncServiceSuite) TestStartStop() {
close(ch)
return nil
})
s.msChan <- &msgPack
s.msChan <- &timeTickMsgPack
s.msChan <- msgstream.BuildConsumeMsgPack(&msgPack)
s.msChan <- msgstream.BuildConsumeMsgPack(&msgPack)
<-ch
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func (mtm *mockTtMsgStream) SetReplicate(config *msgstream.ReplicateConfig) {

func (mtm *mockTtMsgStream) Close() {}

func (mtm *mockTtMsgStream) Chan() <-chan *msgstream.MsgPack {
return make(chan *msgstream.MsgPack, 100)
func (mtm *mockTtMsgStream) Chan() <-chan *msgstream.ConsumeMsgPack {
return make(chan *msgstream.ConsumeMsgPack, 100)
}

func (mtm *mockTtMsgStream) AsProducer(ctx context.Context, channels []string) {}
Expand All @@ -77,6 +77,10 @@ func (mtm *mockTtMsgStream) AsConsumer(ctx context.Context, channels []string, s
return nil
}

func (mtm *mockTtMsgStream) GetUnmarshalDispatcher() msgstream.UnmarshalDispatcher {
return nil
}

func (mtm *mockTtMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) {}

func (mtm *mockTtMsgStream) GetProduceChannels() []string {
Expand Down
13 changes: 8 additions & 5 deletions internal/proxy/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func newDefaultMockDqlTask() *mockDqlTask {
}

type simpleMockMsgStream struct {
msgChan chan *msgstream.MsgPack
msgChan chan *msgstream.ConsumeMsgPack

msgCount int
msgCountMtx sync.RWMutex
Expand All @@ -244,7 +244,7 @@ type simpleMockMsgStream struct {
func (ms *simpleMockMsgStream) Close() {
}

func (ms *simpleMockMsgStream) Chan() <-chan *msgstream.MsgPack {
func (ms *simpleMockMsgStream) Chan() <-chan *msgstream.ConsumeMsgPack {
if ms.getMsgCount() <= 0 {
ms.msgChan <- nil
return ms.msgChan
Expand All @@ -255,6 +255,10 @@ func (ms *simpleMockMsgStream) Chan() <-chan *msgstream.MsgPack {
return ms.msgChan
}

func (ms *simpleMockMsgStream) GetUnmarshalDispatcher() msgstream.UnmarshalDispatcher {
return nil
}

func (ms *simpleMockMsgStream) AsProducer(ctx context.Context, channels []string) {
}

Expand Down Expand Up @@ -286,8 +290,7 @@ func (ms *simpleMockMsgStream) decreaseMsgCount(delta int) {
func (ms *simpleMockMsgStream) Produce(ctx context.Context, pack *msgstream.MsgPack) error {
defer ms.increaseMsgCount(1)

ms.msgChan <- pack

ms.msgChan <- msgstream.BuildConsumeMsgPack(pack)
return nil
}

Expand Down Expand Up @@ -319,7 +322,7 @@ func (ms *simpleMockMsgStream) SetReplicate(config *msgstream.ReplicateConfig) {

func newSimpleMockMsgStream() *simpleMockMsgStream {
return &simpleMockMsgStream{
msgChan: make(chan *msgstream.MsgPack, 1024),
msgChan: make(chan *msgstream.ConsumeMsgPack, 1024),
msgCount: 0,
}
}
Expand Down
10 changes: 9 additions & 1 deletion internal/querynodev2/delegator/delegator_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,15 @@ func (sd *shardDelegator) createStreamFromMsgStream(ctx context.Context, positio
if err != nil {
return nil, stream.Close, err
}
return stream.Chan(), stream.Close, nil

dispatcher := msgstream.NewSimpleMsgDispatcher(stream, func(pm msgstream.PackMsg) bool {
if pm.GetType() != commonpb.MsgType_Delete || pm.GetChannel() != vchannelName {
return false
}
return true
})

return dispatcher.Chan(), dispatcher.Close, nil
}

func (sd *shardDelegator) createDeleteStreamFromStreamingService(ctx context.Context, position *msgpb.MsgPosition) (ch <-chan *msgstream.MsgPack, closer func(), err error) {
Expand Down
9 changes: 6 additions & 3 deletions internal/querynodev2/delegator/delegator_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ func (s *DelegatorDataSuite) SetupTest() {
// init schema
s.genNormalCollection()
s.mq = &msgstream.MockMsgStream{}
s.mq.EXPECT().GetUnmarshalDispatcher().Return(nil)

s.rootPath = s.Suite.T().Name()
chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), s.rootPath)
s.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background())
Expand Down Expand Up @@ -916,8 +918,9 @@ func (s *DelegatorDataSuite) TestLoadSegments() {

s.mq.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mq.EXPECT().Seek(mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mq.EXPECT().GetUnmarshalDispatcher().Return(nil)
s.mq.EXPECT().Close()
ch := make(chan *msgstream.MsgPack, 10)
ch := make(chan *msgstream.ConsumeMsgPack, 10)
close(ch)

s.mq.EXPECT().Chan().Return(ch)
Expand Down Expand Up @@ -1584,7 +1587,7 @@ func (s *DelegatorDataSuite) TestReadDeleteFromMsgstream() {
s.mq.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mq.EXPECT().Seek(mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mq.EXPECT().Close()
ch := make(chan *msgstream.MsgPack, 10)
ch := make(chan *msgstream.ConsumeMsgPack, 10)
s.mq.EXPECT().Chan().Return(ch)

oracle := pkoracle.NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
Expand All @@ -1602,7 +1605,7 @@ func (s *DelegatorDataSuite) TestReadDeleteFromMsgstream() {
}

for _, data := range datas {
ch <- data
ch <- msgstream.BuildConsumeMsgPack(data)
}

result, err := s.delegator.readDeleteFromMsgstream(ctx, &msgpb.MsgPosition{Timestamp: 0}, 10, oracle)
Expand Down
2 changes: 1 addition & 1 deletion internal/querynodev2/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ import (
type ServiceSuite struct {
suite.Suite
// Data
msgChan chan *msgstream.MsgPack
msgChan chan *msgstream.ConsumeMsgPack
collectionID int64
collectionName string
schema *schemapb.CollectionSchema
Expand Down
12 changes: 9 additions & 3 deletions internal/rootcoord/alter_collection_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
Expand Down Expand Up @@ -250,7 +251,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error {
return nil
}
packChan := make(chan *msgstream.MsgPack, 10)
packChan := make(chan *msgstream.ConsumeMsgPack, 10)
ticker := newChanTimeTickSync(packChan)
ticker.addDmlChannels("by-dev-rootcoord-dml_1")

Expand All @@ -268,13 +269,18 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
},
}

unmarshalFactory := &msgstream.ProtoUDFactory{}
unmarshalDispatcher := unmarshalFactory.NewUnmarshalDispatcher()

err := task.Execute(context.Background())
assert.NoError(t, err)
time.Sleep(time.Second)
select {
case pack := <-packChan:
assert.Equal(t, commonpb.MsgType_Replicate, pack.Msgs[0].Type())
replicateMsg := pack.Msgs[0].(*msgstream.ReplicateMsg)
assert.Equal(t, commonpb.MsgType_Replicate, pack.Msgs[0].GetType())
tsMsg, err := pack.Msgs[0].Unmarshal(unmarshalDispatcher)
require.NoError(t, err)
replicateMsg := tsMsg.(*msgstream.ReplicateMsg)
assert.Equal(t, "foo", replicateMsg.ReplicateMsg.GetDatabase())
assert.Equal(t, "cn", replicateMsg.ReplicateMsg.GetCollection())
assert.True(t, replicateMsg.ReplicateMsg.GetIsEnd())
Expand Down
13 changes: 10 additions & 3 deletions internal/rootcoord/alter_database_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/metastore/model"
Expand Down Expand Up @@ -236,7 +237,7 @@ func Test_alterDatabaseTask_Execute(t *testing.T) {
mock.Anything,
).Return(nil)
// the chan length should larger than 4, because newChanTimeTickSync will send 4 ts messages when execute the `broadcast` step
packChan := make(chan *msgstream.MsgPack, 10)
packChan := make(chan *msgstream.ConsumeMsgPack, 10)
ticker := newChanTimeTickSync(packChan)
ticker.addDmlChannels("by-dev-rootcoord-dml_1")

Expand All @@ -252,13 +253,19 @@ func Test_alterDatabaseTask_Execute(t *testing.T) {
},
}

unmarshalFactory := &msgstream.ProtoUDFactory{}
unmarshalDispatcher := unmarshalFactory.NewUnmarshalDispatcher()

err := task.Execute(context.Background())
assert.NoError(t, err)
time.Sleep(time.Second)
select {
case pack := <-packChan:
assert.Equal(t, commonpb.MsgType_Replicate, pack.Msgs[0].Type())
replicateMsg := pack.Msgs[0].(*msgstream.ReplicateMsg)
assert.Equal(t, commonpb.MsgType_Replicate, pack.Msgs[0].GetType())

tsMsg, err := pack.Msgs[0].Unmarshal(unmarshalDispatcher)
require.NoError(t, err)
replicateMsg := tsMsg.(*msgstream.ReplicateMsg)
assert.Equal(t, "cn", replicateMsg.ReplicateMsg.GetDatabase())
assert.True(t, replicateMsg.ReplicateMsg.GetIsEnd())
default:
Expand Down
9 changes: 5 additions & 4 deletions internal/rootcoord/dml_channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,11 @@ type FailMsgStream struct {
errBroadcast bool
}

func (ms *FailMsgStream) Close() {}
func (ms *FailMsgStream) Chan() <-chan *msgstream.MsgPack { return nil }
func (ms *FailMsgStream) AsProducer(ctx context.Context, channels []string) {}
func (ms *FailMsgStream) AsReader(channels []string, subName string) {}
func (ms *FailMsgStream) Close() {}
func (ms *FailMsgStream) Chan() <-chan *msgstream.ConsumeMsgPack { return nil }
func (ms *FailMsgStream) GetUnmarshalDispatcher() msgstream.UnmarshalDispatcher { return nil }
func (ms *FailMsgStream) AsProducer(ctx context.Context, channels []string) {}
func (ms *FailMsgStream) AsReader(channels []string, subName string) {}
func (ms *FailMsgStream) AsConsumer(ctx context.Context, channels []string, subName string, position common.SubscriptionInitialPosition) error {
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions internal/rootcoord/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,23 +1059,23 @@ func newTickerWithFactory(factory msgstream.Factory) *timetickSync {
return ticker
}

func newChanTimeTickSync(packChan chan *msgstream.MsgPack) *timetickSync {
func newChanTimeTickSync(packChan chan *msgstream.ConsumeMsgPack) *timetickSync {
f := msgstream.NewMockMqFactory()
f.NewMsgStreamFunc = func(ctx context.Context) (msgstream.MsgStream, error) {
stream := msgstream.NewWastedMockMsgStream()
stream.BroadcastFunc = func(pack *msgstream.MsgPack) error {
log.Info("mock Broadcast")
packChan <- pack
packChan <- msgstream.BuildConsumeMsgPack(pack)
return nil
}
stream.BroadcastMarkFunc = func(pack *msgstream.MsgPack) (map[string][]msgstream.MessageID, error) {
log.Info("mock BroadcastMark")
packChan <- pack
packChan <- msgstream.BuildConsumeMsgPack(pack)
return map[string][]msgstream.MessageID{}, nil
}
stream.AsProducerFunc = func(channels []string) {
}
stream.ChanFunc = func() <-chan *msgstream.MsgPack {
stream.ChanFunc = func() <-chan *msgstream.ConsumeMsgPack {
return packChan
}
return stream, nil
Expand Down
7 changes: 5 additions & 2 deletions internal/util/flowgraph/input_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand All @@ -45,8 +46,9 @@ func TestInputNode(t *testing.T) {
produceStream.AsProducer(context.TODO(), channels)
produceStream.Produce(context.TODO(), &msgPack)

dispatcher := msgstream.NewSimpleMsgDispatcher(msgStream, func(pm msgstream.PackMsg) bool { return true })
nodeName := "input_node"
inputNode := NewInputNode(msgStream.Chan(), nodeName, 100, 100, "", 0, 0, "")
inputNode := NewInputNode(dispatcher.Chan(), nodeName, 100, 100, "", 0, 0, "")
defer inputNode.Close()

isInputNode := inputNode.IsInputNode()
Expand Down Expand Up @@ -89,7 +91,8 @@ func Test_InputNodeSkipMode(t *testing.T) {
outputCh := make(chan bool)

nodeName := "input_node"
inputNode := NewInputNode(msgStream.Chan(), nodeName, 100, 100, typeutil.DataNodeRole, 0, 0, "")
dispatcher := msgstream.NewSimpleMsgDispatcher(msgStream, func(pm msgstream.PackMsg) bool { return true })
inputNode := NewInputNode(dispatcher.Chan(), nodeName, 100, 100, typeutil.DataNodeRole, 0, 0, "")
defer inputNode.Close()

outputCount := 0
Expand Down
3 changes: 2 additions & 1 deletion internal/util/flowgraph/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ func TestNodeManager_Start(t *testing.T) {
produceStream.Produce(context.TODO(), &msgPack)

nodeName := "input_node"
inputNode := NewInputNode(msgStream.Chan(), nodeName, 100, 100, "", 0, 0, "")
dispatcher := msgstream.NewSimpleMsgDispatcher(msgStream, func(pm msgstream.PackMsg) bool { return true })
inputNode := NewInputNode(dispatcher.Chan(), nodeName, 100, 100, "", 0, 0, "")

ddNode := BaseNode{}

Expand Down
7 changes: 6 additions & 1 deletion pkg/mq/common/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ const (
SubscriptionPositionUnknown
)

const MsgTypeKey = "msg_type"
const (
MsgTypeKey = "msg_type"
TimestampTypeKey = "timestamp"
ChannelTypeKey = "vchannel"
ReplicateIDTypeKey = "replicate_id"
)

func GetMsgType(msg Message) (commonpb.MsgType, error) {
msgType := commonpb.MsgType_Undefined
Expand Down
Loading
Loading