diff --git a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc index 4b786fd..498a628 100644 --- a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc +++ b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc @@ -178,28 +178,20 @@ Maximum wait time in milliseconds for consumers to receive a message from config --|`integer` | 100 -a| [[quarkus-solace_quarkus.consumer.queue.discard-messages-on-failure]]`link:#quarkus-solace_quarkus.consumer.queue.discard-messages-on-failure[consumer.queue.discard-messages-on-failure]` +a| [[quarkus-solace_quarkus.consumer.queue.failure-strategy]]`link:#quarkus-solace_quarkus.consumer.queue.failure-strategy[consumer.queue.failure-strategy]` [.description] -- -Whether discard messages from queue on failure. A negative acknowledgment of type REJECTED is sent to broker which discards the messages from queue and will move to DMQ if enabled. This option works only when enable-nacks is true and error topic is not configured. +Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`. -// ifdef::add-copy-button-to-env-var[] -// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE+++[] -// endif::add-copy-button-to-env-var[] -// ifndef::add-copy-button-to-env-var[] -// Environment variable: `+++QUARKUS_SOLACE+++` -// endif::add-copy-button-to-env-var[] ---|`boolean` -| false +`ignore` - Mark the message as IGNORED, will continue processing with next message. -a| [[quarkus-solace_quarkus.consumer.queue.publish-to-error-topic-on-failure]]`link:#quarkus-solace_quarkus.consumer.queue.publish-to-error-topic-on-failure[consumer.queue.publish-to-error-topic-on-failure]` +`fail` - Mark the message as FAILED, broker will redeliver the message. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on broker version. +`discard` - Mark the message as REJECTED, broker will discard the message. The message will be moved to DMQ if DMQ is configured for queue and DMQ Eligible is set on message otherwise message will be lost. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on broker version. -[.description] --- -Whether to publish consumed message to error topic on failure. +`error_topic` - Will publish the message to configured error topic, on success the message will be acknowledged in the queue. // ifdef::add-copy-button-to-env-var[] // Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE+++[] @@ -207,8 +199,8 @@ Whether to publish consumed message to error topic on failure. // ifndef::add-copy-button-to-env-var[] // Environment variable: `+++QUARKUS_SOLACE+++` // endif::add-copy-button-to-env-var[] ---|`boolean` -| false +--|`string` +| discard a| [[quarkus-solace_quarkus.consumer.queue.error.topic]]`link:#quarkus-solace_quarkus.consumer.queue.error.topic[consumer.queue.error.topic]` @@ -274,7 +266,7 @@ Maximum number of attempts to send a failed message to the error topic in case o --|`int` | `3` -a| [[quarkus-solace_quarkus.consumer.queue.enable-nacks]]`link:#quarkus-solace_quarkus.consumer.queue.enable-nacks[consumer.queue.enable-nacks]` +a| [[quarkus-solace_quarkus.consumer.queue.supports-nacks]]`link:#quarkus-solace_quarkus.consumer.queue.supports-nacks[consumer.queue.supports-nacks]` [.description] @@ -288,6 +280,6 @@ Whether to enable negative acknowledgments on failed messages. Nacks are support // Environment variable: `+++QUARKUS_SOLACE+++` // endif::add-copy-button-to-env-var[] --|`boolean` -| `false` +| `true` |=== \ No newline at end of file diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java index a3daccd..131a5cf 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java @@ -53,13 +53,12 @@ @ConnectorAttribute(name = "consumer.queue.polled-wait-time-in-millis", type = "int", direction = INCOMING, description = "Maximum wait time in milliseconds for consumers to receive a message from configured queue", defaultValue = "100") // TODO implement consumer concurrency //@ConnectorAttribute(name = "consumer.queue.concurrency", type = "int", direction = INCOMING, description = "The number of concurrent consumers", defaultValue = "1") -@ConnectorAttribute(name = "consumer.queue.discard-messages-on-failure", type = "boolean", direction = INCOMING, description = "Whether discard messages from queue on failure. A negative acknowledgment of type REJECTED is sent to broker which discards the messages from queue and will move to DMQ if enabled. This option works only when enable-nacks is true and error topic is not configured", defaultValue = "false") -@ConnectorAttribute(name = "consumer.queue.publish-to-error-topic-on-failure", type = "boolean", direction = INCOMING, description = "Whether to publish consumed message to error topic on failure", defaultValue = "false") +@ConnectorAttribute(name = "consumer.queue.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.", defaultValue = "ignore") @ConnectorAttribute(name = "consumer.queue.error.topic", type = "string", direction = INCOMING, description = "The error topic where message should be published in case of error") @ConnectorAttribute(name = "consumer.queue.error.message.dmq-eligible", type = "boolean", direction = INCOMING, description = "Whether error message is eligible to move to dead message queue", defaultValue = "false") @ConnectorAttribute(name = "consumer.queue.error.message.ttl", type = "long", direction = INCOMING, description = "TTL for Error message before moving to dead message queue.") @ConnectorAttribute(name = "consumer.queue.error.message.max-delivery-attempts", type = "int", direction = INCOMING, description = "Maximum number of attempts to send a failed message to the error topic in case of failure. Each attempt will have a backoff interval of 1 second. When all delivery attempts have been exhausted, the failed message will be requeued on the queue for redelivery.", defaultValue = "3") -@ConnectorAttribute(name = "consumer.queue.enable-nacks", type = "boolean", direction = INCOMING, description = "Whether to enable negative acknowledgments on failed messages. Nacks are supported on event brokers 10.2.1 and later. If an event broker does not support Nacks, an exception is thrown", defaultValue = "false") +@ConnectorAttribute(name = "consumer.queue.supports-nacks", type = "boolean", direction = INCOMING, description = "Whether to enable negative acknowledgments on failed messages. Nacks are supported on event brokers 10.2.1 and later. If an event broker does not support Nacks, an exception is thrown", defaultValue = "false") @ConnectorAttribute(name = "producer.topic", type = "string", direction = OUTGOING, description = "The topic to publish messages, by default the channel name") @ConnectorAttribute(name = "producer.max-inflight-messages", type = "long", direction = OUTGOING, description = "The maximum number of messages to be written to Solace broker. It limits the number of messages waiting to be written and acknowledged by the broker. You can set this attribute to `0` remove the limit", defaultValue = "1024") diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/OutboundErrorMessageMapper.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/OutboundErrorMessageMapper.java similarity index 58% rename from pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/OutboundErrorMessageMapper.java rename to pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/OutboundErrorMessageMapper.java index e18b0fb..d19a0d4 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/OutboundErrorMessageMapper.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/OutboundErrorMessageMapper.java @@ -1,4 +1,4 @@ -package io.quarkiverse.solace.incoming; +package io.quarkiverse.solace.fault; import java.util.Properties; @@ -7,21 +7,18 @@ import com.solace.messaging.publisher.OutboundMessageBuilder; import com.solace.messaging.receiver.InboundMessage; -import io.quarkiverse.solace.SolaceConnectorIncomingConfiguration; - class OutboundErrorMessageMapper { public OutboundMessage mapError(OutboundMessageBuilder messageBuilder, InboundMessage inputMessage, - SolaceConnectorIncomingConfiguration incomingConfiguration) { + boolean dmqEligible, Long timeToLive) { Properties extendedMessageProperties = new Properties(); extendedMessageProperties.setProperty(SolaceProperties.MessageProperties.PERSISTENT_DMQ_ELIGIBLE, - Boolean.toString(incomingConfiguration.getConsumerQueueErrorMessageDmqEligible().booleanValue())); + Boolean.toString(dmqEligible)); messageBuilder.fromProperties(extendedMessageProperties); - - incomingConfiguration.getConsumerQueueErrorMessageTtl().ifPresent(ttl -> { - messageBuilder.withTimeToLive(incomingConfiguration.getConsumerQueueErrorMessageTtl().get()); - }); + if (timeToLive != null) { + messageBuilder.withTimeToLive(timeToLive); + } return messageBuilder.build(inputMessage.getPayloadAsBytes()); } diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceDiscard.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceDiscard.java new file mode 100644 index 0000000..2d22f2d --- /dev/null +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceDiscard.java @@ -0,0 +1,41 @@ +package io.quarkiverse.solace.fault; + +import java.util.concurrent.CompletionStage; + +import org.eclipse.microprofile.reactive.messaging.Metadata; + +import com.solace.messaging.config.MessageAcknowledgementConfiguration; +import com.solace.messaging.receiver.AcknowledgementSupport; + +import io.quarkiverse.solace.i18n.SolaceLogging; +import io.quarkiverse.solace.incoming.SettleMetadata; +import io.quarkiverse.solace.incoming.SolaceInboundMessage; +import io.smallrye.mutiny.Uni; + +public class SolaceDiscard implements SolaceFailureHandler { + private final String channel; + private final AcknowledgementSupport ackSupport; + + public SolaceDiscard(String channel, AcknowledgementSupport ackSupport) { + this.channel = channel; + this.ackSupport = ackSupport; + } + + @Override + public CompletionStage handle(SolaceInboundMessage msg, Throwable reason, Metadata metadata) { + MessageAcknowledgementConfiguration.Outcome outcome; + if (metadata != null) { + outcome = metadata.get(SettleMetadata.class) + .map(SettleMetadata::getOutcome) + .orElseGet(() -> MessageAcknowledgementConfiguration.Outcome.REJECTED /* TODO get outcome from reason */); + } else { + outcome = MessageAcknowledgementConfiguration.Outcome.REJECTED; + } + + SolaceLogging.log.messageSettled(channel, outcome.toString().toLowerCase(), reason.getMessage()); + return Uni.createFrom().voidItem() + .invoke(() -> ackSupport.settle(msg.getMessage(), outcome)) + .runSubscriptionOn(msg::runOnMessageContext) + .subscribeAsCompletionStage(); + } +} diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java new file mode 100644 index 0000000..11f2e7b --- /dev/null +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java @@ -0,0 +1,73 @@ +package io.quarkiverse.solace.fault; + +import java.time.Duration; +import java.util.concurrent.CompletionStage; + +import org.eclipse.microprofile.reactive.messaging.Metadata; + +import com.solace.messaging.MessagingService; +import com.solace.messaging.config.MessageAcknowledgementConfiguration; +import com.solace.messaging.publisher.PersistentMessagePublisher; +import com.solace.messaging.receiver.AcknowledgementSupport; + +import io.quarkiverse.solace.i18n.SolaceLogging; +import io.quarkiverse.solace.incoming.SolaceInboundMessage; +import io.smallrye.mutiny.Uni; + +public class SolaceErrorTopic implements SolaceFailureHandler { + private final String channel; + private final AcknowledgementSupport ackSupport; + private final MessagingService solace; + + private final SolaceErrorTopicPublisherHandler solaceErrorTopicPublisherHandler; + private long maxDeliveryAttempts; + private String errorTopic; + private boolean dmqEligible; + private Long timeToLive; + + public SolaceErrorTopic(String channel, AcknowledgementSupport ackSupport, MessagingService solace) { + this.channel = channel; + this.ackSupport = ackSupport; + this.solace = solace; + this.solaceErrorTopicPublisherHandler = new SolaceErrorTopicPublisherHandler(solace); + } + + public void setMaxDeliveryAttempts(long maxDeliveryAttempts) { + this.maxDeliveryAttempts = maxDeliveryAttempts; + } + + public void setErrorTopic(String errorTopic) { + this.errorTopic = errorTopic; + } + + public void setDmqEligible(boolean dmqEligible) { + this.dmqEligible = dmqEligible; + } + + public void setTimeToLive(Long timeToLive) { + this.timeToLive = timeToLive; + } + + @Override + public CompletionStage handle(SolaceInboundMessage msg, Throwable reason, Metadata metadata) { + PersistentMessagePublisher.PublishReceipt publishReceipt = solaceErrorTopicPublisherHandler + .handle(msg, errorTopic, dmqEligible, timeToLive) + .onFailure().retry().withBackOff(Duration.ofSeconds(1)) + .atMost(maxDeliveryAttempts) + .subscribeAsCompletionStage().exceptionally((t) -> { + SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel, + t.getMessage()); + return null; + }).join(); + + if (publishReceipt != null) { + return Uni.createFrom().voidItem() + .invoke(() -> ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED)) + .runSubscriptionOn(msg::runOnMessageContext) + .subscribeAsCompletionStage(); + } + + return Uni.createFrom(). failure(reason) + .emitOn(msg::runOnMessageContext).subscribeAsCompletionStage(); + } +} diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceErrorTopicPublisherHandler.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java similarity index 90% rename from pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceErrorTopicPublisherHandler.java rename to pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java index 6e160d8..cca26e1 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceErrorTopicPublisherHandler.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopicPublisherHandler.java @@ -1,4 +1,4 @@ -package io.quarkiverse.solace.incoming; +package io.quarkiverse.solace.fault; import com.solace.messaging.MessagingService; import com.solace.messaging.PubSubPlusClientException; @@ -7,21 +7,20 @@ import com.solace.messaging.publisher.PersistentMessagePublisher.PublishReceipt; import com.solace.messaging.resources.Topic; -import io.quarkiverse.solace.SolaceConnectorIncomingConfiguration; import io.quarkiverse.solace.i18n.SolaceLogging; +import io.quarkiverse.solace.incoming.SolaceInboundMessage; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.subscription.UniEmitter; class SolaceErrorTopicPublisherHandler implements PersistentMessagePublisher.MessagePublishReceiptListener { private final MessagingService solace; - private final String errorTopic; + private String errorTopic; private final PersistentMessagePublisher publisher; private final OutboundErrorMessageMapper outboundErrorMessageMapper; - public SolaceErrorTopicPublisherHandler(MessagingService solace, String errorTopic) { + public SolaceErrorTopicPublisherHandler(MessagingService solace) { this.solace = solace; - this.errorTopic = errorTopic; publisher = solace.createPersistentMessagePublisherBuilder().build(); publisher.start(); @@ -29,10 +28,12 @@ public SolaceErrorTopicPublisherHandler(MessagingService solace, String errorTop } public Uni handle(SolaceInboundMessage message, - SolaceConnectorIncomingConfiguration ic) { + String errorTopic, + boolean dmqEligible, Long timeToLive) { + this.errorTopic = errorTopic; OutboundMessage outboundMessage = outboundErrorMessageMapper.mapError(this.solace.messageBuilder(), message.getMessage(), - ic); + dmqEligible, timeToLive); publisher.setMessagePublishReceiptListener(this); // } return Uni.createFrom(). emitter(e -> { diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceFailureHandler.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceFail.java similarity index 64% rename from pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceFailureHandler.java rename to pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceFail.java index 64d39b0..cd6888e 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceFailureHandler.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceFail.java @@ -1,39 +1,35 @@ -package io.quarkiverse.solace.incoming; +package io.quarkiverse.solace.fault; import java.util.concurrent.CompletionStage; import org.eclipse.microprofile.reactive.messaging.Metadata; -import com.solace.messaging.MessagingService; import com.solace.messaging.config.MessageAcknowledgementConfiguration; import com.solace.messaging.receiver.AcknowledgementSupport; import io.quarkiverse.solace.i18n.SolaceLogging; +import io.quarkiverse.solace.incoming.SettleMetadata; +import io.quarkiverse.solace.incoming.SolaceInboundMessage; import io.smallrye.mutiny.Uni; -class SolaceFailureHandler { - +public class SolaceFail implements SolaceFailureHandler { private final String channel; private final AcknowledgementSupport ackSupport; - private final MessagingService solace; - - public SolaceFailureHandler(String channel, AcknowledgementSupport ackSupport, MessagingService solace) { + public SolaceFail(String channel, AcknowledgementSupport ackSupport) { this.channel = channel; this.ackSupport = ackSupport; - this.solace = solace; } - public CompletionStage handle(SolaceInboundMessage msg, Throwable reason, Metadata metadata, - MessageAcknowledgementConfiguration.Outcome messageOutCome) { + @Override + public CompletionStage handle(SolaceInboundMessage msg, Throwable reason, Metadata metadata) { MessageAcknowledgementConfiguration.Outcome outcome; if (metadata != null) { outcome = metadata.get(SettleMetadata.class) .map(SettleMetadata::getOutcome) - .orElseGet(() -> messageOutCome /* TODO get outcome from reason */); + .orElseGet(() -> MessageAcknowledgementConfiguration.Outcome.FAILED /* TODO get outcome from reason */); } else { - outcome = messageOutCome != null ? messageOutCome - : MessageAcknowledgementConfiguration.Outcome.FAILED; + outcome = MessageAcknowledgementConfiguration.Outcome.FAILED; } SolaceLogging.log.messageSettled(channel, outcome.toString().toLowerCase(), reason.getMessage()); diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceFailureHandler.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceFailureHandler.java new file mode 100644 index 0000000..15eff4f --- /dev/null +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceFailureHandler.java @@ -0,0 +1,51 @@ +package io.quarkiverse.solace.fault; + +import static io.quarkiverse.solace.i18n.SolaceExceptions.ex; + +import java.util.concurrent.CompletionStage; + +import org.eclipse.microprofile.reactive.messaging.Metadata; + +import io.quarkiverse.solace.incoming.SolaceInboundMessage; + +public interface SolaceFailureHandler { + + enum Strategy { + /** + * Mark the message as IGNORED, will continue processing with next message. + */ + IGNORE, + /** + * Mark the message as FAILED, broker will redeliver the message. + */ + FAIL, + /** + * Mark the message as REJECTED, broker will discard the message. The message will be moved to DMQ if DMQ is configured + * for queue and DMQ Eligible is set on message. + */ + DISCARD, + /** + * Will publish the message to configured error topic, on success the message will be acknowledged in the queue. + */ + ERROR_TOPIC; + + public static Strategy from(String s) { + if (s == null || s.equalsIgnoreCase("ignore")) { + return IGNORE; + } + if (s.equalsIgnoreCase("fail")) { + return FAIL; + } + if (s.equalsIgnoreCase("discard")) { + return DISCARD; + } + if (s.equalsIgnoreCase("error_topic")) { + return ERROR_TOPIC; + } + + throw ex.illegalArgumentUnknownFailureStrategy(s); + } + } + + CompletionStage handle(SolaceInboundMessage msg, Throwable reason, Metadata metadata); +} diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceIgnoreFailure.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceIgnoreFailure.java new file mode 100644 index 0000000..ffdcfb1 --- /dev/null +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceIgnoreFailure.java @@ -0,0 +1,24 @@ +package io.quarkiverse.solace.fault; + +import java.util.concurrent.CompletionStage; + +import org.eclipse.microprofile.reactive.messaging.Metadata; + +import io.quarkiverse.solace.i18n.SolaceLogging; +import io.quarkiverse.solace.incoming.SolaceInboundMessage; +import io.smallrye.mutiny.Uni; + +public class SolaceIgnoreFailure implements SolaceFailureHandler { + + private final String channel; + + public SolaceIgnoreFailure(String channel) { + this.channel = channel; + } + + @Override + public CompletionStage handle(SolaceInboundMessage msg, Throwable reason, Metadata metadata) { + SolaceLogging.log.messageSettled(channel, "ignored", reason.getMessage()); + return Uni.createFrom().voidItem().subscribeAsCompletionStage(); + } +} diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceExceptions.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceExceptions.java index 31f8cfb..3fe3b69 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceExceptions.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/i18n/SolaceExceptions.java @@ -21,4 +21,10 @@ public interface SolaceExceptions { @Message(id = 18002, value = "Expecting downstream to consume without back-pressure") IllegalStateException illegalStateConsumeWithoutBackPressure(); + @Message(id = 18003, value = "Invalid failure strategy: %s") + IllegalArgumentException illegalArgumentInvalidFailureStrategy(String strategy); + + @Message(id = 18004, value = "Unknown failure strategy: %s") + IllegalArgumentException illegalArgumentUnknownFailureStrategy(String strategy); + } diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SettleMetadata.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SettleMetadata.java index 0238309..af6fa5d 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SettleMetadata.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SettleMetadata.java @@ -2,7 +2,7 @@ import com.solace.messaging.config.MessageAcknowledgementConfiguration; -class SettleMetadata { +public class SettleMetadata { MessageAcknowledgementConfiguration.Outcome settleOutcome; diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java index 17f32ec..2f2de78 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java @@ -2,19 +2,16 @@ import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata; -import java.time.Duration; import java.util.concurrent.CompletionStage; import org.eclipse.microprofile.reactive.messaging.Metadata; -import com.solace.messaging.config.MessageAcknowledgementConfiguration; -import com.solace.messaging.publisher.PersistentMessagePublisher.PublishReceipt; import com.solace.messaging.receiver.InboundMessage; import io.netty.handler.codec.http.HttpHeaderValues; import io.quarkiverse.solace.SolaceConnectorIncomingConfiguration; +import io.quarkiverse.solace.fault.SolaceFailureHandler; import io.quarkiverse.solace.i18n.SolaceLogging; -import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage; import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; import io.vertx.core.buffer.Buffer; @@ -24,21 +21,18 @@ public class SolaceInboundMessage implements ContextAwareMessage, Metadata private final InboundMessage msg; private final SolaceAckHandler ackHandler; private final SolaceFailureHandler nackHandler; - private final SolaceErrorTopicPublisherHandler solaceErrorTopicPublisherHandler; private final SolaceConnectorIncomingConfiguration ic; private final T payload; private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker; private Metadata metadata; public SolaceInboundMessage(InboundMessage message, SolaceAckHandler ackHandler, SolaceFailureHandler nackHandler, - SolaceErrorTopicPublisherHandler solaceErrorTopicPublisherHandler, SolaceConnectorIncomingConfiguration ic, IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker) { this.msg = message; this.unacknowledgedMessageTracker = unacknowledgedMessageTracker; this.payload = (T) convertPayload(); this.ackHandler = ackHandler; this.nackHandler = nackHandler; - this.solaceErrorTopicPublisherHandler = solaceErrorTopicPublisherHandler; this.ic = ic; this.metadata = captureContextMetadata(new SolaceInboundMetadata(message)); } @@ -96,45 +90,48 @@ public CompletionStage ack() { @Override public CompletionStage nack(Throwable reason, Metadata nackMetadata) { - if (solaceErrorTopicPublisherHandler == null) { - // REJECTED - Will move message to DMQ if enabled, FAILED - Will redeliver the message. - MessageAcknowledgementConfiguration.Outcome outcome = ic.getConsumerQueueEnableNacks() - ? (ic.getConsumerQueueDiscardMessagesOnFailure() ? MessageAcknowledgementConfiguration.Outcome.REJECTED - : MessageAcknowledgementConfiguration.Outcome.FAILED) - : null; // if nacks are not supported on broker, no outcome is required. - if (outcome != null) { - // decrement the tracker, as the message might get redelivered or moved to DMQ - this.unacknowledgedMessageTracker.decrement(); - return nackHandler.handle(this, reason, nackMetadata, outcome); - } - } else { - PublishReceipt publishReceipt = solaceErrorTopicPublisherHandler.handle(this, ic) - .onFailure().retry().withBackOff(Duration.ofSeconds(1)) - .atMost(ic.getConsumerQueueErrorMessageMaxDeliveryAttempts()) - .subscribeAsCompletionStage().exceptionally((t) -> { - SolaceLogging.log.unsuccessfulToTopic(ic.getConsumerQueueErrorTopic().get(), ic.getChannel(), - t.getMessage()); - return null; - }).join(); - - if (publishReceipt != null) { - // decrement the tracker, as the message might get redelivered or moved to DMQ - this.unacknowledgedMessageTracker.decrement(); - return nackHandler.handle(this, reason, nackMetadata, MessageAcknowledgementConfiguration.Outcome.ACCEPTED); - } else { - if (ic.getConsumerQueueEnableNacks()) { - // decrement the tracker, as the message might get redelivered or moved to DMQ - this.unacknowledgedMessageTracker.decrement(); - return nackHandler.handle(this, reason, nackMetadata, - MessageAcknowledgementConfiguration.Outcome.FAILED); - } - } - } - - // decrement the tracker, as the message might get redelivered or moved to DMQ this.unacknowledgedMessageTracker.decrement(); - // return void stage if above check fail. This will not nack the message on broker. - return Uni.createFrom().voidItem().subscribeAsCompletionStage(); // TODO - Restart receiver to redeliver message, needed when nacks are not supported on broker. + return nackHandler.handle(this, reason, nackMetadata); + + // if (solaceErrorTopicPublisherHandler == null) { + // // REJECTED - Will move message to DMQ if enabled, FAILED - Will redeliver the message. + // MessageAcknowledgementConfiguration.Outcome outcome = ic.getConsumerQueueEnableNacks() + // ? (ic.getConsumerQueueDiscardMessagesOnFailure() ? MessageAcknowledgementConfiguration.Outcome.REJECTED + // : MessageAcknowledgementConfiguration.Outcome.FAILED) + // : null; // if nacks are not supported on broker, no outcome is required. + // if (outcome != null) { + // // decrement the tracker, as the message might get redelivered or moved to DMQ + // this.unacknowledgedMessageTracker.decrement(); + // return nackHandler.handle(this, reason, nackMetadata, outcome); + // } + // } else { + // PublishReceipt publishReceipt = solaceErrorTopicPublisherHandler.handle(this, ic) + // .onFailure().retry().withBackOff(Duration.ofSeconds(1)) + // .atMost(ic.getConsumerQueueErrorMessageMaxDeliveryAttempts()) + // .subscribeAsCompletionStage().exceptionally((t) -> { + // SolaceLogging.log.unsuccessfulToTopic(ic.getConsumerQueueErrorTopic().get(), ic.getChannel(), + // t.getMessage()); + // return null; + // }).join(); + // + // if (publishReceipt != null) { + // // decrement the tracker, as the message might get redelivered or moved to DMQ + // this.unacknowledgedMessageTracker.decrement(); + // return nackHandler.handle(this, reason, nackMetadata, MessageAcknowledgementConfiguration.Outcome.ACCEPTED); + // } else { + // if (ic.getConsumerQueueEnableNacks()) { + // // decrement the tracker, as the message might get redelivered or moved to DMQ + // this.unacknowledgedMessageTracker.decrement(); + // return nackHandler.handle(this, reason, nackMetadata, + // MessageAcknowledgementConfiguration.Outcome.FAILED); + // } + // } + // } + // + // // decrement the tracker, as the message might get redelivered or moved to DMQ + // this.unacknowledgedMessageTracker.decrement(); + // // return void stage if above check fail. This will not nack the message on broker. + // return Uni.createFrom().voidItem().subscribeAsCompletionStage(); // TODO - Restart receiver to redeliver message, needed when nacks are not supported on broker. } @Override diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java index 0301670..c9e9edb 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java @@ -1,5 +1,7 @@ package io.quarkiverse.solace.incoming; +import static io.quarkiverse.solace.i18n.SolaceExceptions.ex; + import java.time.Duration; import java.time.ZonedDateTime; import java.util.Arrays; @@ -24,6 +26,7 @@ import com.solace.messaging.resources.TopicSubscription; import io.quarkiverse.solace.SolaceConnectorIncomingConfiguration; +import io.quarkiverse.solace.fault.*; import io.quarkiverse.solace.i18n.SolaceLogging; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -42,7 +45,6 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi private final PersistentMessageReceiver receiver; private final Flow.Publisher> stream; private final ExecutorService pollerThread; - private SolaceErrorTopicPublisherHandler solaceErrorTopicPublisherHandler; private long waitTimeout = -1; // Assuming we won't ever exceed the limit of an unsigned long... @@ -54,7 +56,7 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i this.waitTimeout = ic.getClientShutdownWaitTimeout(); DirectMessageReceiver r = solace.createDirectMessageReceiverBuilder().build(); Outcome[] outcomes = new Outcome[] { Outcome.ACCEPTED }; - if (ic.getConsumerQueueEnableNacks()) { + if (ic.getConsumerQueueSupportsNacks()) { outcomes = new Outcome[] { Outcome.ACCEPTED, Outcome.FAILED, Outcome.REJECTED }; } PersistentMessageReceiverBuilder builder = solace.createPersistentMessageReceiverBuilder() @@ -94,12 +96,7 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i this.receiver = builder.build(getQueue(ic)); boolean lazyStart = ic.getClientLazyStart(); this.ackHandler = new SolaceAckHandler(receiver); - this.failureHandler = new SolaceFailureHandler(channel, receiver, solace); - if (ic.getConsumerQueuePublishToErrorTopicOnFailure()) { - ic.getConsumerQueueErrorTopic().ifPresent(errorTopic -> { - this.solaceErrorTopicPublisherHandler = new SolaceErrorTopicPublisherHandler(solace, errorTopic); - }); - } + this.failureHandler = createFailureHandler(ic, solace); Integer timeout = getTimeout(ic.getConsumerQueuePolledWaitTimeInMillis()); // TODO Here use a subscription receiver.receiveAsync with an internal queue @@ -109,8 +106,8 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i .runSubscriptionOn(pollerThread)) .until(__ -> closed.get()) .emitOn(context::runOnContext) - .map(consumed -> new SolaceInboundMessage<>(consumed, ackHandler, failureHandler, - solaceErrorTopicPublisherHandler, ic, unacknowledgedMessageTracker)) + .map(consumed -> new SolaceInboundMessage<>(consumed, ackHandler, failureHandler, ic, + unacknowledgedMessageTracker)) .plug(m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(receiver.startAsync())) : m) .onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3); @@ -119,6 +116,32 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i } } + private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfiguration ic, MessagingService solace) { + String strategy = ic.getConsumerQueueFailureStrategy(); + SolaceFailureHandler.Strategy actualStrategy = SolaceFailureHandler.Strategy.from(strategy); + switch (actualStrategy) { + case IGNORE: + return new SolaceIgnoreFailure(ic.getChannel()); + case FAIL: + return new SolaceFail(ic.getChannel(), receiver); + case DISCARD: + return new SolaceDiscard(ic.getChannel(), receiver); + case ERROR_TOPIC: + SolaceErrorTopic solaceErrorTopic = new SolaceErrorTopic(ic.getChannel(), receiver, solace); + if (ic.getConsumerQueueErrorTopic().isEmpty()) { + throw ex.illegalArgumentInvalidFailureStrategy(strategy); + } + solaceErrorTopic.setErrorTopic(ic.getConsumerQueueErrorTopic().get()); + solaceErrorTopic.setDmqEligible(ic.getConsumerQueueErrorMessageDmqEligible().booleanValue()); + solaceErrorTopic.setTimeToLive(ic.getConsumerQueueErrorMessageTtl().orElse(null)); + solaceErrorTopic.setMaxDeliveryAttempts(ic.getConsumerQueueErrorMessageMaxDeliveryAttempts()); + return solaceErrorTopic; + default: + throw ex.illegalArgumentInvalidFailureStrategy(strategy); + } + + } + private Integer getTimeout(Integer timeoutInMillis) { Integer realTimeout; final Long expiry = timeoutInMillis != null diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java index afffee3..bdfc52d 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java @@ -116,7 +116,7 @@ void consumerFailedProcessingPublishToErrorTopic() { .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME) .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") - .with("mp.messaging.incoming.in.consumer.queue.publish-to-error-topic-on-failure", true) + .with("mp.messaging.incoming.in.consumer.queue.failure-strategy", "error_topic") .with("mp.messaging.incoming.in.consumer.queue.error.topic", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_SUBSCRIPTION) .with("mp.messaging.incoming.in.consumer.queue.error.message.ttl", 1000) @@ -150,8 +150,8 @@ void consumerFailedProcessingMoveToDMQ() { .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME) .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") - .with("mp.messaging.incoming.in.consumer.queue.enable-nacks", "true") - .with("mp.messaging.incoming.in.consumer.queue.discard-messages-on-failure", "true") + .with("mp.messaging.incoming.in.consumer.queue.supports-nacks", "true") + .with("mp.messaging.incoming.in.consumer.queue.failure-strategy", "discard") .with("mp.messaging.incoming.dmq-in.connector", "quarkus-solace") .with("mp.messaging.incoming.dmq-in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_DMQ_NAME) .with("mp.messaging.incoming.dmq-in.consumer.queue.type", "durable-exclusive"); @@ -207,7 +207,7 @@ void consumerPublishToErrorTopicPermissionException() { .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME) .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") - .with("mp.messaging.incoming.in.consumer.queue.publish-to-error-topic-on-failure", true) + .with("mp.messaging.incoming.in.consumer.queue.failure-strategy", "error_topic") .with("mp.messaging.incoming.in.consumer.queue.error.topic", "publish/deny") .with("mp.messaging.incoming.error-in.connector", "quarkus-solace") diff --git a/samples/hello-connector-solace/src/main/java/io/quarkiverse/solace/samples/HelloConsumer.java b/samples/hello-connector-solace/src/main/java/io/quarkiverse/solace/samples/HelloConsumer.java index 0711a15..c312634 100644 --- a/samples/hello-connector-solace/src/main/java/io/quarkiverse/solace/samples/HelloConsumer.java +++ b/samples/hello-connector-solace/src/main/java/io/quarkiverse/solace/samples/HelloConsumer.java @@ -45,26 +45,26 @@ Message consumeAndPublish(SolaceInboundMessage p) { * * @param p */ - // @Incoming("dynamic-destination-in") - // @Outgoing("dynamic-destination-out") - // @Acknowledgment(Acknowledgment.Strategy.MANUAL) - // Message consumeAndPublishToDynamicTopic(SolaceInboundMessage p) { - // Log.infof("Received message: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8)); - // SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() - // .setApplicationMessageId("test").setDynamicDestination("hello/foobar/" + p.getMessage().getSenderId()) // make sure senderId is available on incoming message - // .createPubSubOutboundMetadata(); - // Message outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata), () -> { - // CompletableFuture completableFuture = new CompletableFuture(); - // p.ack(); - // completableFuture.complete(null); - // return completableFuture; - // }, (throwable) -> { - // CompletableFuture completableFuture = new CompletableFuture(); - // p.nack(throwable, p.getMetadata()); - // completableFuture.complete(null); - // return completableFuture; - // }); - // return outboundMessage; - // } + @Incoming("dynamic-destination-in") + @Outgoing("dynamic-destination-out") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + Message consumeAndPublishToDynamicTopic(SolaceInboundMessage p) { + Log.infof("Received message: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8)); + SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() + .setApplicationMessageId("test").setDynamicDestination("hello/foobar/" + p.getMessage().getSenderId()) // make sure senderId is available on incoming message + .createPubSubOutboundMetadata(); + Message outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata), () -> { + CompletableFuture completableFuture = new CompletableFuture(); + p.ack(); + completableFuture.complete(null); + return completableFuture; + }, (throwable) -> { + CompletableFuture completableFuture = new CompletableFuture(); + p.nack(throwable, p.getMetadata()); + completableFuture.complete(null); + return completableFuture; + }); + return outboundMessage; + } } diff --git a/samples/hello-connector-solace/src/main/resources/application.properties b/samples/hello-connector-solace/src/main/resources/application.properties index 7f1fcf2..72cebfd 100644 --- a/samples/hello-connector-solace/src/main/resources/application.properties +++ b/samples/hello-connector-solace/src/main/resources/application.properties @@ -5,24 +5,19 @@ quarkus.solace.authentication.basic.password= mp.messaging.outgoing.hello-out.connector=quarkus-solace mp.messaging.outgoing.hello-out.producer.topic= -#mp.messaging.outgoing.hello-out.producer.back-pressure.strategy=wait -#mp.messaging.outgoing.hello-out.producer.back-pressure.buffer-capacity=1 -#mp.messaging.outgoing.hello-out.producer.waitForPublishReceipt=false mp.messaging.incoming.hello-in.connector=quarkus-solace -mp.messaging.incoming.hello-in.consumer.queue.enable-nacks=true +mp.messaging.incoming.hello-in.consumer.queue.supports-nacks=true mp.messaging.incoming.hello-in.consumer.queue.name= mp.messaging.incoming.hello-in.consumer.queue.type=durable-exclusive -mp.messaging.incoming.hello-in.consumer.queue.discard-messages-on-failure=false -mp.messaging.incoming.hello-in.consumer.queue.publish-to-error-topic-on-failure=true -mp.messaging.incoming.hello-in.consumer.queue.error.topic=solace/quarkus/error +#mp.messaging.incoming.hello-in.consumer.queue.failure-strategy=error_topic +#mp.messaging.incoming.hello-in.consumer.queue.error.topic= mp.messaging.incoming.dynamic-destination-in.connector=quarkus-solace -mp.messaging.incoming.dynamic-destination-in.consumer.queue.enable-nacks=true +mp.messaging.incoming.dynamic-destination-in.consumer.queue.supports-nacks=true mp.messaging.incoming.dynamic-destination-in.consumer.queue.name= mp.messaging.incoming.dynamic-destination-in.consumer.queue.type=durable-exclusive -mp.messaging.incoming.dynamic-destination-in.consumer.queue.discard-messages-on-failure=false -mp.messaging.incoming.dynamic-destination-in.consumer.queue.publish-to-error-topic-on-failure=true +mp.messaging.incoming.hello-in.consumer.queue.failure-strategy=ignore mp.messaging.incoming.dynamic-destination-in.consumer.queue.error.topic=solace/quarkus/error mp.messaging.outgoing.dynamic-destination-out.connector=quarkus-solace