Skip to content

Commit

Permalink
[fix][broker] Fix incorrect unack count when using shared subscriptio…
Browse files Browse the repository at this point in the history
…n on non-persistent topic
  • Loading branch information
1Jack2 committed Nov 19, 2023
1 parent 403faa4 commit 95f132f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatch
topicName, subscription, ackedCount, totalMessages, consumerId, avgMessagesPerEntry.get());
}
incrementUnackedMessages(unackedMessages);
final int finalUnackedMessages = unackedMessages;
Future<Void> writeAndFlushPromise =
cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx,
entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch);
Expand All @@ -361,6 +362,9 @@ public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatch
msgOutCounter.add(totalMessages);
bytesOutCounter.add(totalBytes);
chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);
if (!(subscription instanceof PersistentSubscription)) {
addAndGetUnAckedMsgs(this, -finalUnackedMessages);
}
} else {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Sent messages to client fail by IO exception[{}], close the connection"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,4 +446,34 @@ public void testAvgMessagesPerEntry() throws Exception {
int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry();
assertEquals(3, avgMessagesPerEntry);
}

@Test()
public void testNonPersistentTopicSharedSubscriptionUnackedMessages() throws Exception {
final String topicName = "non-persistent://my-property/my-ns/my-topic" + UUID.randomUUID();
final String subName = "my-sub";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.subscribe();

for (int i = 0; i < 5; i++) {
producer.send(("message-" + i).getBytes());
}
for (int i = 0; i < 5; i++) {
Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledge(msg);
}
TopicStats topicStats = admin.topics().getStats(topicName);
assertEquals(topicStats.getSubscriptions().size(), 1);
List<? extends ConsumerStats> consumers = topicStats.getSubscriptions().get(subName).getConsumers();
assertEquals(consumers.size(), 1);
assertEquals(consumers.get(0).getUnackedMessages(), 0);
}
}

0 comments on commit 95f132f

Please sign in to comment.