Skip to content

Commit

Permalink
Fixed Issues - #9 #10 #12
Browse files Browse the repository at this point in the history
  • Loading branch information
SravanThotakura05 committed Jan 1, 2024
1 parent 76d9f4f commit fabf948
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 26 deletions.
13 changes: 13 additions & 0 deletions pubsub-plus-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,19 @@
</annotationProcessors>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<java.util.logging.manager>java.util.logging.LogManager</java.util.logging.manager>
</systemPropertyVariables>
<classpathDependencyExcludes>
<classpathDependencyExclude>org.jboss.slf4j:slf4j-jboss-logmanager</classpathDependencyExclude>
<classpathDependencyExclude>org.jboss.logmanager:jboss-logmanager-embedded</classpathDependencyExclude>
</classpathDependencyExcludes>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@

import com.solace.messaging.MessagingService;
import com.solace.messaging.config.MessageAcknowledgementConfiguration;
import com.solace.messaging.publisher.PersistentMessagePublisher;
import com.solace.messaging.receiver.AcknowledgementSupport;

import io.quarkiverse.solace.i18n.SolaceLogging;
import io.quarkiverse.solace.incoming.SolaceInboundMessage;
import io.smallrye.mutiny.Uni;

public class SolaceErrorTopic implements SolaceFailureHandler {
private final String channel;
Expand Down Expand Up @@ -50,24 +48,18 @@ public void setTimeToLive(Long timeToLive) {

@Override
public CompletionStage<Void> handle(SolaceInboundMessage<?> msg, Throwable reason, Metadata metadata) {
PersistentMessagePublisher.PublishReceipt publishReceipt = solaceErrorTopicPublisherHandler
.handle(msg, errorTopic, dmqEligible, timeToLive)
return solaceErrorTopicPublisherHandler.handle(msg, errorTopic, dmqEligible, timeToLive)
.onFailure().retry().withBackOff(Duration.ofSeconds(1))
.atMost(maxDeliveryAttempts)
.subscribeAsCompletionStage().exceptionally((t) -> {
SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel,
t.getMessage());
return null;
}).join();

if (publishReceipt != null) {
return Uni.createFrom().voidItem()
.invoke(() -> ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED))
.runSubscriptionOn(msg::runOnMessageContext)
.subscribeAsCompletionStage();
}

return Uni.createFrom().<Void> failure(reason)
.emitOn(msg::runOnMessageContext).subscribeAsCompletionStage();
.onItem().invoke(() -> {
SolaceLogging.log.messageSettled(channel,
MessageAcknowledgementConfiguration.Outcome.ACCEPTED.toString().toLowerCase(),
"Message is published to error topic and acknowledged on queue.");
ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED);
})
.replaceWithVoid()
.onFailure().invoke(t -> SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel, t.getMessage()))
.emitOn(msg::runOnMessageContext)
.subscribeAsCompletionStage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
class SolaceErrorTopicPublisherHandler implements PersistentMessagePublisher.MessagePublishReceiptListener {

private final MessagingService solace;
private String errorTopic;
private final PersistentMessagePublisher publisher;
private final OutboundErrorMessageMapper outboundErrorMessageMapper;

Expand All @@ -30,7 +29,6 @@ public SolaceErrorTopicPublisherHandler(MessagingService solace) {
public Uni<PublishReceipt> handle(SolaceInboundMessage<?> message,
String errorTopic,
boolean dmqEligible, Long timeToLive) {
this.errorTopic = errorTopic;
OutboundMessage outboundMessage = outboundErrorMessageMapper.mapError(this.solace.messageBuilder(),
message.getMessage(),
dmqEligible, timeToLive);
Expand All @@ -41,10 +39,9 @@ public Uni<PublishReceipt> handle(SolaceInboundMessage<?> message,
// always wait for error message publish receipt to ensure it is successfully spooled on broker.
publisher.publish(outboundMessage, Topic.of(errorTopic), e);
} catch (Throwable t) {
SolaceLogging.log.publishException(this.errorTopic);
e.fail(t);
}
}).invoke(() -> System.out.println(""));
}).onFailure().invoke(t -> SolaceLogging.log.publishException(errorTopic, t.getMessage()));
}

@Override
Expand All @@ -53,7 +50,6 @@ public void onPublishReceipt(PublishReceipt publishReceipt) {
.getUserContext();
PubSubPlusClientException exception = publishReceipt.getException();
if (exception != null) {
SolaceLogging.log.publishException(this.errorTopic);
uniEmitter.fail(exception);
} else {
uniEmitter.complete(publishReceipt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public interface SolaceLogging extends BasicLogger {
void unsuccessfulToTopic(String topic, String channel, String reason);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 55204, value = "A exception occurred when publishing to topic %s")
void publishException(String topic);
@Message(id = 55204, value = "A exception occurred when publishing to topic %s, reason: %s")
void publishException(String topic, String reason);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 55205, value = "A exception occurred during shutdown %s")
Expand Down

0 comments on commit fabf948

Please sign in to comment.