Skip to content

Commit

Permalink
Merge pull request #6 from SolaceCoEExt/failure-strategies
Browse files Browse the repository at this point in the history
Failure strategies implementation
  • Loading branch information
SravanThotakura05 authored Dec 20, 2023
2 parents 2ce55eb + e41fb74 commit 0911668
Show file tree
Hide file tree
Showing 16 changed files with 336 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,37 +178,29 @@ Maximum wait time in milliseconds for consumers to receive a message from config
--|`integer`
| 100

a| [[quarkus-solace_quarkus.consumer.queue.discard-messages-on-failure]]`link:#quarkus-solace_quarkus.consumer.queue.discard-messages-on-failure[consumer.queue.discard-messages-on-failure]`
a| [[quarkus-solace_quarkus.consumer.queue.failure-strategy]]`link:#quarkus-solace_quarkus.consumer.queue.failure-strategy[consumer.queue.failure-strategy]`


[.description]
--
Whether discard messages from queue on failure. A negative acknowledgment of type REJECTED is sent to broker which discards the messages from queue and will move to DMQ if enabled. This option works only when enable-nacks is true and error topic is not configured.
Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.

// ifdef::add-copy-button-to-env-var[]
// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE+++[]
// endif::add-copy-button-to-env-var[]
// ifndef::add-copy-button-to-env-var[]
// Environment variable: `+++QUARKUS_SOLACE+++`
// endif::add-copy-button-to-env-var[]
--|`boolean`
| false
`ignore` - Mark the message as IGNORED, will continue processing with next message.

a| [[quarkus-solace_quarkus.consumer.queue.publish-to-error-topic-on-failure]]`link:#quarkus-solace_quarkus.consumer.queue.publish-to-error-topic-on-failure[consumer.queue.publish-to-error-topic-on-failure]`
`fail` - Mark the message as FAILED, broker will redeliver the message. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on broker version.

`discard` - Mark the message as REJECTED, broker will discard the message. The message will be moved to DMQ if DMQ is configured for queue and DMQ Eligible is set on message otherwise message will be lost. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on broker version.

[.description]
--
Whether to publish consumed message to error topic on failure.
`error_topic` - Will publish the message to configured error topic, on success the message will be acknowledged in the queue.

// ifdef::add-copy-button-to-env-var[]
// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE+++[]
// endif::add-copy-button-to-env-var[]
// ifndef::add-copy-button-to-env-var[]
// Environment variable: `+++QUARKUS_SOLACE+++`
// endif::add-copy-button-to-env-var[]
--|`boolean`
| false
--|`string`
| discard

a| [[quarkus-solace_quarkus.consumer.queue.error.topic]]`link:#quarkus-solace_quarkus.consumer.queue.error.topic[consumer.queue.error.topic]`

Expand Down Expand Up @@ -274,7 +266,7 @@ Maximum number of attempts to send a failed message to the error topic in case o
--|`int`
| `3`

a| [[quarkus-solace_quarkus.consumer.queue.enable-nacks]]`link:#quarkus-solace_quarkus.consumer.queue.enable-nacks[consumer.queue.enable-nacks]`
a| [[quarkus-solace_quarkus.consumer.queue.supports-nacks]]`link:#quarkus-solace_quarkus.consumer.queue.supports-nacks[consumer.queue.supports-nacks]`


[.description]
Expand All @@ -288,6 +280,6 @@ Whether to enable negative acknowledgments on failed messages. Nacks are support
// Environment variable: `+++QUARKUS_SOLACE+++`
// endif::add-copy-button-to-env-var[]
--|`boolean`
| `false`
| `true`

|===
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,12 @@
@ConnectorAttribute(name = "consumer.queue.polled-wait-time-in-millis", type = "int", direction = INCOMING, description = "Maximum wait time in milliseconds for consumers to receive a message from configured queue", defaultValue = "100")
// TODO implement consumer concurrency
//@ConnectorAttribute(name = "consumer.queue.concurrency", type = "int", direction = INCOMING, description = "The number of concurrent consumers", defaultValue = "1")
@ConnectorAttribute(name = "consumer.queue.discard-messages-on-failure", type = "boolean", direction = INCOMING, description = "Whether discard messages from queue on failure. A negative acknowledgment of type REJECTED is sent to broker which discards the messages from queue and will move to DMQ if enabled. This option works only when enable-nacks is true and error topic is not configured", defaultValue = "false")
@ConnectorAttribute(name = "consumer.queue.publish-to-error-topic-on-failure", type = "boolean", direction = INCOMING, description = "Whether to publish consumed message to error topic on failure", defaultValue = "false")
@ConnectorAttribute(name = "consumer.queue.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.", defaultValue = "ignore")
@ConnectorAttribute(name = "consumer.queue.error.topic", type = "string", direction = INCOMING, description = "The error topic where message should be published in case of error")
@ConnectorAttribute(name = "consumer.queue.error.message.dmq-eligible", type = "boolean", direction = INCOMING, description = "Whether error message is eligible to move to dead message queue", defaultValue = "false")
@ConnectorAttribute(name = "consumer.queue.error.message.ttl", type = "long", direction = INCOMING, description = "TTL for Error message before moving to dead message queue.")
@ConnectorAttribute(name = "consumer.queue.error.message.max-delivery-attempts", type = "int", direction = INCOMING, description = "Maximum number of attempts to send a failed message to the error topic in case of failure. Each attempt will have a backoff interval of 1 second. When all delivery attempts have been exhausted, the failed message will be requeued on the queue for redelivery.", defaultValue = "3")
@ConnectorAttribute(name = "consumer.queue.enable-nacks", type = "boolean", direction = INCOMING, description = "Whether to enable negative acknowledgments on failed messages. Nacks are supported on event brokers 10.2.1 and later. If an event broker does not support Nacks, an exception is thrown", defaultValue = "false")
@ConnectorAttribute(name = "consumer.queue.supports-nacks", type = "boolean", direction = INCOMING, description = "Whether to enable negative acknowledgments on failed messages. Nacks are supported on event brokers 10.2.1 and later. If an event broker does not support Nacks, an exception is thrown", defaultValue = "false")

@ConnectorAttribute(name = "producer.topic", type = "string", direction = OUTGOING, description = "The topic to publish messages, by default the channel name")
@ConnectorAttribute(name = "producer.max-inflight-messages", type = "long", direction = OUTGOING, description = "The maximum number of messages to be written to Solace broker. It limits the number of messages waiting to be written and acknowledged by the broker. You can set this attribute to `0` remove the limit", defaultValue = "1024")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkiverse.solace.incoming;
package io.quarkiverse.solace.fault;

import java.util.Properties;

Expand All @@ -7,21 +7,18 @@
import com.solace.messaging.publisher.OutboundMessageBuilder;
import com.solace.messaging.receiver.InboundMessage;

import io.quarkiverse.solace.SolaceConnectorIncomingConfiguration;

class OutboundErrorMessageMapper {

public OutboundMessage mapError(OutboundMessageBuilder messageBuilder, InboundMessage inputMessage,
SolaceConnectorIncomingConfiguration incomingConfiguration) {
boolean dmqEligible, Long timeToLive) {
Properties extendedMessageProperties = new Properties();

extendedMessageProperties.setProperty(SolaceProperties.MessageProperties.PERSISTENT_DMQ_ELIGIBLE,
Boolean.toString(incomingConfiguration.getConsumerQueueErrorMessageDmqEligible().booleanValue()));
Boolean.toString(dmqEligible));
messageBuilder.fromProperties(extendedMessageProperties);

incomingConfiguration.getConsumerQueueErrorMessageTtl().ifPresent(ttl -> {
messageBuilder.withTimeToLive(incomingConfiguration.getConsumerQueueErrorMessageTtl().get());
});
if (timeToLive != null) {
messageBuilder.withTimeToLive(timeToLive);
}

return messageBuilder.build(inputMessage.getPayloadAsBytes());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.quarkiverse.solace.fault;

import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import com.solace.messaging.config.MessageAcknowledgementConfiguration;
import com.solace.messaging.receiver.AcknowledgementSupport;

import io.quarkiverse.solace.i18n.SolaceLogging;
import io.quarkiverse.solace.incoming.SettleMetadata;
import io.quarkiverse.solace.incoming.SolaceInboundMessage;
import io.smallrye.mutiny.Uni;

public class SolaceDiscard implements SolaceFailureHandler {
private final String channel;
private final AcknowledgementSupport ackSupport;

public SolaceDiscard(String channel, AcknowledgementSupport ackSupport) {
this.channel = channel;
this.ackSupport = ackSupport;
}

@Override
public CompletionStage<Void> handle(SolaceInboundMessage<?> msg, Throwable reason, Metadata metadata) {
MessageAcknowledgementConfiguration.Outcome outcome;
if (metadata != null) {
outcome = metadata.get(SettleMetadata.class)
.map(SettleMetadata::getOutcome)
.orElseGet(() -> MessageAcknowledgementConfiguration.Outcome.REJECTED /* TODO get outcome from reason */);
} else {
outcome = MessageAcknowledgementConfiguration.Outcome.REJECTED;
}

SolaceLogging.log.messageSettled(channel, outcome.toString().toLowerCase(), reason.getMessage());
return Uni.createFrom().voidItem()
.invoke(() -> ackSupport.settle(msg.getMessage(), outcome))
.runSubscriptionOn(msg::runOnMessageContext)
.subscribeAsCompletionStage();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.quarkiverse.solace.fault;

import java.time.Duration;
import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import com.solace.messaging.MessagingService;
import com.solace.messaging.config.MessageAcknowledgementConfiguration;
import com.solace.messaging.publisher.PersistentMessagePublisher;
import com.solace.messaging.receiver.AcknowledgementSupport;

import io.quarkiverse.solace.i18n.SolaceLogging;
import io.quarkiverse.solace.incoming.SolaceInboundMessage;
import io.smallrye.mutiny.Uni;

public class SolaceErrorTopic implements SolaceFailureHandler {
private final String channel;
private final AcknowledgementSupport ackSupport;
private final MessagingService solace;

private final SolaceErrorTopicPublisherHandler solaceErrorTopicPublisherHandler;
private long maxDeliveryAttempts;
private String errorTopic;
private boolean dmqEligible;
private Long timeToLive;

public SolaceErrorTopic(String channel, AcknowledgementSupport ackSupport, MessagingService solace) {
this.channel = channel;
this.ackSupport = ackSupport;
this.solace = solace;
this.solaceErrorTopicPublisherHandler = new SolaceErrorTopicPublisherHandler(solace);
}

public void setMaxDeliveryAttempts(long maxDeliveryAttempts) {
this.maxDeliveryAttempts = maxDeliveryAttempts;
}

public void setErrorTopic(String errorTopic) {
this.errorTopic = errorTopic;
}

public void setDmqEligible(boolean dmqEligible) {
this.dmqEligible = dmqEligible;
}

public void setTimeToLive(Long timeToLive) {
this.timeToLive = timeToLive;
}

@Override
public CompletionStage<Void> handle(SolaceInboundMessage<?> msg, Throwable reason, Metadata metadata) {
PersistentMessagePublisher.PublishReceipt publishReceipt = solaceErrorTopicPublisherHandler
.handle(msg, errorTopic, dmqEligible, timeToLive)
.onFailure().retry().withBackOff(Duration.ofSeconds(1))
.atMost(maxDeliveryAttempts)
.subscribeAsCompletionStage().exceptionally((t) -> {
SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel,
t.getMessage());
return null;
}).join();

if (publishReceipt != null) {
return Uni.createFrom().voidItem()
.invoke(() -> ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED))
.runSubscriptionOn(msg::runOnMessageContext)
.subscribeAsCompletionStage();
}

return Uni.createFrom().<Void> failure(reason)
.emitOn(msg::runOnMessageContext).subscribeAsCompletionStage();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkiverse.solace.incoming;
package io.quarkiverse.solace.fault;

import com.solace.messaging.MessagingService;
import com.solace.messaging.PubSubPlusClientException;
Expand All @@ -7,32 +7,33 @@
import com.solace.messaging.publisher.PersistentMessagePublisher.PublishReceipt;
import com.solace.messaging.resources.Topic;

import io.quarkiverse.solace.SolaceConnectorIncomingConfiguration;
import io.quarkiverse.solace.i18n.SolaceLogging;
import io.quarkiverse.solace.incoming.SolaceInboundMessage;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniEmitter;

class SolaceErrorTopicPublisherHandler implements PersistentMessagePublisher.MessagePublishReceiptListener {

private final MessagingService solace;
private final String errorTopic;
private String errorTopic;
private final PersistentMessagePublisher publisher;
private final OutboundErrorMessageMapper outboundErrorMessageMapper;

public SolaceErrorTopicPublisherHandler(MessagingService solace, String errorTopic) {
public SolaceErrorTopicPublisherHandler(MessagingService solace) {
this.solace = solace;
this.errorTopic = errorTopic;

publisher = solace.createPersistentMessagePublisherBuilder().build();
publisher.start();
outboundErrorMessageMapper = new OutboundErrorMessageMapper();
}

public Uni<PublishReceipt> handle(SolaceInboundMessage<?> message,
SolaceConnectorIncomingConfiguration ic) {
String errorTopic,
boolean dmqEligible, Long timeToLive) {
this.errorTopic = errorTopic;
OutboundMessage outboundMessage = outboundErrorMessageMapper.mapError(this.solace.messageBuilder(),
message.getMessage(),
ic);
dmqEligible, timeToLive);
publisher.setMessagePublishReceiptListener(this);
// }
return Uni.createFrom().<PublishReceipt> emitter(e -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,35 @@
package io.quarkiverse.solace.incoming;
package io.quarkiverse.solace.fault;

import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import com.solace.messaging.MessagingService;
import com.solace.messaging.config.MessageAcknowledgementConfiguration;
import com.solace.messaging.receiver.AcknowledgementSupport;

import io.quarkiverse.solace.i18n.SolaceLogging;
import io.quarkiverse.solace.incoming.SettleMetadata;
import io.quarkiverse.solace.incoming.SolaceInboundMessage;
import io.smallrye.mutiny.Uni;

class SolaceFailureHandler {

public class SolaceFail implements SolaceFailureHandler {
private final String channel;
private final AcknowledgementSupport ackSupport;

private final MessagingService solace;

public SolaceFailureHandler(String channel, AcknowledgementSupport ackSupport, MessagingService solace) {
public SolaceFail(String channel, AcknowledgementSupport ackSupport) {
this.channel = channel;
this.ackSupport = ackSupport;
this.solace = solace;
}

public CompletionStage<Void> handle(SolaceInboundMessage<?> msg, Throwable reason, Metadata metadata,
MessageAcknowledgementConfiguration.Outcome messageOutCome) {
@Override
public CompletionStage<Void> handle(SolaceInboundMessage<?> msg, Throwable reason, Metadata metadata) {
MessageAcknowledgementConfiguration.Outcome outcome;
if (metadata != null) {
outcome = metadata.get(SettleMetadata.class)
.map(SettleMetadata::getOutcome)
.orElseGet(() -> messageOutCome /* TODO get outcome from reason */);
.orElseGet(() -> MessageAcknowledgementConfiguration.Outcome.FAILED /* TODO get outcome from reason */);
} else {
outcome = messageOutCome != null ? messageOutCome
: MessageAcknowledgementConfiguration.Outcome.FAILED;
outcome = MessageAcknowledgementConfiguration.Outcome.FAILED;
}

SolaceLogging.log.messageSettled(channel, outcome.toString().toLowerCase(), reason.getMessage());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.quarkiverse.solace.fault;

import static io.quarkiverse.solace.i18n.SolaceExceptions.ex;

import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.quarkiverse.solace.incoming.SolaceInboundMessage;

public interface SolaceFailureHandler {

enum Strategy {
/**
* Mark the message as IGNORED, will continue processing with next message.
*/
IGNORE,
/**
* Mark the message as FAILED, broker will redeliver the message.
*/
FAIL,
/**
* Mark the message as REJECTED, broker will discard the message. The message will be moved to DMQ if DMQ is configured
* for queue and DMQ Eligible is set on message.
*/
DISCARD,
/**
* Will publish the message to configured error topic, on success the message will be acknowledged in the queue.
*/
ERROR_TOPIC;

public static Strategy from(String s) {
if (s == null || s.equalsIgnoreCase("ignore")) {
return IGNORE;
}
if (s.equalsIgnoreCase("fail")) {
return FAIL;
}
if (s.equalsIgnoreCase("discard")) {
return DISCARD;
}
if (s.equalsIgnoreCase("error_topic")) {
return ERROR_TOPIC;
}

throw ex.illegalArgumentUnknownFailureStrategy(s);
}
}

CompletionStage<Void> handle(SolaceInboundMessage<?> msg, Throwable reason, Metadata metadata);
}
Loading

0 comments on commit 0911668

Please sign in to comment.