Skip to content

Commit

Permalink
GH-8894: Fix race condition in the DelayHandler
Browse files Browse the repository at this point in the history
Fixes: #8894

* Guard `this.messageStore` access in the `DelayHandler` with existing `this.lock`
to avoid `ConcurrentModificationException`

**Cherry-pick to `6.2.x`**

(cherry picked from commit 4b43969)
  • Loading branch information
artembilan committed Feb 7, 2024
1 parent d225ddf commit 9ea7581
Showing 1 changed file with 38 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -455,7 +455,14 @@ private void releaseMessageAfterDelay(final Message<?> message, long delay) {
.withPayload(messageWrapper)
.copyHeaders(message.getHeaders())
.build();
this.messageStore.addMessageToGroup(this.messageGroupId, delayedMessage);

this.lock.lock();
try {
this.messageStore.addMessageToGroup(this.messageGroupId, delayedMessage);
}
finally {
this.lock.unlock();
}
}

Runnable releaseTask = releaseTaskForMessage(delayedMessage);
Expand Down Expand Up @@ -495,15 +502,21 @@ private Runnable releaseTaskForMessage(Message<?> delayedMessage) {
}

private Message<?> getMessageById(UUID messageId) {
Message<?> theMessage = this.messageStore.getMessageFromGroup(this.messageGroupId, messageId);
this.lock.lock();
try {
Message<?> theMessage = this.messageStore.getMessageFromGroup(this.messageGroupId, messageId);

if (theMessage == null) {
logger.debug(() -> "No message in the Message Store for id: " + messageId +
". Likely another instance has already released it.");
return null;
if (theMessage == null) {
logger.debug(() -> "No message in the Message Store for id: " + messageId +
". Likely another instance has already released it.");
return null;
}
else {
return theMessage;
}
}
else {
return theMessage;
finally {
this.lock.unlock();
}
}

Expand Down Expand Up @@ -568,9 +581,16 @@ protected void rescheduleAt(Message<?> message, Date startTime) {
}

private void doReleaseMessage(Message<?> message) {
if (this.messageStore.removeMessageFromGroupById(this.messageGroupId, message.getHeaders().getId())
|| this.deliveries.get(ObjectUtils.getIdentityHexString(message)).get() > 0) {
boolean removed;
this.lock.lock();
try {
removed = this.messageStore.removeMessageFromGroupById(this.messageGroupId, message.getHeaders().getId());
}
finally {
this.lock.unlock();
}

if (removed || this.deliveries.get(ObjectUtils.getIdentityHexString(message)).get() > 0) {
handleMessageInternal(message);
}
else {
Expand All @@ -581,7 +601,13 @@ private void doReleaseMessage(Message<?> message) {

@Override
public int getDelayedMessageCount() {
return this.messageStore.messageGroupSize(this.messageGroupId);
this.lock.lock();
try {
return this.messageStore.messageGroupSize(this.messageGroupId);
}
finally {
this.lock.unlock();
}
}

/**
Expand Down

0 comments on commit 9ea7581

Please sign in to comment.