From fabf948a0abfeb8586df0bbdc8f50a9b810a838b Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Mon, 1 Jan 2024 13:44:23 +0530 Subject: [PATCH 1/5] Fixed Issues - #9 #10 #12 --- pubsub-plus-connector/pom.xml | 13 ++++++++ .../solace/fault/SolaceErrorTopic.java | 30 +++++++------------ .../SolaceErrorTopicPublisherHandler.java | 6 +--- .../solace/i18n/SolaceLogging.java | 4 +-- 4 files changed, 27 insertions(+), 26 deletions(-) diff --git a/pubsub-plus-connector/pom.xml b/pubsub-plus-connector/pom.xml index d6dc9c7..3d55cff 100644 --- a/pubsub-plus-connector/pom.xml +++ b/pubsub-plus-connector/pom.xml @@ -162,6 +162,19 @@ + + org.apache.maven.plugins + maven-surefire-plugin + + + java.util.logging.LogManager + + + org.jboss.slf4j:slf4j-jboss-logmanager + org.jboss.logmanager:jboss-logmanager-embedded + + + diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java index 11f2e7b..90077ea 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java @@ -7,12 +7,10 @@ import com.solace.messaging.MessagingService; import com.solace.messaging.config.MessageAcknowledgementConfiguration; -import com.solace.messaging.publisher.PersistentMessagePublisher; import com.solace.messaging.receiver.AcknowledgementSupport; import io.quarkiverse.solace.i18n.SolaceLogging; import io.quarkiverse.solace.incoming.SolaceInboundMessage; -import io.smallrye.mutiny.Uni; public class SolaceErrorTopic implements SolaceFailureHandler { private final String channel; @@ -50,24 +48,18 @@ public void setTimeToLive(Long timeToLive) { @Override public CompletionStage handle(SolaceInboundMessage msg, Throwable reason, Metadata metadata) { - PersistentMessagePublisher.PublishReceipt publishReceipt = solaceErrorTopicPublisherHandler - .handle(msg, errorTopic, dmqEligible, timeToLive) + return solaceErrorTopicPublisherHandler.handle(msg, errorTopic, dmqEligible, timeToLive) .onFailure().retry().withBackOff(Duration.ofSeconds(1)) .atMost(maxDeliveryAttempts) - .subscribeAsCompletionStage().exceptionally((t) -> { - SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel, - t.getMessage()); - return null; - }).join(); - - if (publishReceipt != null) { - return Uni.createFrom().voidItem() - .invoke(() -> ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED)) - .runSubscriptionOn(msg::runOnMessageContext) - .subscribeAsCompletionStage(); - } - - return Uni.createFrom(). failure(reason) - .emitOn(msg::runOnMessageContext).subscribeAsCompletionStage(); + .onItem().invoke(() -> { + SolaceLogging.log.messageSettled(channel, + MessageAcknowledgementConfiguration.Outcome.ACCEPTED.toString().toLowerCase(), + "Message is published to error topic and acknowledged on queue."); + ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED); + }) + .replaceWithVoid() + .onFailure().invoke(t -> SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel, t.getMessage())) + .emitOn(msg::runOnMessageContext) + .subscribeAsCompletionStage(); } } diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java index cca26e1..570abaf 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java @@ -15,7 +15,6 @@ class SolaceErrorTopicPublisherHandler implements PersistentMessagePublisher.MessagePublishReceiptListener { private final MessagingService solace; - private String errorTopic; private final PersistentMessagePublisher publisher; private final OutboundErrorMessageMapper outboundErrorMessageMapper; @@ -30,7 +29,6 @@ public SolaceErrorTopicPublisherHandler(MessagingService solace) { public Uni handle(SolaceInboundMessage message, String errorTopic, boolean dmqEligible, Long timeToLive) { - this.errorTopic = errorTopic; OutboundMessage outboundMessage = outboundErrorMessageMapper.mapError(this.solace.messageBuilder(), message.getMessage(), dmqEligible, timeToLive); @@ -41,10 +39,9 @@ public Uni handle(SolaceInboundMessage message, // always wait for error message publish receipt to ensure it is successfully spooled on broker. publisher.publish(outboundMessage, Topic.of(errorTopic), e); } catch (Throwable t) { - SolaceLogging.log.publishException(this.errorTopic); e.fail(t); } - }).invoke(() -> System.out.println("")); + }).onFailure().invoke(t -> SolaceLogging.log.publishException(errorTopic, t.getMessage())); } @Override @@ -53,7 +50,6 @@ public void onPublishReceipt(PublishReceipt publishReceipt) { .getUserContext(); PubSubPlusClientException exception = publishReceipt.getException(); if (exception != null) { - SolaceLogging.log.publishException(this.errorTopic); uniEmitter.fail(exception); } else { uniEmitter.complete(publishReceipt); diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceLogging.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceLogging.java index e7490cd..564b782 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceLogging.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceLogging.java @@ -34,8 +34,8 @@ public interface SolaceLogging extends BasicLogger { void unsuccessfulToTopic(String topic, String channel, String reason); @LogMessage(level = Logger.Level.ERROR) - @Message(id = 55204, value = "A exception occurred when publishing to topic %s") - void publishException(String topic); + @Message(id = 55204, value = "A exception occurred when publishing to topic %s, reason: %s") + void publishException(String topic, String reason); @LogMessage(level = Logger.Level.ERROR) @Message(id = 55205, value = "A exception occurred during shutdown %s") From 02f0bace7dee4b0d153f2efca6ae028d33dbe43e Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Mon, 1 Jan 2024 13:47:49 +0530 Subject: [PATCH 2/5] replaced apache commons logging with solace logging --- .../IncomingMessagesUnsignedCounterBarrier.java | 11 ++++------- .../OutgoingMessagesUnsignedCounterBarrier.java | 11 ++++------- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java index 1336ac3..80c72d6 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java @@ -1,21 +1,18 @@ package io.quarkiverse.solace.incoming; +import io.quarkiverse.solace.i18n.SolaceLogging; + import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - class IncomingMessagesUnsignedCounterBarrier { private final AtomicLong counter; // Treated as an unsigned long (i.e. range from 0 -> -1) private final Lock awaitLock = new ReentrantLock(); private final Condition isZero = awaitLock.newCondition(); - private static final Log logger = LogFactory.getLog(IncomingMessagesUnsignedCounterBarrier.class); - public IncomingMessagesUnsignedCounterBarrier(long initialValue) { counter = new AtomicLong(initialValue); } @@ -64,7 +61,7 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti awaitLock.lock(); try { if (timeout > 0) { - logger.info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit)); + SolaceLogging.log.info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit)); final long expiry = unit.toMillis(timeout) + System.currentTimeMillis(); while (isGreaterThanZero()) { long realTimeout = expiry - System.currentTimeMillis(); @@ -76,7 +73,7 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti return true; } else if (timeout < 0) { while (isGreaterThanZero()) { - logger.info(String.format("Waiting for %s items", counter.get())); + SolaceLogging.log.info(String.format("Waiting for %s items", counter.get())); isZero.await(5, TimeUnit.SECONDS); } return true; diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java index 6855941..ae86289 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java @@ -1,21 +1,18 @@ package io.quarkiverse.solace.outgoing; +import io.quarkiverse.solace.i18n.SolaceLogging; + import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - class OutgoingMessagesUnsignedCounterBarrier { private final AtomicLong counter; // Treated as an unsigned long (i.e. range from 0 -> -1) private final Lock awaitLock = new ReentrantLock(); private final Condition isZero = awaitLock.newCondition(); - private static final Log logger = LogFactory.getLog(OutgoingMessagesUnsignedCounterBarrier.class); - public OutgoingMessagesUnsignedCounterBarrier(long initialValue) { counter = new AtomicLong(initialValue); } @@ -64,7 +61,7 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti awaitLock.lock(); try { if (timeout > 0) { - logger.info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit)); + SolaceLogging.log.info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit)); final long expiry = unit.toMillis(timeout) + System.currentTimeMillis(); while (isGreaterThanZero()) { long realTimeout = expiry - System.currentTimeMillis(); @@ -76,7 +73,7 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti return true; } else if (timeout < 0) { while (isGreaterThanZero()) { - logger.info(String.format("Waiting for %s items", counter.get())); + SolaceLogging.log.info(String.format("Waiting for %s items", counter.get())); isZero.await(5, TimeUnit.SECONDS); } return true; From 3b52eeabaab30f60e231313d0ebd17167c3f8f4d Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Mon, 1 Jan 2024 13:53:53 +0530 Subject: [PATCH 3/5] Code formatting --- .../incoming/IncomingMessagesUnsignedCounterBarrier.java | 7 ++++--- .../outgoing/OutgoingMessagesUnsignedCounterBarrier.java | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java index 80c72d6..9c8352b 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java @@ -1,13 +1,13 @@ package io.quarkiverse.solace.incoming; -import io.quarkiverse.solace.i18n.SolaceLogging; - import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import io.quarkiverse.solace.i18n.SolaceLogging; + class IncomingMessagesUnsignedCounterBarrier { private final AtomicLong counter; // Treated as an unsigned long (i.e. range from 0 -> -1) private final Lock awaitLock = new ReentrantLock(); @@ -61,7 +61,8 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti awaitLock.lock(); try { if (timeout > 0) { - SolaceLogging.log.info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit)); + SolaceLogging.log + .info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit)); final long expiry = unit.toMillis(timeout) + System.currentTimeMillis(); while (isGreaterThanZero()) { long realTimeout = expiry - System.currentTimeMillis(); diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java index ae86289..b38a030 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java @@ -1,13 +1,13 @@ package io.quarkiverse.solace.outgoing; -import io.quarkiverse.solace.i18n.SolaceLogging; - import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import io.quarkiverse.solace.i18n.SolaceLogging; + class OutgoingMessagesUnsignedCounterBarrier { private final AtomicLong counter; // Treated as an unsigned long (i.e. range from 0 -> -1) private final Lock awaitLock = new ReentrantLock(); @@ -61,7 +61,8 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti awaitLock.lock(); try { if (timeout > 0) { - SolaceLogging.log.info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit)); + SolaceLogging.log + .info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit)); final long expiry = unit.toMillis(timeout) + System.currentTimeMillis(); while (isGreaterThanZero()) { long realTimeout = expiry - System.currentTimeMillis(); From 1c4b52d848da26a61adde0be6694ca4b56376a83 Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Thu, 4 Jan 2024 14:15:53 +0530 Subject: [PATCH 4/5] addressed comments in PR #14 --- .../quarkiverse/solace/fault/SolaceErrorTopic.java | 2 +- .../fault/SolaceErrorTopicPublisherHandler.java | 4 ++-- .../io/quarkiverse/solace/i18n/SolaceLogging.java | 13 +++++-------- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java index 90077ea..ec1aaee 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java @@ -58,7 +58,7 @@ public CompletionStage handle(SolaceInboundMessage msg, Throwable reaso ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED); }) .replaceWithVoid() - .onFailure().invoke(t -> SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel, t.getMessage())) + .onFailure().invoke(t -> SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel, t)) .emitOn(msg::runOnMessageContext) .subscribeAsCompletionStage(); } diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java index 570abaf..c6fc0fa 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java @@ -22,6 +22,7 @@ public SolaceErrorTopicPublisherHandler(MessagingService solace) { this.solace = solace; publisher = solace.createPersistentMessagePublisherBuilder().build(); + publisher.setMessagePublishReceiptListener(this); publisher.start(); outboundErrorMessageMapper = new OutboundErrorMessageMapper(); } @@ -32,7 +33,6 @@ public Uni handle(SolaceInboundMessage message, OutboundMessage outboundMessage = outboundErrorMessageMapper.mapError(this.solace.messageBuilder(), message.getMessage(), dmqEligible, timeToLive); - publisher.setMessagePublishReceiptListener(this); // } return Uni.createFrom(). emitter(e -> { try { @@ -41,7 +41,7 @@ public Uni handle(SolaceInboundMessage message, } catch (Throwable t) { e.fail(t); } - }).onFailure().invoke(t -> SolaceLogging.log.publishException(errorTopic, t.getMessage())); + }).onFailure().invoke(t -> SolaceLogging.log.publishException(errorTopic, t)); } @Override diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceLogging.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceLogging.java index 564b782..843b5f6 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceLogging.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceLogging.java @@ -2,10 +2,7 @@ import org.jboss.logging.BasicLogger; import org.jboss.logging.Logger; -import org.jboss.logging.annotations.LogMessage; -import org.jboss.logging.annotations.Message; -import org.jboss.logging.annotations.MessageLogger; -import org.jboss.logging.annotations.Once; +import org.jboss.logging.annotations.*; /** * Logging for Solace PubSub Connector @@ -30,12 +27,12 @@ public interface SolaceLogging extends BasicLogger { void messageSettled(String channel, String outcome, String reason); @LogMessage(level = Logger.Level.ERROR) - @Message(id = 55203, value = "Publishing error message to topic %s received from channel `%s` is unsuccessful, reason: %s") - void unsuccessfulToTopic(String topic, String channel, String reason); + @Message(id = 55203, value = "Publishing error message to topic %s received from channel `%s` is unsuccessful") + void unsuccessfulToTopic(String topic, String channel, @Cause Throwable cause); @LogMessage(level = Logger.Level.ERROR) - @Message(id = 55204, value = "A exception occurred when publishing to topic %s, reason: %s") - void publishException(String topic, String reason); + @Message(id = 55204, value = "A exception occurred when publishing to topic %s") + void publishException(String topic, @Cause Throwable cause); @LogMessage(level = Logger.Level.ERROR) @Message(id = 55205, value = "A exception occurred during shutdown %s") From 94a7e3e21cc81fbea050c68ee4786362d539dd6b Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Thu, 4 Jan 2024 15:19:34 +0530 Subject: [PATCH 5/5] Updated test to assert on log when publish to error topic is failed --- .../solace/SolaceConsumerTest.java | 56 +++++++++++-------- .../solace/logging/SolaceTestAppender.java | 30 ++++++++++ 2 files changed, 64 insertions(+), 22 deletions(-) create mode 100644 pubsub-plus-connector/src/test/java/io/quarkiverse/solace/logging/SolaceTestAppender.java diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java index bdfc52d..5f949a3 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java @@ -26,10 +26,17 @@ import io.quarkiverse.solace.base.SolaceContainer; import io.quarkiverse.solace.base.WeldTestBase; +import io.quarkiverse.solace.logging.SolaceTestAppender; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class SolaceConsumerTest extends WeldTestBase { + private org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getLogger("io.quarkiverse.solace"); + private SolaceTestAppender solaceTestAppender = new SolaceTestAppender(); + + private SolaceConsumerTest() { + rootLogger.addAppender(solaceTestAppender); + } @Test @Order(1) @@ -180,28 +187,6 @@ void consumerFailedProcessingMoveToDMQ() { @Test @Order(6) - void consumerCreateMissingResourceAddSubscriptionPermissionException() { - MapBasedConfig config = new MapBasedConfig() - .with("mp.messaging.incoming.in.connector", "quarkus-solace") - .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") - .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME) - .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic); - - Exception exception = assertThrows(Exception.class, () -> { - // Run app that consumes messages - MyConsumer app = runApplication(config, MyConsumer.class); - }); - - // Assert on published messages - await().untilAsserted(() -> assertThat(exception.getMessage()) - .contains("com.solacesystems.jcsmp.AccessDeniedException: Permission Not Allowed - Queue '" - + SolaceContainer.INTEGRATION_TEST_QUEUE_NAME + "' - Topic '" + topic)); - } - - @Test - @Order(7) void consumerPublishToErrorTopicPermissionException() { MapBasedConfig config = new MapBasedConfig() .with("mp.messaging.incoming.in.connector", "quarkus-solace") @@ -210,6 +195,7 @@ void consumerPublishToErrorTopicPermissionException() { .with("mp.messaging.incoming.in.consumer.queue.failure-strategy", "error_topic") .with("mp.messaging.incoming.in.consumer.queue.error.topic", "publish/deny") + .with("mp.messaging.incoming.in.consumer.queue.error.message.max-delivery-attempts", 0) .with("mp.messaging.incoming.error-in.connector", "quarkus-solace") .with("mp.messaging.incoming.error-in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_NAME) .with("mp.messaging.incoming.error-in.consumer.queue.type", "durable-exclusive"); @@ -226,6 +212,32 @@ void consumerPublishToErrorTopicPermissionException() { publisher.publish(outboundMessage, tp); await().untilAsserted(() -> assertThat(app.getReceivedFailedMessages().size()).isEqualTo(0)); + // await().untilAsserted(() -> assertThat(inMemoryLogHandler.getRecords().stream().filter(record -> record.getMessage().contains("A exception occurred when publishing to topic")).count()).isEqualTo(4)); + await().untilAsserted(() -> assertThat(solaceTestAppender.getLog().stream() + .anyMatch(record -> record.getMessage().toString().contains("Publishing error message to topic"))) + .isEqualTo(true)); + } + + @Test + @Order(7) + void consumerCreateMissingResourceAddSubscriptionPermissionException() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME) + .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic); + + Exception exception = assertThrows(Exception.class, () -> { + // Run app that consumes messages + MyConsumer app = runApplication(config, MyConsumer.class); + }); + + // Assert on published messages + await().untilAsserted(() -> assertThat(exception.getMessage()) + .contains("com.solacesystems.jcsmp.AccessDeniedException: Permission Not Allowed - Queue '" + + SolaceContainer.INTEGRATION_TEST_QUEUE_NAME + "' - Topic '" + topic)); } @ApplicationScoped diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/logging/SolaceTestAppender.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/logging/SolaceTestAppender.java new file mode 100644 index 0000000..0558689 --- /dev/null +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/logging/SolaceTestAppender.java @@ -0,0 +1,30 @@ +package io.quarkiverse.solace.logging; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.spi.LoggingEvent; + +public class SolaceTestAppender extends AppenderSkeleton { + private List log = new ArrayList<>(); + + @Override + protected void append(LoggingEvent loggingEvent) { + log.add(loggingEvent); + } + + @Override + public void close() { + + } + + @Override + public boolean requiresLayout() { + return false; + } + + public List getLog() { + return new ArrayList(log); + } +}