Skip to content

Commit

Permalink
Updated tests for partitioned queue
Browse files Browse the repository at this point in the history
  • Loading branch information
SravanThotakura05 committed Jan 30, 2024
1 parent d058ae3 commit 8fb2ed6
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Map;

import com.solace.messaging.PubSubPlusClientException;
import com.solace.messaging.config.SolaceConstants;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.util.Converter;
import com.solace.messaging.util.InteroperabilitySupport;
Expand Down Expand Up @@ -130,7 +131,7 @@ public Map<String, String> getProperties() {
}

public String getPartitionKey() {
return msg.getProperties().get("JMSXGroupID");
return msg.getProperties().get(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i
this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
this.gracefulShutdown = ic.getClientGracefulShutdown();
this.gracefulShutdownWaitTimeout = ic.getClientGracefulShutdownWaitTimeout();
DirectMessageReceiver r = solace.createDirectMessageReceiverBuilder().build();
Outcome[] outcomes = new Outcome[] { Outcome.ACCEPTED };
if (ic.getConsumerQueueSupportsNacks()) {
outcomes = new Outcome[] { Outcome.ACCEPTED, Outcome.FAILED, Outcome.REJECTED };
Expand Down Expand Up @@ -193,13 +192,13 @@ public Flow.Publisher<? extends Message<?>> getStream() {
public void waitForUnAcknowledgedMessages() {
try {
receiver.pause();
SolaceLogging.log.info("Waiting for incoming channel messages to be acknowledged");
SolaceLogging.log.infof("Waiting for incoming channel %s messages to be acknowledged", channel);
if (!unacknowledgedMessageTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info(String.format("Timed out while waiting for the" +
" remaining messages to be acknowledged."));
SolaceLogging.log.infof("Timed out while waiting for the" +
" remaining messages to be acknowledged on channel %s.", channel);
}
} catch (InterruptedException e) {
SolaceLogging.log.info(String.format("Interrupted while waiting for messages to get acknowledged"));
SolaceLogging.log.infof("Interrupted while waiting for messages on channel %s to get acknowledged", channel);
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import com.solace.messaging.config.SolaceConstants;
import org.eclipse.microprofile.reactive.messaging.Message;

import com.solace.messaging.MessagingService;
Expand Down Expand Up @@ -147,7 +148,7 @@ private Uni<PublishReceipt> publishMessage(PersistentMessagePublisher publisher,
msgBuilder.withClassOfService(metadata.getClassOfService());
}
if (metadata.getPartitionKey() != null) {
msgBuilder.withProperty(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY,
msgBuilder.withProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY,
metadata.getPartitionKey());
}

Expand Down Expand Up @@ -201,13 +202,13 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber() {

public void waitForPublishedMessages() {
try {
SolaceLogging.log.info("Waiting for outgoing messages to be published");
SolaceLogging.log.infof("Waiting for outgoing channel %s messages to be published", channel);
if (!publishedMessagesTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info(String.format("Timed out while waiting for the" +
" remaining messages to get publish acknowledgment."));
SolaceLogging.log.infof("Timed out while waiting for the" +
" remaining messages to be acknowledged on channel %s.", channel);
}
} catch (InterruptedException e) {
SolaceLogging.log.info(String.format("Interrupted while waiting for messages to get acknowledged"));
SolaceLogging.log.infof("Interrupted while waiting for messages on channel %s to get acknowledged", channel);
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.List;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.*;

import com.solace.messaging.config.SolaceConstants;
import jakarta.enterprise.context.ApplicationScoped;

import org.awaitility.Durations;
Expand Down Expand Up @@ -202,39 +202,62 @@ void partitionedQueue() {
.with("mp.messaging.incoming.consumer-2.connector", "quarkus-solace")
.with("mp.messaging.incoming.consumer-2.consumer.queue.name",
SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME)
.with("mp.messaging.incoming.consumer-2.consumer.queue.type", "durable-non-exclusive");
.with("mp.messaging.incoming.consumer-2.consumer.queue.type", "durable-non-exclusive")
.with("mp.messaging.incoming.consumer-3.connector", "quarkus-solace")
.with("mp.messaging.incoming.consumer-3.consumer.queue.name",
SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME)
.with("mp.messaging.incoming.consumer-3.consumer.queue.type", "durable-non-exclusive")
.with("mp.messaging.incoming.consumer-4.connector", "quarkus-solace")
.with("mp.messaging.incoming.consumer-4.consumer.queue.name",
SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME)
.with("mp.messaging.incoming.consumer-4.consumer.queue.type", "durable-non-exclusive");

// Run app that consumes messages
MyPartitionedQueueConsumer app = runApplication(config, MyPartitionedQueueConsumer.class);
CopyOnWriteArrayList<String> group1PartitionPayloads = new CopyOnWriteArrayList<>();
CopyOnWriteArrayList<String> group2PartitionPayloads = new CopyOnWriteArrayList<>();

CopyOnWriteArrayList<String> partitionKeys = new CopyOnWriteArrayList<>(){
{
add("Group-1");
add("Group-2");
add("Group-3");
add("Group-4");
}
};
Map<String, Integer> partitionMessages = new HashMap<>(){
{
put(partitionKeys.get(0), 0);
put(partitionKeys.get(1), 0);
put(partitionKeys.get(2), 0);
put(partitionKeys.get(3), 0);
}
};

Random random = new Random();
// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_SUBSCRIPTION);
for (int i = 0; i < 1000; i++) {
String partitionKey = "Group-1";
if (i % 2 == 0) {
partitionKey = "Group-2";
group2PartitionPayloads.add(Integer.toString(i));
} else {
group1PartitionPayloads.add(Integer.toString(i));
}

int partitionIndex = random.nextInt(4);
String partitionKey = partitionKeys.get(partitionIndex);
int count = partitionMessages.get(partitionKey);
partitionMessages.put(partitionKey, (count + 1));
OutboundMessageBuilder messageBuilder = messagingService.messageBuilder();
messageBuilder.withProperty(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, partitionKey);
messageBuilder.withProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, partitionKey);
OutboundMessage outboundMessage = messageBuilder.build(Integer.toString(i));
publisher.publish(outboundMessage, tp);
}

// Assert on published messages
await().untilAsserted(() -> assertThat(app.getReceivedMessagesOnPartitionConsumer1().size())
.isEqualTo(group1PartitionPayloads.size()));
await().untilAsserted(() -> app.getReceivedMessagesOnPartitionConsumer1().containsAll(group1PartitionPayloads));
await().untilAsserted(() -> assertThat(app.getReceivedMessagesOnPartitionConsumer2().size())
.isEqualTo(group2PartitionPayloads.size()));
await().untilAsserted(() -> app.getReceivedMessagesOnPartitionConsumer2().containsAll(group2PartitionPayloads));
// Assert on published and consumed messages
await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(0)))
.isEqualTo(partitionMessages.get(partitionKeys.get(0))));
await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(1)))
.isEqualTo(partitionMessages.get(partitionKeys.get(1))));
await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(2)))
.isEqualTo(partitionMessages.get(partitionKeys.get(2))));
await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(3)))
.isEqualTo(partitionMessages.get(partitionKeys.get(3))));
}

@Test
Expand Down Expand Up @@ -403,32 +426,47 @@ public List<String> getReceivedFailedMessages() {

@ApplicationScoped
static class MyPartitionedQueueConsumer {
private final List<String> receivedMessagesOnPartitionConsumer1 = new CopyOnWriteArrayList<>();

private List<String> receivedMessagesOnPartitionConsumer2 = new CopyOnWriteArrayList<>();
Map<String, Integer> partitionMessages = new HashMap<>(){
{
put("Group-1", 0);
put("Group-2", 0);
put("Group-3", 0);
put("Group-4", 0);
}
};

@Incoming("consumer-1")
CompletionStage<Void> consumer1(SolaceInboundMessage<?> msg) {
receivedMessagesOnPartitionConsumer1.add(msg.getMessage().getPayloadAsString());
updatePartitionMessages(msg);
return msg.ack();
}

@Incoming("consumer-2")
CompletionStage<Void> consumer2(SolaceInboundMessage<?> msg) {
receivedMessagesOnPartitionConsumer2.add(msg.getMessage().getPayloadAsString());
updatePartitionMessages(msg);
return msg.ack();
}

public List<String> getReceivedMessagesOnPartitionConsumer1() {
return receivedMessagesOnPartitionConsumer1;
@Incoming("consumer-3")
CompletionStage<Void> consumer3(SolaceInboundMessage<?> msg) {
updatePartitionMessages(msg);
return msg.ack();
}

@Incoming("consumer-4")
CompletionStage<Void> consumer4(SolaceInboundMessage<?> msg) {
updatePartitionMessages(msg);
return msg.ack();
}

public List<String> getReceivedMessagesOnPartitionConsumer2() {
return receivedMessagesOnPartitionConsumer2;
private void updatePartitionMessages(SolaceInboundMessage<?> msg) {
String partitionKey = msg.getMessage().getProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY);
int count = partitionMessages.get(partitionKey);
partitionMessages.put(partitionKey, (count + 1));
}

public void setReceivedMessagesOnPartitionConsumer2(List<String> receivedMessagesOnPartitionConsumer2) {
this.receivedMessagesOnPartitionConsumer2 = receivedMessagesOnPartitionConsumer2;
public Map<String, Integer> getPartitionMessages() {
return partitionMessages;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private Transferable createConfigurationScript() {
updateConfigScript(scriptBuilder, "max-spool-usage 300");
updateConfigScript(scriptBuilder, "permission all consume");
updateConfigScript(scriptBuilder, "partition");
updateConfigScript(scriptBuilder, "count 3");
updateConfigScript(scriptBuilder, "count 4");
updateConfigScript(scriptBuilder, "exit");
updateConfigScript(scriptBuilder, "no shutdown");
updateConfigScript(scriptBuilder, "exit");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.solace.quarkus.samples;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -62,39 +63,4 @@ Message<?> consumeAndPublishToDynamicTopic(SolaceInboundMessage<?> p) {
return p.addMetadata(outboundMetadata);
}

@Incoming("partition-consumer-1-in")
CompletionStage<Void> partitionConsumer1(SolaceInboundMessage<?> message) {
Log.infof(
"Received message on partitionConsumer1 with partition key %s and payload %s",
message.getMessage().getProperties().get("JMSXGroupID"), message.getMessage().getPayloadAsString());

return message.ack();
}

@Incoming("partition-consumer-2-in")
CompletionStage<Void> partitionConsumer2(SolaceInboundMessage<?> message) {
Log.infof(
"Received message on partitionConsumer2 with partition key %s and payload %s",
message.getMessage().getProperties().get("JMSXGroupID"), message.getMessage().getPayloadAsString());
return message.ack();
}

@Outgoing("partition-publisher-out")
public Multi<Message<String>> partitionPublisher() {

return Multi.createFrom().range(0, 1000).map(mapper -> {
String partitionKey = "2";
if (mapper % 2 == 0) {
partitionKey = "1";
} else if (mapper % 3 == 0) {
partitionKey = "3";
}
SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
.setPartitionKey(partitionKey)
.createPubSubOutboundMetadata();
return Message.of("Hello World - " + mapper, Metadata.of(outboundMetadata),
() -> CompletableFuture.completedFuture(null));
});
}

}

0 comments on commit 8fb2ed6

Please sign in to comment.