Skip to content

Commit

Permalink
Improve IO for asynchronous delivery processes (#8954)
Browse files Browse the repository at this point in the history
Co-authored-by: wanghuaiyuan <[email protected]>
  • Loading branch information
3424672656 and wanghuaiyuan authored Nov 20, 2024
1 parent ae7179d commit c13f051
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -2179,7 +2179,9 @@ public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMess
// Asynchronous flush
else {
if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
if (defaultMessageStore.getMessageStoreConfig().isWakeFlushWhenPutMessage()) {
flushCommitLogService.wakeup();
}
} else {
if (defaultMessageStore.getMessageStoreConfig().isWakeCommitWhenPutMessage()) {
commitRealTimeService.wakeup();
Expand All @@ -2206,9 +2208,13 @@ public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult r
// Asynchronous flush
else {
if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
if (defaultMessageStore.getMessageStoreConfig().isWakeFlushWhenPutMessage()) {
flushCommitLogService.wakeup();
}
} else {
commitRealTimeService.wakeup();
if (defaultMessageStore.getMessageStoreConfig().isWakeCommitWhenPutMessage()) {
commitRealTimeService.wakeup();
}
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
Expand Down

0 comments on commit c13f051

Please sign in to comment.