From 16224f3bd36233ce8e4fa2116951274bc8c01c0c Mon Sep 17 00:00:00 2001 From: chyezh Date: Wed, 25 Dec 2024 11:15:03 +0800 Subject: [PATCH 1/3] fix: the close operation of rmq consumer is not sync Signed-off-by: chyezh --- pkg/mq/mqimpl/rocksmq/client/client_impl.go | 5 ++++- pkg/mq/mqimpl/rocksmq/client/consumer_impl.go | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/mq/mqimpl/rocksmq/client/client_impl.go b/pkg/mq/mqimpl/rocksmq/client/client_impl.go index f68e6c602f2e1..95f9bcf897e93 100644 --- a/pkg/mq/mqimpl/rocksmq/client/client_impl.go +++ b/pkg/mq/mqimpl/rocksmq/client/client_impl.go @@ -124,7 +124,10 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) { } func (c *client) consume(consumer *consumer) { - defer c.wg.Done() + defer func() { + close(consumer.stopCh) + c.wg.Done() + }() if err := c.blockUntilInitDone(consumer); err != nil { log.Warn("consumer init failed", zap.Error(err)) diff --git a/pkg/mq/mqimpl/rocksmq/client/consumer_impl.go b/pkg/mq/mqimpl/rocksmq/client/consumer_impl.go index fb907b4defc76..1964d5e347bfe 100644 --- a/pkg/mq/mqimpl/rocksmq/client/consumer_impl.go +++ b/pkg/mq/mqimpl/rocksmq/client/consumer_impl.go @@ -28,6 +28,7 @@ type consumer struct { startOnce sync.Once + stopCh chan struct{} msgMutex chan struct{} initCh chan struct{} messageCh chan common.Message @@ -58,6 +59,7 @@ func newConsumer(c *client, options ConsumerOptions) (*consumer, error) { client: c, consumerName: options.SubscriptionName, options: options, + stopCh: make(chan struct{}), msgMutex: make(chan struct{}, 1), initCh: initCh, messageCh: messageCh, @@ -134,6 +136,7 @@ func (c *consumer) Close() { if err != nil { log.Warn("Consumer close failed", zap.String("topicName", c.topic), zap.String("groupName", c.consumerName), zap.Error(err)) } + <-c.stopCh } func (c *consumer) GetLatestMsgID() (int64, error) { From be3fa875e69ee884ce169fad074b8631d06ec31e Mon Sep 17 00:00:00 2001 From: chyezh Date: Thu, 26 Dec 2024 20:13:25 +0800 Subject: [PATCH 2/3] fix: unittest Signed-off-by: chyezh --- internal/datanode/services_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 616354a16482d..7fa6cf92f7e12 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -329,6 +329,10 @@ func (s *DataNodeServicesSuite) TestCompaction() { func (s *DataNodeServicesSuite) TestFlushSegments() { dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-FlushSegments" + stream, err := s.node.factory.NewTtMsgStream(context.Background()) + s.NoError(err) + s.NotNil(stream) + stream.AsProducer(context.Background(), []string{dmChannelName}) schema := &schemapb.CollectionSchema{ Name: "test_collection", Fields: []*schemapb.FieldSchema{ From a02e0e068d3241efae231fc11f53ebe217635753 Mon Sep 17 00:00:00 2001 From: chyezh Date: Fri, 27 Dec 2024 10:15:59 +0800 Subject: [PATCH 3/3] fix: unittest Signed-off-by: chyezh --- internal/datanode/channel/channel_manager_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/datanode/channel/channel_manager_test.go b/internal/datanode/channel/channel_manager_test.go index 1a4aa78c56735..e4384bf271301 100644 --- a/internal/datanode/channel/channel_manager_test.go +++ b/internal/datanode/channel/channel_manager_test.go @@ -265,9 +265,14 @@ func (s *ChannelManagerSuite) TestSubmitSkip() { func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() { channel := "by-dev-rootcoord-dml-0" + stream, err := s.pipelineParams.MsgStreamFactory.NewTtMsgStream(context.Background()) + s.NoError(err) + s.NotNil(stream) + stream.AsProducer(context.Background(), []string{channel}) + // watch info := GetWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch) - err := s.manager.Submit(info) + err = s.manager.Submit(info) s.NoError(err) // wait for result