Skip to content

Commit

Permalink
[AMQ-8354] Add logic to load more messages if we can't find last mess…
Browse files Browse the repository at this point in the history
…age for recovery.

Add check if the broker is stopping to stop the task runners faster.
  • Loading branch information
NikitaShupletsov committed Jun 13, 2024
1 parent cd92d6b commit fd1e595
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.MessageReferenceFilter;
import org.apache.activemq.broker.region.PrefetchSubscription;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
Expand All @@ -33,6 +35,7 @@
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.TransactionId;
Expand All @@ -46,7 +49,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import javax.jms.JMSException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -166,7 +169,7 @@ void initialize() throws Exception {
intermediateQueue.iterate();
String savedSequences = sequenceStorage.initialize(subscriptionConnectionContext);
List<String> savedSequencesToRestore = restoreSequenceStorage.initialize(subscriptionConnectionContext);
restoreSequence(savedSequences, savedSequencesToRestore);
restoreSequence(intermediateQueue, savedSequences, savedSequencesToRestore);

scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(this::asyncSendWakeup,
Expand Down Expand Up @@ -217,7 +220,7 @@ void deinitialize() throws Exception {

}

void restoreSequence(String savedSequence, List<String> savedSequencesToRestore) throws Exception {
void restoreSequence(Queue intermediateQueue, String savedSequence, List<String> savedSequencesToRestore) throws Exception {
if (savedSequence != null) {
String[] split = savedSequence.split("#");
if (split.length != 2) {
Expand Down Expand Up @@ -246,16 +249,37 @@ void restoreSequence(String savedSequence, List<String> savedSequencesToRestore)
break;
}
}

ConnectionContext connectionContext = createConnectionContext();

if (!found) {
throw new IllegalStateException("Can't recover sequence. Message with id: " + recoveryMessageId + " not found");
Set<String> matchingIds = matchingMessages.stream()
.map(MessageReference::getMessageId)
.map(MessageId::toString)
.collect(Collectors.toSet());

List<QueueMessageReference> extraMessages = intermediateQueue.getMessagesUntilMatches(connectionContext,
(context, mr) -> mr.getMessageId().equals(recoveryMessageId));
if (extraMessages == null) {
throw new IllegalStateException("Can't recover sequence. Message with id: " + recoveryMessageId + " not found");
}

List<MessageReference> toDispatch = new ArrayList<>();
for (MessageReference mr : extraMessages) {
if (matchingIds.contains(mr.getMessageId().toString())) {
continue;
}
matchingMessages.add(mr);
toDispatch.add(mr);
}
intermediateQueue.dispatchNotification(subscription, toDispatch);
}

TransactionId transactionId = new LocalTransactionId(
new ConnectionId(ReplicaSupport.REPLICATION_PLUGIN_CONNECTION_ID),
ReplicaSupport.LOCAL_TRANSACTION_ID_GENERATOR.getNextSequenceId());
boolean rollbackOnFail = false;

ConnectionContext connectionContext = createConnectionContext();
BigInteger sequence = null;
try {
broker.beginTransaction(connectionContext, transactionId);
Expand Down Expand Up @@ -406,7 +430,7 @@ boolean iterateAck() {
}
}

return pendingAckWakeups.get() > 0;
return !broker.getBrokerService().isStopping() && pendingAckWakeups.get() > 0;
}

private void iterateAck0() {
Expand Down Expand Up @@ -499,7 +523,7 @@ boolean iterateSend() {
}
}

return pendingSendTriggeredWakeups.get() > 0;
return !broker.getBrokerService().isStopping() && pendingSendTriggeredWakeups.get() > 0;
}

private void iterateSend0() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void setUp() throws Exception {
public void restoreSequenceWhenNoSequence() throws Exception {
sequencer.sequence = null;

sequencer.restoreSequence(null, Collections.emptyList());
sequencer.restoreSequence(intermediateQueue, null, Collections.emptyList());

assertThat(sequencer.sequence).isNull();
}
Expand All @@ -135,7 +135,7 @@ public void restoreSequenceWhenSequenceExistsButNoRecoverySequences() throws Exc
sequencer.sequence = null;

MessageId messageId = new MessageId("1:0:0:1");
sequencer.restoreSequence("1#" + messageId, Collections.emptyList());
sequencer.restoreSequence(intermediateQueue, "1#" + messageId, Collections.emptyList());
verify(replicationMessageProducer, never()).enqueueMainReplicaEvent(any(), any(ReplicaEvent.class));

assertThat(sequencer.sequence).isEqualTo(1);
Expand Down Expand Up @@ -167,7 +167,7 @@ public void restoreSequenceWhenStorageExistAndMessageDoesNotExist() throws Excep

when(intermediateSubscription.getDispatched()).thenReturn(new ArrayList<>(List.of(message1, message2, message3, message4)));

sequencer.restoreSequence("4#" + messageId4, List.of("1#" + messageId1 + "#" + messageId2, "3#" + messageId3 + "#" + messageId4));
sequencer.restoreSequence(intermediateQueue, "4#" + messageId4, List.of("1#" + messageId1 + "#" + messageId2, "3#" + messageId3 + "#" + messageId4));

assertThat(sequencer.sequence).isEqualTo(4);

Expand Down

0 comments on commit fd1e595

Please sign in to comment.