Skip to content

Commit

Permalink
[fix][client] Consumer lost message ack due to race condition in ackn…
Browse files Browse the repository at this point in the history
…owledge with batch message (apache#22353)

Co-authored-by: Yunze Xu <[email protected]>
Co-authored-by: 汪苏诚 <[email protected]>
(cherry picked from commit 3fa2ae8)
(cherry picked from commit 9c50d18)
  • Loading branch information
Shawyeok authored and mukesh-ctds committed Apr 15, 2024
1 parent f93142e commit 8c31dfa
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ static boolean equals(MessageIdAdv lhs, Object o) {
&& lhs.getBatchIndex() == rhs.getBatchIndex();
}

/**
* Acknowledge batch message.
*
* @param msgId the message id
* @param individual whether to acknowledge the batch message individually
* @return true if the batch message is fully acknowledged
*/
static boolean acknowledge(MessageIdAdv msgId, boolean individual) {
if (!isBatch(msgId)) {
return true;
Expand All @@ -51,12 +58,14 @@ static boolean acknowledge(MessageIdAdv msgId, boolean individual) {
return false;
}
int batchIndex = msgId.getBatchIndex();
if (individual) {
ackSet.clear(batchIndex);
} else {
ackSet.clear(0, batchIndex + 1);
synchronized (ackSet) {
if (individual) {
ackSet.clear(batchIndex);
} else {
ackSet.clear(0, batchIndex + 1);
}
return ackSet.isEmpty();
}
return ackSet.isEmpty();
}

static boolean isBatch(MessageIdAdv msgId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,15 @@ private CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv msgId) {
MessageIdAdvUtils.discardBatch(msgId), __ -> {
final BitSet ackSet = msgId.getAckSet();
final ConcurrentBitSetRecyclable value;
if (ackSet != null && !ackSet.isEmpty()) {
value = ConcurrentBitSetRecyclable.create(ackSet);
if (ackSet != null) {
synchronized (ackSet) {
if (!ackSet.isEmpty()) {
value = ConcurrentBitSetRecyclable.create(ackSet);
} else {
value = ConcurrentBitSetRecyclable.create();
value.set(0, msgId.getBatchSize());
}
}
} else {
value = ConcurrentBitSetRecyclable.create();
value.set(0, msgId.getBatchSize());
Expand Down Expand Up @@ -374,8 +381,11 @@ private CompletableFuture<Void> doImmediateBatchIndexAck(MessageIdAdv msgId, int
.ConnectException("Consumer connect fail! consumer state:" + consumer.getState()));
}
BitSetRecyclable bitSet;
if (msgId.getAckSet() != null) {
bitSet = BitSetRecyclable.valueOf(msgId.getAckSet().toLongArray());
BitSet ackSetFromMsgId = msgId.getAckSet();
if (ackSetFromMsgId != null) {
synchronized (ackSetFromMsgId) {
bitSet = BitSetRecyclable.valueOf(ackSetFromMsgId.toLongArray());
}
} else {
bitSet = BitSetRecyclable.create();
bitSet.set(0, batchSize);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.BitSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.testng.annotations.Test;

/**
* Unit test for {@link MessageIdAdvUtils}.
*/
public class MessageIdAdvUtilsTest {

/**
* Call <code>acknowledge</code> concurrently with batch message, and verify that only return true once
*
* @see MessageIdAdvUtils#acknowledge(MessageIdAdv, boolean)
* @see MessageIdAdv#getAckSet()
*/
@Test
public void testAcknowledgeIndividualConcurrently() throws InterruptedException {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("pulsar-consumer-%d").build();
@Cleanup("shutdown")
ExecutorService executorService = Executors.newCachedThreadPool(threadFactory);
for (int i = 0; i < 100; i++) {
int batchSize = 32;
BitSet bitSet = new BitSet(batchSize);
bitSet.set(0, batchSize);
AtomicInteger individualAcked = new AtomicInteger();
Phaser phaser = new Phaser(1);
CountDownLatch finishLatch = new CountDownLatch(batchSize);
for (int batchIndex = 0; batchIndex < batchSize; batchIndex++) {
phaser.register();
BatchMessageIdImpl messageId = new BatchMessageIdImpl(1, 0, 0, batchIndex, batchSize, bitSet);
executorService.execute(() -> {
try {
phaser.arriveAndAwaitAdvance();
if (MessageIdAdvUtils.acknowledge(messageId, true)) {
individualAcked.incrementAndGet();
}
} finally {
finishLatch.countDown();
}
});
}
phaser.arriveAndDeregister();
finishLatch.await();
assertEquals(individualAcked.get(), 1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ default int getBatchSize() {
* @implNote The message IDs of a batch should share a BitSet. For example, given 3 messages in the same batch whose
* size is 3, all message IDs of them should return "111" (i.e. a BitSet whose size is 3 and all bits are 1). If the
* 1st message has been acknowledged, the returned BitSet should become "011" (i.e. the 1st bit become 0).
* If the caller performs any read or write operations on the return value of this method, they should do so with
* lock protection.
*
* @return null if the message is a non-batched message
*/
Expand Down

0 comments on commit 8c31dfa

Please sign in to comment.