diff --git a/pubsub-plus-connector/pom.xml b/pubsub-plus-connector/pom.xml
index d6dc9c7..3d55cff 100644
--- a/pubsub-plus-connector/pom.xml
+++ b/pubsub-plus-connector/pom.xml
@@ -162,6 +162,19 @@
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ java.util.logging.LogManager
+
+
+ org.jboss.slf4j:slf4j-jboss-logmanager
+ org.jboss.logmanager:jboss-logmanager-embedded
+
+
+
diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java
index 11f2e7b..ec1aaee 100644
--- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java
+++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java
@@ -7,12 +7,10 @@
import com.solace.messaging.MessagingService;
import com.solace.messaging.config.MessageAcknowledgementConfiguration;
-import com.solace.messaging.publisher.PersistentMessagePublisher;
import com.solace.messaging.receiver.AcknowledgementSupport;
import io.quarkiverse.solace.i18n.SolaceLogging;
import io.quarkiverse.solace.incoming.SolaceInboundMessage;
-import io.smallrye.mutiny.Uni;
public class SolaceErrorTopic implements SolaceFailureHandler {
private final String channel;
@@ -50,24 +48,18 @@ public void setTimeToLive(Long timeToLive) {
@Override
public CompletionStage handle(SolaceInboundMessage> msg, Throwable reason, Metadata metadata) {
- PersistentMessagePublisher.PublishReceipt publishReceipt = solaceErrorTopicPublisherHandler
- .handle(msg, errorTopic, dmqEligible, timeToLive)
+ return solaceErrorTopicPublisherHandler.handle(msg, errorTopic, dmqEligible, timeToLive)
.onFailure().retry().withBackOff(Duration.ofSeconds(1))
.atMost(maxDeliveryAttempts)
- .subscribeAsCompletionStage().exceptionally((t) -> {
- SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel,
- t.getMessage());
- return null;
- }).join();
-
- if (publishReceipt != null) {
- return Uni.createFrom().voidItem()
- .invoke(() -> ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED))
- .runSubscriptionOn(msg::runOnMessageContext)
- .subscribeAsCompletionStage();
- }
-
- return Uni.createFrom(). failure(reason)
- .emitOn(msg::runOnMessageContext).subscribeAsCompletionStage();
+ .onItem().invoke(() -> {
+ SolaceLogging.log.messageSettled(channel,
+ MessageAcknowledgementConfiguration.Outcome.ACCEPTED.toString().toLowerCase(),
+ "Message is published to error topic and acknowledged on queue.");
+ ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED);
+ })
+ .replaceWithVoid()
+ .onFailure().invoke(t -> SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel, t))
+ .emitOn(msg::runOnMessageContext)
+ .subscribeAsCompletionStage();
}
}
diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java
index cca26e1..c6fc0fa 100644
--- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java
+++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java
@@ -15,7 +15,6 @@
class SolaceErrorTopicPublisherHandler implements PersistentMessagePublisher.MessagePublishReceiptListener {
private final MessagingService solace;
- private String errorTopic;
private final PersistentMessagePublisher publisher;
private final OutboundErrorMessageMapper outboundErrorMessageMapper;
@@ -23,6 +22,7 @@ public SolaceErrorTopicPublisherHandler(MessagingService solace) {
this.solace = solace;
publisher = solace.createPersistentMessagePublisherBuilder().build();
+ publisher.setMessagePublishReceiptListener(this);
publisher.start();
outboundErrorMessageMapper = new OutboundErrorMessageMapper();
}
@@ -30,21 +30,18 @@ public SolaceErrorTopicPublisherHandler(MessagingService solace) {
public Uni handle(SolaceInboundMessage> message,
String errorTopic,
boolean dmqEligible, Long timeToLive) {
- this.errorTopic = errorTopic;
OutboundMessage outboundMessage = outboundErrorMessageMapper.mapError(this.solace.messageBuilder(),
message.getMessage(),
dmqEligible, timeToLive);
- publisher.setMessagePublishReceiptListener(this);
// }
return Uni.createFrom(). emitter(e -> {
try {
// always wait for error message publish receipt to ensure it is successfully spooled on broker.
publisher.publish(outboundMessage, Topic.of(errorTopic), e);
} catch (Throwable t) {
- SolaceLogging.log.publishException(this.errorTopic);
e.fail(t);
}
- }).invoke(() -> System.out.println(""));
+ }).onFailure().invoke(t -> SolaceLogging.log.publishException(errorTopic, t));
}
@Override
@@ -53,7 +50,6 @@ public void onPublishReceipt(PublishReceipt publishReceipt) {
.getUserContext();
PubSubPlusClientException exception = publishReceipt.getException();
if (exception != null) {
- SolaceLogging.log.publishException(this.errorTopic);
uniEmitter.fail(exception);
} else {
uniEmitter.complete(publishReceipt);
diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceLogging.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceLogging.java
index e7490cd..843b5f6 100644
--- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceLogging.java
+++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceLogging.java
@@ -2,10 +2,7 @@
import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
-import org.jboss.logging.annotations.LogMessage;
-import org.jboss.logging.annotations.Message;
-import org.jboss.logging.annotations.MessageLogger;
-import org.jboss.logging.annotations.Once;
+import org.jboss.logging.annotations.*;
/**
* Logging for Solace PubSub Connector
@@ -30,12 +27,12 @@ public interface SolaceLogging extends BasicLogger {
void messageSettled(String channel, String outcome, String reason);
@LogMessage(level = Logger.Level.ERROR)
- @Message(id = 55203, value = "Publishing error message to topic %s received from channel `%s` is unsuccessful, reason: %s")
- void unsuccessfulToTopic(String topic, String channel, String reason);
+ @Message(id = 55203, value = "Publishing error message to topic %s received from channel `%s` is unsuccessful")
+ void unsuccessfulToTopic(String topic, String channel, @Cause Throwable cause);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 55204, value = "A exception occurred when publishing to topic %s")
- void publishException(String topic);
+ void publishException(String topic, @Cause Throwable cause);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 55205, value = "A exception occurred during shutdown %s")
diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java
index 1336ac3..9c8352b 100644
--- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java
+++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java
@@ -6,16 +6,13 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import io.quarkiverse.solace.i18n.SolaceLogging;
class IncomingMessagesUnsignedCounterBarrier {
private final AtomicLong counter; // Treated as an unsigned long (i.e. range from 0 -> -1)
private final Lock awaitLock = new ReentrantLock();
private final Condition isZero = awaitLock.newCondition();
- private static final Log logger = LogFactory.getLog(IncomingMessagesUnsignedCounterBarrier.class);
-
public IncomingMessagesUnsignedCounterBarrier(long initialValue) {
counter = new AtomicLong(initialValue);
}
@@ -64,7 +61,8 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
awaitLock.lock();
try {
if (timeout > 0) {
- logger.info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit));
+ SolaceLogging.log
+ .info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit));
final long expiry = unit.toMillis(timeout) + System.currentTimeMillis();
while (isGreaterThanZero()) {
long realTimeout = expiry - System.currentTimeMillis();
@@ -76,7 +74,7 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
return true;
} else if (timeout < 0) {
while (isGreaterThanZero()) {
- logger.info(String.format("Waiting for %s items", counter.get()));
+ SolaceLogging.log.info(String.format("Waiting for %s items", counter.get()));
isZero.await(5, TimeUnit.SECONDS);
}
return true;
diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java
index 6855941..b38a030 100644
--- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java
+++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java
@@ -6,16 +6,13 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import io.quarkiverse.solace.i18n.SolaceLogging;
class OutgoingMessagesUnsignedCounterBarrier {
private final AtomicLong counter; // Treated as an unsigned long (i.e. range from 0 -> -1)
private final Lock awaitLock = new ReentrantLock();
private final Condition isZero = awaitLock.newCondition();
- private static final Log logger = LogFactory.getLog(OutgoingMessagesUnsignedCounterBarrier.class);
-
public OutgoingMessagesUnsignedCounterBarrier(long initialValue) {
counter = new AtomicLong(initialValue);
}
@@ -64,7 +61,8 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
awaitLock.lock();
try {
if (timeout > 0) {
- logger.info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit));
+ SolaceLogging.log
+ .info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit));
final long expiry = unit.toMillis(timeout) + System.currentTimeMillis();
while (isGreaterThanZero()) {
long realTimeout = expiry - System.currentTimeMillis();
@@ -76,7 +74,7 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
return true;
} else if (timeout < 0) {
while (isGreaterThanZero()) {
- logger.info(String.format("Waiting for %s items", counter.get()));
+ SolaceLogging.log.info(String.format("Waiting for %s items", counter.get()));
isZero.await(5, TimeUnit.SECONDS);
}
return true;
diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java
index bdfc52d..5f949a3 100644
--- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java
+++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java
@@ -26,10 +26,17 @@
import io.quarkiverse.solace.base.SolaceContainer;
import io.quarkiverse.solace.base.WeldTestBase;
+import io.quarkiverse.solace.logging.SolaceTestAppender;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SolaceConsumerTest extends WeldTestBase {
+ private org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getLogger("io.quarkiverse.solace");
+ private SolaceTestAppender solaceTestAppender = new SolaceTestAppender();
+
+ private SolaceConsumerTest() {
+ rootLogger.addAppender(solaceTestAppender);
+ }
@Test
@Order(1)
@@ -180,28 +187,6 @@ void consumerFailedProcessingMoveToDMQ() {
@Test
@Order(6)
- void consumerCreateMissingResourceAddSubscriptionPermissionException() {
- MapBasedConfig config = new MapBasedConfig()
- .with("mp.messaging.incoming.in.connector", "quarkus-solace")
- .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
- .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
- .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
- .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
- .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);
-
- Exception exception = assertThrows(Exception.class, () -> {
- // Run app that consumes messages
- MyConsumer app = runApplication(config, MyConsumer.class);
- });
-
- // Assert on published messages
- await().untilAsserted(() -> assertThat(exception.getMessage())
- .contains("com.solacesystems.jcsmp.AccessDeniedException: Permission Not Allowed - Queue '"
- + SolaceContainer.INTEGRATION_TEST_QUEUE_NAME + "' - Topic '" + topic));
- }
-
- @Test
- @Order(7)
void consumerPublishToErrorTopicPermissionException() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
@@ -210,6 +195,7 @@ void consumerPublishToErrorTopicPermissionException() {
.with("mp.messaging.incoming.in.consumer.queue.failure-strategy", "error_topic")
.with("mp.messaging.incoming.in.consumer.queue.error.topic",
"publish/deny")
+ .with("mp.messaging.incoming.in.consumer.queue.error.message.max-delivery-attempts", 0)
.with("mp.messaging.incoming.error-in.connector", "quarkus-solace")
.with("mp.messaging.incoming.error-in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_NAME)
.with("mp.messaging.incoming.error-in.consumer.queue.type", "durable-exclusive");
@@ -226,6 +212,32 @@ void consumerPublishToErrorTopicPermissionException() {
publisher.publish(outboundMessage, tp);
await().untilAsserted(() -> assertThat(app.getReceivedFailedMessages().size()).isEqualTo(0));
+ // await().untilAsserted(() -> assertThat(inMemoryLogHandler.getRecords().stream().filter(record -> record.getMessage().contains("A exception occurred when publishing to topic")).count()).isEqualTo(4));
+ await().untilAsserted(() -> assertThat(solaceTestAppender.getLog().stream()
+ .anyMatch(record -> record.getMessage().toString().contains("Publishing error message to topic")))
+ .isEqualTo(true));
+ }
+
+ @Test
+ @Order(7)
+ void consumerCreateMissingResourceAddSubscriptionPermissionException() {
+ MapBasedConfig config = new MapBasedConfig()
+ .with("mp.messaging.incoming.in.connector", "quarkus-solace")
+ .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
+ .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
+ .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
+ .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
+ .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);
+
+ Exception exception = assertThrows(Exception.class, () -> {
+ // Run app that consumes messages
+ MyConsumer app = runApplication(config, MyConsumer.class);
+ });
+
+ // Assert on published messages
+ await().untilAsserted(() -> assertThat(exception.getMessage())
+ .contains("com.solacesystems.jcsmp.AccessDeniedException: Permission Not Allowed - Queue '"
+ + SolaceContainer.INTEGRATION_TEST_QUEUE_NAME + "' - Topic '" + topic));
}
@ApplicationScoped
diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/logging/SolaceTestAppender.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/logging/SolaceTestAppender.java
new file mode 100644
index 0000000..0558689
--- /dev/null
+++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/logging/SolaceTestAppender.java
@@ -0,0 +1,30 @@
+package io.quarkiverse.solace.logging;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.spi.LoggingEvent;
+
+public class SolaceTestAppender extends AppenderSkeleton {
+ private List log = new ArrayList<>();
+
+ @Override
+ protected void append(LoggingEvent loggingEvent) {
+ log.add(loggingEvent);
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public boolean requiresLayout() {
+ return false;
+ }
+
+ public List getLog() {
+ return new ArrayList(log);
+ }
+}