Skip to content

Commit

Permalink
[ISSUE #8877] Refactor lock in ReceiptHandleGroup to make the lock ca…
Browse files Browse the repository at this point in the history
…n be properly released when future can not be completed (#8916)
  • Loading branch information
qianye1001 authored Nov 21, 2024
1 parent 796c95b commit 5f26423
Showing 1 changed file with 47 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;

public class ReceiptHandleGroup {
protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);

// The messages having the same messageId will be deduplicated based on the parameters of broker, queueId, and offset
protected final Map<String /* msgID */, Map<HandleKey, HandleData>> receiptHandleMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -98,22 +103,48 @@ public long getOffset() {

public static class HandleData {
private final Semaphore semaphore = new Semaphore(1);
private final AtomicLong lastLockTimeMs = new AtomicLong(-1L);
private volatile boolean needRemove = false;
private volatile MessageReceiptHandle messageReceiptHandle;

public HandleData(MessageReceiptHandle messageReceiptHandle) {
this.messageReceiptHandle = messageReceiptHandle;
}

public boolean lock(long timeoutMs) {
public Long lock(long timeoutMs) {
try {
return this.semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
boolean result = this.semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
long currentTimeMs = System.currentTimeMillis();
if (result) {
this.lastLockTimeMs.set(currentTimeMs);
return currentTimeMs;
} else {
// if the lock is expired, can be acquired again
long expiredTimeMs = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup() * 3;
if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) {
synchronized (this) {
if (currentTimeMs - this.lastLockTimeMs.get() > expiredTimeMs) {
log.warn("HandleData lock expired, acquire lock success and reset lock time. " +
"MessageReceiptHandle={}, lockTime={}", messageReceiptHandle, currentTimeMs);
this.lastLockTimeMs.set(currentTimeMs);
return currentTimeMs;
}
}
}
}
return null;
} catch (InterruptedException e) {
return false;
return null;
}
}

public void unlock() {
public void unlock(long lockTimeMs) {
// if the lock is expired, we don't need to unlock it
if (System.currentTimeMillis() - lockTimeMs > ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup() * 2) {
log.warn("HandleData lock expired, unlock fail. MessageReceiptHandle={}, lockTime={}, now={}",
messageReceiptHandle, lockTimeMs, System.currentTimeMillis());
return;
}
this.semaphore.release();
}

Expand Down Expand Up @@ -149,7 +180,8 @@ public void put(String msgID, MessageReceiptHandle value) {
if (handleData == null || handleData.needRemove) {
return new HandleData(value);
}
if (!handleData.lock(timeout)) {
Long lockTimeMs = handleData.lock(timeout);
if (lockTimeMs == null) {
throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to put handle failed");
}
try {
Expand All @@ -158,7 +190,7 @@ public void put(String msgID, MessageReceiptHandle value) {
}
handleData.messageReceiptHandle = value;
} finally {
handleData.unlock();
handleData.unlock(lockTimeMs);
}
return handleData;
});
Expand All @@ -176,7 +208,8 @@ public MessageReceiptHandle get(String msgID, String handle) {
long timeout = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
AtomicReference<MessageReceiptHandle> res = new AtomicReference<>();
handleMap.computeIfPresent(new HandleKey(handle), (handleKey, handleData) -> {
if (!handleData.lock(timeout)) {
Long lockTimeMs = handleData.lock(timeout);
if (lockTimeMs == null) {
throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to get handle failed");
}
try {
Expand All @@ -185,7 +218,7 @@ public MessageReceiptHandle get(String msgID, String handle) {
}
res.set(handleData.messageReceiptHandle);
} finally {
handleData.unlock();
handleData.unlock(lockTimeMs);
}
return handleData;
});
Expand All @@ -200,7 +233,8 @@ public MessageReceiptHandle remove(String msgID, String handle) {
long timeout = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
AtomicReference<MessageReceiptHandle> res = new AtomicReference<>();
handleMap.computeIfPresent(new HandleKey(handle), (handleKey, handleData) -> {
if (!handleData.lock(timeout)) {
Long lockTimeMs = handleData.lock(timeout);
if (lockTimeMs == null) {
throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to remove and get handle failed");
}
try {
Expand All @@ -210,7 +244,7 @@ public MessageReceiptHandle remove(String msgID, String handle) {
}
return null;
} finally {
handleData.unlock();
handleData.unlock(lockTimeMs);
}
});
removeHandleMapKeyIfNeed(msgID);
Expand Down Expand Up @@ -240,7 +274,8 @@ public void computeIfPresent(String msgID, String handle,
}
long timeout = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
handleMap.computeIfPresent(new HandleKey(handle), (handleKey, handleData) -> {
if (!handleData.lock(timeout)) {
Long lockTimeMs = handleData.lock(timeout);
if (lockTimeMs == null) {
throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to compute failed");
}
CompletableFuture<MessageReceiptHandle> future = function.apply(handleData.messageReceiptHandle);
Expand All @@ -255,7 +290,7 @@ public void computeIfPresent(String msgID, String handle,
handleData.messageReceiptHandle = messageReceiptHandle;
}
} finally {
handleData.unlock();
handleData.unlock(lockTimeMs);
}
if (handleData.needRemove) {
handleMap.remove(handleKey, handleData);
Expand Down

0 comments on commit 5f26423

Please sign in to comment.