diff --git a/pkg/mq/mqimpl/rocksmq/client/client_impl.go b/pkg/mq/mqimpl/rocksmq/client/client_impl.go index f68e6c602f2e1..95f9bcf897e93 100644 --- a/pkg/mq/mqimpl/rocksmq/client/client_impl.go +++ b/pkg/mq/mqimpl/rocksmq/client/client_impl.go @@ -124,7 +124,10 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) { } func (c *client) consume(consumer *consumer) { - defer c.wg.Done() + defer func() { + close(consumer.stopCh) + c.wg.Done() + }() if err := c.blockUntilInitDone(consumer); err != nil { log.Warn("consumer init failed", zap.Error(err)) diff --git a/pkg/mq/mqimpl/rocksmq/client/consumer_impl.go b/pkg/mq/mqimpl/rocksmq/client/consumer_impl.go index fb907b4defc76..1964d5e347bfe 100644 --- a/pkg/mq/mqimpl/rocksmq/client/consumer_impl.go +++ b/pkg/mq/mqimpl/rocksmq/client/consumer_impl.go @@ -28,6 +28,7 @@ type consumer struct { startOnce sync.Once + stopCh chan struct{} msgMutex chan struct{} initCh chan struct{} messageCh chan common.Message @@ -58,6 +59,7 @@ func newConsumer(c *client, options ConsumerOptions) (*consumer, error) { client: c, consumerName: options.SubscriptionName, options: options, + stopCh: make(chan struct{}), msgMutex: make(chan struct{}, 1), initCh: initCh, messageCh: messageCh, @@ -134,6 +136,7 @@ func (c *consumer) Close() { if err != nil { log.Warn("Consumer close failed", zap.String("topicName", c.topic), zap.String("groupName", c.consumerName), zap.Error(err)) } + <-c.stopCh } func (c *consumer) GetLatestMsgID() (int64, error) {