From fabf948a0abfeb8586df0bbdc8f50a9b810a838b Mon Sep 17 00:00:00 2001
From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com>
Date: Mon, 1 Jan 2024 13:44:23 +0530
Subject: [PATCH] Fixed Issues - #9 #10 #12
---
pubsub-plus-connector/pom.xml | 13 ++++++++
.../solace/fault/SolaceErrorTopic.java | 30 +++++++------------
.../SolaceErrorTopicPublisherHandler.java | 6 +---
.../solace/i18n/SolaceLogging.java | 4 +--
4 files changed, 27 insertions(+), 26 deletions(-)
diff --git a/pubsub-plus-connector/pom.xml b/pubsub-plus-connector/pom.xml
index d6dc9c7..3d55cff 100644
--- a/pubsub-plus-connector/pom.xml
+++ b/pubsub-plus-connector/pom.xml
@@ -162,6 +162,19 @@
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ java.util.logging.LogManager
+
+
+ org.jboss.slf4j:slf4j-jboss-logmanager
+ org.jboss.logmanager:jboss-logmanager-embedded
+
+
+
diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java
index 11f2e7b..90077ea 100644
--- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java
+++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java
@@ -7,12 +7,10 @@
import com.solace.messaging.MessagingService;
import com.solace.messaging.config.MessageAcknowledgementConfiguration;
-import com.solace.messaging.publisher.PersistentMessagePublisher;
import com.solace.messaging.receiver.AcknowledgementSupport;
import io.quarkiverse.solace.i18n.SolaceLogging;
import io.quarkiverse.solace.incoming.SolaceInboundMessage;
-import io.smallrye.mutiny.Uni;
public class SolaceErrorTopic implements SolaceFailureHandler {
private final String channel;
@@ -50,24 +48,18 @@ public void setTimeToLive(Long timeToLive) {
@Override
public CompletionStage handle(SolaceInboundMessage> msg, Throwable reason, Metadata metadata) {
- PersistentMessagePublisher.PublishReceipt publishReceipt = solaceErrorTopicPublisherHandler
- .handle(msg, errorTopic, dmqEligible, timeToLive)
+ return solaceErrorTopicPublisherHandler.handle(msg, errorTopic, dmqEligible, timeToLive)
.onFailure().retry().withBackOff(Duration.ofSeconds(1))
.atMost(maxDeliveryAttempts)
- .subscribeAsCompletionStage().exceptionally((t) -> {
- SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel,
- t.getMessage());
- return null;
- }).join();
-
- if (publishReceipt != null) {
- return Uni.createFrom().voidItem()
- .invoke(() -> ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED))
- .runSubscriptionOn(msg::runOnMessageContext)
- .subscribeAsCompletionStage();
- }
-
- return Uni.createFrom(). failure(reason)
- .emitOn(msg::runOnMessageContext).subscribeAsCompletionStage();
+ .onItem().invoke(() -> {
+ SolaceLogging.log.messageSettled(channel,
+ MessageAcknowledgementConfiguration.Outcome.ACCEPTED.toString().toLowerCase(),
+ "Message is published to error topic and acknowledged on queue.");
+ ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED);
+ })
+ .replaceWithVoid()
+ .onFailure().invoke(t -> SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel, t.getMessage()))
+ .emitOn(msg::runOnMessageContext)
+ .subscribeAsCompletionStage();
}
}
diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java
index cca26e1..570abaf 100644
--- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java
+++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java
@@ -15,7 +15,6 @@
class SolaceErrorTopicPublisherHandler implements PersistentMessagePublisher.MessagePublishReceiptListener {
private final MessagingService solace;
- private String errorTopic;
private final PersistentMessagePublisher publisher;
private final OutboundErrorMessageMapper outboundErrorMessageMapper;
@@ -30,7 +29,6 @@ public SolaceErrorTopicPublisherHandler(MessagingService solace) {
public Uni handle(SolaceInboundMessage> message,
String errorTopic,
boolean dmqEligible, Long timeToLive) {
- this.errorTopic = errorTopic;
OutboundMessage outboundMessage = outboundErrorMessageMapper.mapError(this.solace.messageBuilder(),
message.getMessage(),
dmqEligible, timeToLive);
@@ -41,10 +39,9 @@ public Uni handle(SolaceInboundMessage> message,
// always wait for error message publish receipt to ensure it is successfully spooled on broker.
publisher.publish(outboundMessage, Topic.of(errorTopic), e);
} catch (Throwable t) {
- SolaceLogging.log.publishException(this.errorTopic);
e.fail(t);
}
- }).invoke(() -> System.out.println(""));
+ }).onFailure().invoke(t -> SolaceLogging.log.publishException(errorTopic, t.getMessage()));
}
@Override
@@ -53,7 +50,6 @@ public void onPublishReceipt(PublishReceipt publishReceipt) {
.getUserContext();
PubSubPlusClientException exception = publishReceipt.getException();
if (exception != null) {
- SolaceLogging.log.publishException(this.errorTopic);
uniEmitter.fail(exception);
} else {
uniEmitter.complete(publishReceipt);
diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceLogging.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceLogging.java
index e7490cd..564b782 100644
--- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceLogging.java
+++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceLogging.java
@@ -34,8 +34,8 @@ public interface SolaceLogging extends BasicLogger {
void unsuccessfulToTopic(String topic, String channel, String reason);
@LogMessage(level = Logger.Level.ERROR)
- @Message(id = 55204, value = "A exception occurred when publishing to topic %s")
- void publishException(String topic);
+ @Message(id = 55204, value = "A exception occurred when publishing to topic %s, reason: %s")
+ void publishException(String topic, String reason);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 55205, value = "A exception occurred during shutdown %s")