Skip to content

Commit

Permalink
[ISSUE #8955] Fix message buffer not release and dispatch thread exit…
Browse files Browse the repository at this point in the history
… in tiered storage (#8965)
  • Loading branch information
lizhimins authored Nov 22, 2024
1 parent 5f26423 commit e876bed
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ public void dispatchWithSemaphore(FlatFileInterface flatFile) {
semaphore.acquire();
this.doScheduleDispatch(flatFile, false)
.whenComplete((future, throwable) -> semaphore.release());
} catch (InterruptedException e) {
} catch (Throwable t) {
semaphore.release();
log.error("MessageStore dispatch error, topic={}, queueId={}",
flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), t);
}
}

Expand Down Expand Up @@ -156,25 +158,22 @@ public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile,
}

if (currentOffset < minOffsetInQueue) {
log.warn("MessageDispatcher#dispatch, current offset is too small, " +
"topic={}, queueId={}, offset={}-{}, current={}",
log.warn("MessageDispatcher#dispatch, current offset is too small, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
flatFileStore.destroyFile(flatFile.getMessageQueue());
flatFileStore.computeIfAbsent(new MessageQueue(topic, brokerName, queueId));
return CompletableFuture.completedFuture(true);
}

if (currentOffset > maxOffsetInQueue) {
log.warn("MessageDispatcher#dispatch, current offset is too large, " +
"topic: {}, queueId: {}, offset={}-{}, current={}",
log.warn("MessageDispatcher#dispatch, current offset is too large, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
return CompletableFuture.completedFuture(false);
}

long interval = TimeUnit.HOURS.toMillis(storeConfig.getCommitLogRollingInterval());
if (flatFile.rollingFile(interval)) {
log.info("MessageDispatcher#dispatch, rolling file, " +
"topic: {}, queueId: {}, offset={}-{}, current={}",
log.info("MessageDispatcher#dispatch, rolling file, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
}

Expand All @@ -189,27 +188,40 @@ public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile,

ConsumeQueueInterface consumeQueue = defaultStore.getConsumeQueue(topic, queueId);
CqUnit cqUnit = consumeQueue.get(currentOffset);
if (cqUnit == null) {
log.warn("MessageDispatcher#dispatch cq not found, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
return CompletableFuture.completedFuture(false);
}

SelectMappedBufferResult message =
defaultStore.selectOneMessageByOffset(cqUnit.getPos(), cqUnit.getSize());
if (message == null) {
log.warn("MessageDispatcher#dispatch message not found, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
return CompletableFuture.completedFuture(false);
}

boolean timeout = MessageFormatUtil.getStoreTimeStamp(message.getByteBuffer()) +
storeConfig.getTieredStoreGroupCommitTimeout() < System.currentTimeMillis();
boolean bufferFull = maxOffsetInQueue - currentOffset > storeConfig.getTieredStoreGroupCommitCount();

if (!timeout && !bufferFull && !force) {
log.debug("MessageDispatcher#dispatch hold, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
message.release();
return CompletableFuture.completedFuture(false);
} else {
if (MessageFormatUtil.getStoreTimeStamp(message.getByteBuffer()) +
TimeUnit.MINUTES.toMillis(5) < System.currentTimeMillis()) {
log.warn("MessageDispatcher#dispatch behind too much, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
} else {
log.info("MessageDispatcher#dispatch, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
log.info("MessageDispatcher#dispatch success, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
}
message.release();
}
message.release();

long offset = currentOffset;
for (; offset < targetOffset; offset++) {
Expand Down Expand Up @@ -279,7 +291,7 @@ public CompletableFuture<Void> commitAsync(FlatFileInterface flatFile) {
}
flatFile.release();
}
}, MessageStoreExecutor.getInstance().bufferCommitExecutor);
}, storeExecutor.bufferCommitExecutor);
}

/**
Expand All @@ -301,8 +313,12 @@ public void constructIndexFile(long topicId, DispatchRequest request) {
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore);
this.waitForRunning(Duration.ofSeconds(20).toMillis());
try {
flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore);
this.waitForRunning(Duration.ofSeconds(20).toMillis());
} catch (Throwable t) {
log.error("MessageStore dispatch error", t);
}
}
log.info("{} service shutdown", this.getServiceName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.provider.FileSegment;
import org.apache.rocketmq.tieredstore.provider.PosixFileSegment;
Expand Down Expand Up @@ -261,56 +260,55 @@ public CompletableFuture<List<IndexItem>> queryAsync(
protected CompletableFuture<List<IndexItem>> queryAsyncFromUnsealedFile(
String key, int maxCount, long beginTime, long endTime) {

return CompletableFuture.supplyAsync(() -> {
List<IndexItem> result = new ArrayList<>();
try {
fileReadWriteLock.readLock().lock();
if (!UNSEALED.equals(this.fileStatus.get()) && !SEALED.equals(this.fileStatus.get())) {
return result;
}
List<IndexItem> result = new ArrayList<>();
try {
fileReadWriteLock.readLock().lock();
if (!UNSEALED.equals(this.fileStatus.get()) && !SEALED.equals(this.fileStatus.get())) {
return CompletableFuture.completedFuture(result);
}

if (mappedFile == null || !mappedFile.hold()) {
return result;
}
if (mappedFile == null || !mappedFile.hold()) {
return CompletableFuture.completedFuture(result);
}

int hashCode = this.hashCode(key);
int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount);
int slotValue = this.getSlotValue(slotPosition);
int hashCode = this.hashCode(key);
int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount);
int slotValue = this.getSlotValue(slotPosition);

int left = MAX_QUERY_COUNT;
while (left > 0 &&
slotValue > INVALID_INDEX &&
slotValue <= this.indexItemCount.get()) {
int left = MAX_QUERY_COUNT;
while (left > 0 &&
slotValue > INVALID_INDEX &&
slotValue <= this.indexItemCount.get()) {

byte[] bytes = new byte[IndexItem.INDEX_ITEM_SIZE];
ByteBuffer buffer = this.byteBuffer.duplicate();
buffer.position(this.getItemPosition(slotValue));
buffer.get(bytes);
IndexItem indexItem = new IndexItem(bytes);
long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get();
if (hashCode == indexItem.getHashCode() &&
beginTime <= storeTimestamp && storeTimestamp <= endTime) {
result.add(indexItem);
if (result.size() > maxCount) {
break;
}
byte[] bytes = new byte[IndexItem.INDEX_ITEM_SIZE];
ByteBuffer buffer = this.byteBuffer.duplicate();
buffer.position(this.getItemPosition(slotValue));
buffer.get(bytes);
IndexItem indexItem = new IndexItem(bytes);
long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get();
if (hashCode == indexItem.getHashCode() &&
beginTime <= storeTimestamp && storeTimestamp <= endTime) {
result.add(indexItem);
if (result.size() > maxCount) {
break;
}
slotValue = indexItem.getItemIndex();
left--;
}

log.debug("IndexStoreFile query from unsealed mapped file, timestamp: {}, result size: {}, " +
"key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
getTimestamp(), result.size(), key, hashCode, maxCount, beginTime, endTime);
} catch (Exception e) {
log.error("IndexStoreFile query from unsealed mapped file error, timestamp: {}, " +
"key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key, maxCount, beginTime, endTime, e);
} finally {
fileReadWriteLock.readLock().unlock();
mappedFile.release();
slotValue = indexItem.getItemIndex();
left--;
}
return result;
}, MessageStoreExecutor.getInstance().bufferFetchExecutor);

log.debug("IndexStoreFile query from unsealed mapped file, timestamp: {}, result size: {}, " +
"key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
getTimestamp(), result.size(), key, hashCode, maxCount, beginTime, endTime);
} catch (Exception e) {
log.error("IndexStoreFile query from unsealed mapped file error, timestamp: {}, " +
"key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key, maxCount, beginTime, endTime, e);
} finally {
fileReadWriteLock.readLock().unlock();
mappedFile.release();
}

return CompletableFuture.completedFuture(result);
}

protected CompletableFuture<List<IndexItem>> queryAsyncFromSegmentFile(
Expand Down Expand Up @@ -465,7 +463,7 @@ public void shutdown() {
fileReadWriteLock.writeLock().lock();
this.fileStatus.set(IndexStatusEnum.SHUTDOWN);
if (this.fileSegment != null && this.fileSegment instanceof PosixFileSegment) {
((PosixFileSegment) this.fileSegment).close();
this.fileSegment.close();
}
if (this.mappedFile != null) {
this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
import org.apache.rocketmq.tieredstore.stream.FileSegmentInputStream;
Expand Down Expand Up @@ -230,6 +229,6 @@ public CompletableFuture<Boolean> commit0(
return false;
}
return true;
}, MessageStoreExecutor.getInstance().bufferCommitExecutor);
});
}
}

0 comments on commit e876bed

Please sign in to comment.