Skip to content

Commit

Permalink
[ISSUE apache#8780] Implement asynchronous storage of ack/ck messages…
Browse files Browse the repository at this point in the history
… in pop consume to enhance performance (apache#8727)

* Pop consume asynchronization

* Pass UTs and ITs

* Pass the checkstyle

* Fix LocalGrpcIT can not pass

* Fix the UT can not pass

* Simplify duplicate methods in EscapeBridge
  • Loading branch information
RongtongJin authored Oct 16, 2024
1 parent a7dc023 commit 4f5f705
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public SendResult putMessageToRemoteBroker(MessageExtBrokerInner messageExt, Str
brokerNameToSend = mqSelected.getBrokerName();
if (this.brokerController.getBrokerConfig().getBrokerName().equals(brokerNameToSend)) {
LOG.warn("putMessageToRemoteBroker failed, remote broker not found. Topic: {}, MsgId: {}, Broker: {}",
messageExt.getTopic(), messageExt.getMsgId(), brokerNameToSend);
messageExt.getTopic(), messageExt.getMsgId(), brokerNameToSend);
return null;
}
} else {
Expand All @@ -147,7 +147,7 @@ public SendResult putMessageToRemoteBroker(MessageExtBrokerInner messageExt, Str
final String brokerAddrToSend = this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend);
if (null == brokerAddrToSend) {
LOG.warn("putMessageToRemoteBroker failed, remote broker address not found. Topic: {}, MsgId: {}, Broker: {}",
messageExt.getTopic(), messageExt.getMsgId(), brokerNameToSend);
messageExt.getTopic(), messageExt.getMsgId(), brokerNameToSend);
return null;
}

Expand Down Expand Up @@ -197,7 +197,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner
producerGroup, SEND_TIMEOUT);

return future.exceptionally(throwable -> null)
.thenApplyAsync(sendResult -> transformSendResult2PutResult(sendResult), this.defaultAsyncSenderExecutor)
.thenApplyAsync(this::transformSendResult2PutResult, this.defaultAsyncSenderExecutor)
.exceptionally(throwable -> transformSendResult2PutResult(null));

} catch (Exception e) {
Expand All @@ -211,7 +211,6 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner
}
}


private String getProducerGroup(MessageExtBrokerInner messageExt) {
if (null == messageExt) {
return this.innerProducerGroupName;
Expand All @@ -223,12 +222,29 @@ private String getProducerGroup(MessageExtBrokerInner messageExt) {
return producerGroup;
}


public PutMessageResult putMessageToSpecificQueue(MessageExtBrokerInner messageExt) {
BrokerController masterBroker = this.brokerController.peekMasterBroker();
if (masterBroker != null) {
return masterBroker.getMessageStore().putMessage(messageExt);
} else if (this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()
}
try {
return asyncRemotePutMessageToSpecificQueue(messageExt).get(SEND_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOG.error("Put message to specific queue error", e);
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null, true);
}
}

public CompletableFuture<PutMessageResult> asyncPutMessageToSpecificQueue(MessageExtBrokerInner messageExt) {
BrokerController masterBroker = this.brokerController.peekMasterBroker();
if (masterBroker != null) {
return masterBroker.getMessageStore().asyncPutMessage(messageExt);
}
return asyncRemotePutMessageToSpecificQueue(messageExt);
}

public CompletableFuture<PutMessageResult> asyncRemotePutMessageToSpecificQueue(MessageExtBrokerInner messageExt) {
if (this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()
&& this.brokerController.getBrokerConfig().isEnableRemoteEscape()) {
try {
messageExt.setWaitStoreMsgOK(false);
Expand All @@ -237,7 +253,7 @@ public PutMessageResult putMessageToSpecificQueue(MessageExtBrokerInner messageE
List<MessageQueue> mqs = topicPublishInfo.getMessageQueueList();

if (null == mqs || mqs.isEmpty()) {
return new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true);
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true));
}

String id = messageExt.getTopic() + messageExt.getStoreHost();
Expand All @@ -248,19 +264,17 @@ public PutMessageResult putMessageToSpecificQueue(MessageExtBrokerInner messageE

String brokerNameToSend = mq.getBrokerName();
String brokerAddrToSend = this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend);
final SendResult sendResult = this.brokerController.getBrokerOuterAPI().sendMessageToSpecificBroker(
return this.brokerController.getBrokerOuterAPI().sendMessageToSpecificBrokerAsync(
brokerAddrToSend, brokerNameToSend,
messageExt, this.getProducerGroup(messageExt), SEND_TIMEOUT);

return transformSendResult2PutResult(sendResult);
messageExt, this.getProducerGroup(messageExt), SEND_TIMEOUT).thenCompose(sendResult -> CompletableFuture.completedFuture(transformSendResult2PutResult(sendResult)));
} catch (Exception e) {
LOG.error("sendMessageInFailover to remote failed", e);
return new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true);
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, null, true));
}
} else {
LOG.warn("Put message to specific queue failed, enableSlaveActingMaster={}, enableRemoteEscape={}.",
this.brokerController.getBrokerConfig().isEnableSlaveActingMaster(), this.brokerController.getBrokerConfig().isEnableRemoteEscape());
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
}
}

Expand All @@ -282,12 +296,14 @@ private PutMessageResult transformSendResult2PutResult(SendResult sendResult) {
}
}

public Triple<MessageExt, String, Boolean> getMessage(String topic, long offset, int queueId, String brokerName, boolean deCompressBody) {
public Triple<MessageExt, String, Boolean> getMessage(String topic, long offset, int queueId, String brokerName,
boolean deCompressBody) {
return getMessageAsync(topic, offset, queueId, brokerName, deCompressBody).join();
}

// Triple<MessageExt, info, needRetry>, check info and retry if and only if MessageExt is null
public CompletableFuture<Triple<MessageExt, String, Boolean>> getMessageAsync(String topic, long offset, int queueId, String brokerName, boolean deCompressBody) {
public CompletableFuture<Triple<MessageExt, String, Boolean>> getMessageAsync(String topic, long offset,
int queueId, String brokerName, boolean deCompressBody) {
MessageStore messageStore = brokerController.getMessageStoreByBrokerName(brokerName);
if (messageStore != null) {
return messageStore.getMessageAsync(innerConsumerGroupName, topic, queueId, offset, 1, null)
Expand All @@ -300,9 +316,9 @@ public CompletableFuture<Triple<MessageExt, String, Boolean>> getMessageAsync(St
if (list == null || list.isEmpty()) {
// OFFSET_FOUND_NULL returned by TieredMessageStore indicates exception occurred
boolean needRetry = GetMessageStatus.OFFSET_FOUND_NULL.equals(result.getStatus())
&& messageStore instanceof TieredMessageStore;
&& messageStore instanceof TieredMessageStore;
LOG.warn("Can not get msg , topic {}, offset {}, queueId {}, needRetry {}, result is {}",
topic, offset, queueId, needRetry, result);
topic, offset, queueId, needRetry, result);
return Triple.of(null, "Can not get msg", needRetry);
}
return Triple.of(list.get(0), "", false);
Expand Down Expand Up @@ -340,12 +356,14 @@ protected List<MessageExt> decodeMsgList(GetMessageResult getMessageResult, bool
return foundList;
}

protected Triple<MessageExt, String, Boolean> getMessageFromRemote(String topic, long offset, int queueId, String brokerName) {
protected Triple<MessageExt, String, Boolean> getMessageFromRemote(String topic, long offset, int queueId,
String brokerName) {
return getMessageFromRemoteAsync(topic, offset, queueId, brokerName).join();
}

// Triple<MessageExt, info, needRetry>, check info and retry if and only if MessageExt is null
protected CompletableFuture<Triple<MessageExt, String, Boolean>> getMessageFromRemoteAsync(String topic, long offset, int queueId, String brokerName) {
protected CompletableFuture<Triple<MessageExt, String, Boolean>> getMessageFromRemoteAsync(String topic,
long offset, int queueId, String brokerName) {
try {
String brokerAddr = this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false);
if (null == brokerAddr) {
Expand All @@ -359,11 +377,11 @@ protected CompletableFuture<Triple<MessageExt, String, Boolean>> getMessageFromR
}

return this.brokerController.getBrokerOuterAPI().pullMessageFromSpecificBrokerAsync(brokerName,
brokerAddr, this.innerConsumerGroupName, topic, queueId, offset, 1, DEFAULT_PULL_TIMEOUT_MILLIS)
brokerAddr, this.innerConsumerGroupName, topic, queueId, offset, 1, DEFAULT_PULL_TIMEOUT_MILLIS)
.thenApply(pullResult -> {
if (pullResult.getLeft() != null
&& PullStatus.FOUND.equals(pullResult.getLeft().getPullStatus())
&& CollectionUtils.isNotEmpty(pullResult.getLeft().getMsgFoundList())) {
&& PullStatus.FOUND.equals(pullResult.getLeft().getPullStatus())
&& CollectionUtils.isNotEmpty(pullResult.getLeft().getMsgFoundList())) {
return Triple.of(pullResult.getLeft().getMsgFoundList().get(0), "", false);
}
return Triple.of(null, pullResult.getMiddle(), pullResult.getRight());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public boolean isPopReviveServiceRunning() {

@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
RemotingCommand request) throws RemotingCommandException {
return this.processRequest(ctx.channel(), request, true);
}

Expand All @@ -108,7 +108,7 @@ public boolean rejectRequest() {
}

private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
boolean brokerAllowSuspend) throws RemotingCommandException {
boolean brokerAllowSuspend) throws RemotingCommandException {
AckMessageRequestHeader requestHeader;
BatchAckMessageRequestBody reqBody = null;
final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
Expand All @@ -126,7 +126,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re

if (requestHeader.getQueueId() >= topicConfig.getReadQueueNums() || requestHeader.getQueueId() < 0) {
String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
POP_LOGGER.warn(errorInfo);
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark(errorInfo);
Expand All @@ -137,7 +137,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
if (requestHeader.getOffset() < minOffset || requestHeader.getOffset() > maxOffset) {
String errorInfo = String.format("offset is illegal, key:%s@%d, commit:%d, store:%d~%d",
requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getOffset(), minOffset, maxOffset);
requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getOffset(), minOffset, maxOffset);
POP_LOGGER.warn(errorInfo);
response.setCode(ResponseCode.NO_MESSAGE);
response.setRemark(errorInfo);
Expand Down Expand Up @@ -165,7 +165,8 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
return response;
}

private void appendAck(final AckMessageRequestHeader requestHeader, final BatchAck batchAck, final RemotingCommand response, final Channel channel, String brokerName) {
private void appendAck(final AckMessageRequestHeader requestHeader, final BatchAck batchAck,
final RemotingCommand response, final Channel channel, String brokerName) {
String[] extraInfo;
String consumeGroup, topic;
int qId, rqId;
Expand Down Expand Up @@ -268,18 +269,36 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
msgInner.setDeliverTimeMs(popTime + invisibleTime);
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
if (brokerController.getBrokerConfig().isAppendAckAsync()) {
int finalAckCount = ackCount;
this.brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult -> {
handlePutMessageResult(putMessageResult, ackMsg, topic, consumeGroup, popTime, qId, finalAckCount);
}).exceptionally(throwable -> {
handlePutMessageResult(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null, false),
ackMsg, topic, consumeGroup, popTime, qId, finalAckCount);
POP_LOGGER.error("put ack msg error ", throwable);
return null;
});
} else {
PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
handlePutMessageResult(putMessageResult, ackMsg, topic, consumeGroup, popTime, qId, ackCount);
}
}

private void handlePutMessageResult(PutMessageResult putMessageResult, AckMsg ackMsg, String topic,
String consumeGroup, long popTime, int qId, int ackCount) {
if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) {
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) {
POP_LOGGER.error("put ack msg error:" + putMessageResult);
}
PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus());
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
}

protected void ackOrderly(String topic, String consumeGroup, int qId, long ackOffset, long popTime, long invisibleTime, Channel channel, RemotingCommand response) {
protected void ackOrderly(String topic, String consumeGroup, int qId, long ackOffset, long popTime,
long invisibleTime, Channel channel, RemotingCommand response) {
String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId;
long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
if (ackOffset < oldOffset) {
Expand Down
Loading

0 comments on commit 4f5f705

Please sign in to comment.