Skip to content

Commit

Permalink
fix: the close operation of rmq consumer is not sync
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Dec 25, 2024
1 parent acc8fb7 commit d404aef
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
5 changes: 4 additions & 1 deletion pkg/mq/mqimpl/rocksmq/client/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
}

func (c *client) consume(consumer *consumer) {
defer c.wg.Done()
defer func() {
close(consumer.stopCh)
c.wg.Done()
}()

if err := c.blockUntilInitDone(consumer); err != nil {
log.Warn("consumer init failed", zap.Error(err))
Expand Down
3 changes: 3 additions & 0 deletions pkg/mq/mqimpl/rocksmq/client/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type consumer struct {

startOnce sync.Once

stopCh chan struct{}
msgMutex chan struct{}
initCh chan struct{}
messageCh chan common.Message
Expand Down Expand Up @@ -58,6 +59,7 @@ func newConsumer(c *client, options ConsumerOptions) (*consumer, error) {
client: c,
consumerName: options.SubscriptionName,
options: options,
stopCh: make(chan struct{}),
msgMutex: make(chan struct{}, 1),
initCh: initCh,
messageCh: messageCh,
Expand Down Expand Up @@ -134,6 +136,7 @@ func (c *consumer) Close() {
if err != nil {
log.Warn("Consumer close failed", zap.String("topicName", c.topic), zap.String("groupName", c.consumerName), zap.Error(err))
}
<-c.stopCh
}

func (c *consumer) GetLatestMsgID() (int64, error) {
Expand Down

0 comments on commit d404aef

Please sign in to comment.