diff --git a/pubsub-plus-connector/src/main/java/com/solace/quarkus/messaging/incoming/SolaceInboundMetadata.java b/pubsub-plus-connector/src/main/java/com/solace/quarkus/messaging/incoming/SolaceInboundMetadata.java index 5d18d3f..ab2eb50 100644 --- a/pubsub-plus-connector/src/main/java/com/solace/quarkus/messaging/incoming/SolaceInboundMetadata.java +++ b/pubsub-plus-connector/src/main/java/com/solace/quarkus/messaging/incoming/SolaceInboundMetadata.java @@ -4,6 +4,7 @@ import java.util.Map; import com.solace.messaging.PubSubPlusClientException; +import com.solace.messaging.config.SolaceConstants; import com.solace.messaging.receiver.InboundMessage; import com.solace.messaging.util.Converter; import com.solace.messaging.util.InteroperabilitySupport; @@ -130,7 +131,7 @@ public Map getProperties() { } public String getPartitionKey() { - return msg.getProperties().get("JMSXGroupID"); + return msg.getProperties().get(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY); } } diff --git a/pubsub-plus-connector/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java b/pubsub-plus-connector/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java index 899c2ff..e006b50 100644 --- a/pubsub-plus-connector/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java +++ b/pubsub-plus-connector/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java @@ -63,7 +63,6 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext()); this.gracefulShutdown = ic.getClientGracefulShutdown(); this.gracefulShutdownWaitTimeout = ic.getClientGracefulShutdownWaitTimeout(); - DirectMessageReceiver r = solace.createDirectMessageReceiverBuilder().build(); Outcome[] outcomes = new Outcome[] { Outcome.ACCEPTED }; if (ic.getConsumerQueueSupportsNacks()) { outcomes = new Outcome[] { Outcome.ACCEPTED, Outcome.FAILED, Outcome.REJECTED }; @@ -193,13 +192,13 @@ public Flow.Publisher> getStream() { public void waitForUnAcknowledgedMessages() { try { receiver.pause(); - SolaceLogging.log.info("Waiting for incoming channel messages to be acknowledged"); + SolaceLogging.log.infof("Waiting for incoming channel %s messages to be acknowledged", channel); if (!unacknowledgedMessageTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) { - SolaceLogging.log.info(String.format("Timed out while waiting for the" + - " remaining messages to be acknowledged.")); + SolaceLogging.log.infof("Timed out while waiting for the" + + " remaining messages to be acknowledged on channel %s.", channel); } } catch (InterruptedException e) { - SolaceLogging.log.info(String.format("Interrupted while waiting for messages to get acknowledged")); + SolaceLogging.log.infof("Interrupted while waiting for messages on channel %s to get acknowledged", channel); throw new RuntimeException(e); } } diff --git a/pubsub-plus-connector/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java b/pubsub-plus-connector/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java index 326f3e1..cca77d0 100644 --- a/pubsub-plus-connector/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java +++ b/pubsub-plus-connector/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java @@ -8,6 +8,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import com.solace.messaging.config.SolaceConstants; import org.eclipse.microprofile.reactive.messaging.Message; import com.solace.messaging.MessagingService; @@ -147,7 +148,7 @@ private Uni publishMessage(PersistentMessagePublisher publisher, msgBuilder.withClassOfService(metadata.getClassOfService()); } if (metadata.getPartitionKey() != null) { - msgBuilder.withProperty(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, + msgBuilder.withProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, metadata.getPartitionKey()); } @@ -201,13 +202,13 @@ public Flow.Subscriber> getSubscriber() { public void waitForPublishedMessages() { try { - SolaceLogging.log.info("Waiting for outgoing messages to be published"); + SolaceLogging.log.infof("Waiting for outgoing channel %s messages to be published", channel); if (!publishedMessagesTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) { - SolaceLogging.log.info(String.format("Timed out while waiting for the" + - " remaining messages to get publish acknowledgment.")); + SolaceLogging.log.infof("Timed out while waiting for the" + + " remaining messages to be acknowledged on channel %s.", channel); } } catch (InterruptedException e) { - SolaceLogging.log.info(String.format("Interrupted while waiting for messages to get acknowledged")); + SolaceLogging.log.infof("Interrupted while waiting for messages on channel %s to get acknowledged", channel); throw new RuntimeException(e); } } diff --git a/pubsub-plus-connector/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java b/pubsub-plus-connector/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java index df94ff5..3e8197a 100644 --- a/pubsub-plus-connector/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java +++ b/pubsub-plus-connector/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java @@ -4,10 +4,10 @@ import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; -import java.util.List; -import java.util.Properties; +import java.util.*; import java.util.concurrent.*; +import com.solace.messaging.config.SolaceConstants; import jakarta.enterprise.context.ApplicationScoped; import org.awaitility.Durations; @@ -202,39 +202,62 @@ void partitionedQueue() { .with("mp.messaging.incoming.consumer-2.connector", "quarkus-solace") .with("mp.messaging.incoming.consumer-2.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME) - .with("mp.messaging.incoming.consumer-2.consumer.queue.type", "durable-non-exclusive"); + .with("mp.messaging.incoming.consumer-2.consumer.queue.type", "durable-non-exclusive") + .with("mp.messaging.incoming.consumer-3.connector", "quarkus-solace") + .with("mp.messaging.incoming.consumer-3.consumer.queue.name", + SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME) + .with("mp.messaging.incoming.consumer-3.consumer.queue.type", "durable-non-exclusive") + .with("mp.messaging.incoming.consumer-4.connector", "quarkus-solace") + .with("mp.messaging.incoming.consumer-4.consumer.queue.name", + SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME) + .with("mp.messaging.incoming.consumer-4.consumer.queue.type", "durable-non-exclusive"); // Run app that consumes messages MyPartitionedQueueConsumer app = runApplication(config, MyPartitionedQueueConsumer.class); - CopyOnWriteArrayList group1PartitionPayloads = new CopyOnWriteArrayList<>(); - CopyOnWriteArrayList group2PartitionPayloads = new CopyOnWriteArrayList<>(); + + CopyOnWriteArrayList partitionKeys = new CopyOnWriteArrayList<>(){ + { + add("Group-1"); + add("Group-2"); + add("Group-3"); + add("Group-4"); + } + }; + Map partitionMessages = new HashMap<>(){ + { + put(partitionKeys.get(0), 0); + put(partitionKeys.get(1), 0); + put(partitionKeys.get(2), 0); + put(partitionKeys.get(3), 0); + } + }; + + Random random = new Random(); // Produce messages PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() .build() .start(); Topic tp = Topic.of(SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_SUBSCRIPTION); for (int i = 0; i < 1000; i++) { - String partitionKey = "Group-1"; - if (i % 2 == 0) { - partitionKey = "Group-2"; - group2PartitionPayloads.add(Integer.toString(i)); - } else { - group1PartitionPayloads.add(Integer.toString(i)); - } - + int partitionIndex = random.nextInt(4); + String partitionKey = partitionKeys.get(partitionIndex); + int count = partitionMessages.get(partitionKey); + partitionMessages.put(partitionKey, (count + 1)); OutboundMessageBuilder messageBuilder = messagingService.messageBuilder(); - messageBuilder.withProperty(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, partitionKey); + messageBuilder.withProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, partitionKey); OutboundMessage outboundMessage = messageBuilder.build(Integer.toString(i)); publisher.publish(outboundMessage, tp); } - // Assert on published messages - await().untilAsserted(() -> assertThat(app.getReceivedMessagesOnPartitionConsumer1().size()) - .isEqualTo(group1PartitionPayloads.size())); - await().untilAsserted(() -> app.getReceivedMessagesOnPartitionConsumer1().containsAll(group1PartitionPayloads)); - await().untilAsserted(() -> assertThat(app.getReceivedMessagesOnPartitionConsumer2().size()) - .isEqualTo(group2PartitionPayloads.size())); - await().untilAsserted(() -> app.getReceivedMessagesOnPartitionConsumer2().containsAll(group2PartitionPayloads)); + // Assert on published and consumed messages + await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(0))) + .isEqualTo(partitionMessages.get(partitionKeys.get(0)))); + await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(1))) + .isEqualTo(partitionMessages.get(partitionKeys.get(1)))); + await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(2))) + .isEqualTo(partitionMessages.get(partitionKeys.get(2)))); + await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(3))) + .isEqualTo(partitionMessages.get(partitionKeys.get(3)))); } @Test @@ -403,32 +426,47 @@ public List getReceivedFailedMessages() { @ApplicationScoped static class MyPartitionedQueueConsumer { - private final List receivedMessagesOnPartitionConsumer1 = new CopyOnWriteArrayList<>(); - - private List receivedMessagesOnPartitionConsumer2 = new CopyOnWriteArrayList<>(); + Map partitionMessages = new HashMap<>(){ + { + put("Group-1", 0); + put("Group-2", 0); + put("Group-3", 0); + put("Group-4", 0); + } + }; @Incoming("consumer-1") CompletionStage consumer1(SolaceInboundMessage msg) { - receivedMessagesOnPartitionConsumer1.add(msg.getMessage().getPayloadAsString()); + updatePartitionMessages(msg); return msg.ack(); } @Incoming("consumer-2") CompletionStage consumer2(SolaceInboundMessage msg) { - receivedMessagesOnPartitionConsumer2.add(msg.getMessage().getPayloadAsString()); + updatePartitionMessages(msg); return msg.ack(); } - public List getReceivedMessagesOnPartitionConsumer1() { - return receivedMessagesOnPartitionConsumer1; + @Incoming("consumer-3") + CompletionStage consumer3(SolaceInboundMessage msg) { + updatePartitionMessages(msg); + return msg.ack(); + } + + @Incoming("consumer-4") + CompletionStage consumer4(SolaceInboundMessage msg) { + updatePartitionMessages(msg); + return msg.ack(); } - public List getReceivedMessagesOnPartitionConsumer2() { - return receivedMessagesOnPartitionConsumer2; + private void updatePartitionMessages(SolaceInboundMessage msg) { + String partitionKey = msg.getMessage().getProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY); + int count = partitionMessages.get(partitionKey); + partitionMessages.put(partitionKey, (count + 1)); } - public void setReceivedMessagesOnPartitionConsumer2(List receivedMessagesOnPartitionConsumer2) { - this.receivedMessagesOnPartitionConsumer2 = receivedMessagesOnPartitionConsumer2; + public Map getPartitionMessages() { + return partitionMessages; } } } diff --git a/pubsub-plus-connector/src/test/java/com/solace/quarkus/messaging/base/SolaceContainer.java b/pubsub-plus-connector/src/test/java/com/solace/quarkus/messaging/base/SolaceContainer.java index 10acd92..8beae7e 100644 --- a/pubsub-plus-connector/src/test/java/com/solace/quarkus/messaging/base/SolaceContainer.java +++ b/pubsub-plus-connector/src/test/java/com/solace/quarkus/messaging/base/SolaceContainer.java @@ -148,7 +148,7 @@ private Transferable createConfigurationScript() { updateConfigScript(scriptBuilder, "max-spool-usage 300"); updateConfigScript(scriptBuilder, "permission all consume"); updateConfigScript(scriptBuilder, "partition"); - updateConfigScript(scriptBuilder, "count 3"); + updateConfigScript(scriptBuilder, "count 4"); updateConfigScript(scriptBuilder, "exit"); updateConfigScript(scriptBuilder, "no shutdown"); updateConfigScript(scriptBuilder, "exit"); diff --git a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java index 4c04ffc..57e71ff 100644 --- a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java +++ b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java @@ -1,6 +1,7 @@ package com.solace.quarkus.samples; import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import jakarta.enterprise.context.ApplicationScoped; @@ -62,39 +63,4 @@ Message consumeAndPublishToDynamicTopic(SolaceInboundMessage p) { return p.addMetadata(outboundMetadata); } - @Incoming("partition-consumer-1-in") - CompletionStage partitionConsumer1(SolaceInboundMessage message) { - Log.infof( - "Received message on partitionConsumer1 with partition key %s and payload %s", - message.getMessage().getProperties().get("JMSXGroupID"), message.getMessage().getPayloadAsString()); - - return message.ack(); - } - - @Incoming("partition-consumer-2-in") - CompletionStage partitionConsumer2(SolaceInboundMessage message) { - Log.infof( - "Received message on partitionConsumer2 with partition key %s and payload %s", - message.getMessage().getProperties().get("JMSXGroupID"), message.getMessage().getPayloadAsString()); - return message.ack(); - } - - @Outgoing("partition-publisher-out") - public Multi> partitionPublisher() { - - return Multi.createFrom().range(0, 1000).map(mapper -> { - String partitionKey = "2"; - if (mapper % 2 == 0) { - partitionKey = "1"; - } else if (mapper % 3 == 0) { - partitionKey = "3"; - } - SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() - .setPartitionKey(partitionKey) - .createPubSubOutboundMetadata(); - return Message.of("Hello World - " + mapper, Metadata.of(outboundMetadata), - () -> CompletableFuture.completedFuture(null)); - }); - } - }