diff --git a/internal/datanode/channel/channel_manager_test.go b/internal/datanode/channel/channel_manager_test.go index 1a4aa78c56735..e4384bf271301 100644 --- a/internal/datanode/channel/channel_manager_test.go +++ b/internal/datanode/channel/channel_manager_test.go @@ -265,9 +265,14 @@ func (s *ChannelManagerSuite) TestSubmitSkip() { func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() { channel := "by-dev-rootcoord-dml-0" + stream, err := s.pipelineParams.MsgStreamFactory.NewTtMsgStream(context.Background()) + s.NoError(err) + s.NotNil(stream) + stream.AsProducer(context.Background(), []string{channel}) + // watch info := GetWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch) - err := s.manager.Submit(info) + err = s.manager.Submit(info) s.NoError(err) // wait for result diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 616354a16482d..7fa6cf92f7e12 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -329,6 +329,10 @@ func (s *DataNodeServicesSuite) TestCompaction() { func (s *DataNodeServicesSuite) TestFlushSegments() { dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-FlushSegments" + stream, err := s.node.factory.NewTtMsgStream(context.Background()) + s.NoError(err) + s.NotNil(stream) + stream.AsProducer(context.Background(), []string{dmChannelName}) schema := &schemapb.CollectionSchema{ Name: "test_collection", Fields: []*schemapb.FieldSchema{ diff --git a/pkg/mq/mqimpl/rocksmq/client/client_impl.go b/pkg/mq/mqimpl/rocksmq/client/client_impl.go index f68e6c602f2e1..95f9bcf897e93 100644 --- a/pkg/mq/mqimpl/rocksmq/client/client_impl.go +++ b/pkg/mq/mqimpl/rocksmq/client/client_impl.go @@ -124,7 +124,10 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) { } func (c *client) consume(consumer *consumer) { - defer c.wg.Done() + defer func() { + close(consumer.stopCh) + c.wg.Done() + }() if err := c.blockUntilInitDone(consumer); err != nil { log.Warn("consumer init failed", zap.Error(err)) diff --git a/pkg/mq/mqimpl/rocksmq/client/consumer_impl.go b/pkg/mq/mqimpl/rocksmq/client/consumer_impl.go index fb907b4defc76..1964d5e347bfe 100644 --- a/pkg/mq/mqimpl/rocksmq/client/consumer_impl.go +++ b/pkg/mq/mqimpl/rocksmq/client/consumer_impl.go @@ -28,6 +28,7 @@ type consumer struct { startOnce sync.Once + stopCh chan struct{} msgMutex chan struct{} initCh chan struct{} messageCh chan common.Message @@ -58,6 +59,7 @@ func newConsumer(c *client, options ConsumerOptions) (*consumer, error) { client: c, consumerName: options.SubscriptionName, options: options, + stopCh: make(chan struct{}), msgMutex: make(chan struct{}, 1), initCh: initCh, messageCh: messageCh, @@ -134,6 +136,7 @@ func (c *consumer) Close() { if err != nil { log.Warn("Consumer close failed", zap.String("topicName", c.topic), zap.String("groupName", c.consumerName), zap.Error(err)) } + <-c.stopCh } func (c *consumer) GetLatestMsgID() (int64, error) {