diff --git a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-common.adoc b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-common.adoc index 481c091..bac3aba 100644 --- a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-common.adoc +++ b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-common.adoc @@ -1,6 +1,6 @@ :summaryTableId: quarkus-solace-extension-common -Common configuration for Solace Quarkus Extension Incoming and Outgoing channels +Common configuration for Quarkus Solace Messaging Connector Incoming and Outgoing channels [.configuration-reference.searchable, cols="80,.^10,.^10"] |=== diff --git a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc index 084c5e1..50a6b4c 100644 --- a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc +++ b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc @@ -1,6 +1,6 @@ :summaryTableId: quarkus-solace-extension-incoming -Incoming configuration for Solace Quarkus Extension +Incoming configuration for Quarkus Solace Messaging Connector [.configuration-reference.searchable, cols="80,.^10,.^10"] |=== diff --git a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc index aa01412..73ccbed 100644 --- a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc +++ b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc @@ -1,6 +1,6 @@ :summaryTableId: quarkus-solace-extension-outgoing -Outgoing configuration for Solace Quarkus Extension +Outgoing configuration for Quarkus Solace Messaging Connector [.configuration-reference.searchable, cols="80,.^10,.^10"] |=== @@ -111,7 +111,7 @@ Supported strategies `reject`, `elastic`, `wait`. Refer to `https://docs.solace. // Environment variable: `+++QUARKUS_SOLACE_DEVSERVICES_SERVICE_NAME+++` // endif::add-copy-button-to-env-var[] --|string -|`wait` +|`elastic` a| [[quarkus-solace_quarkus.producer.back-pressure.buffer-capacity]]`link:#quarkus-solace_quarkus.producer.back-pressure.buffer-capacity[producer.back-pressure.buffer-capacity]` diff --git a/docs/modules/ROOT/pages/index.adoc b/docs/modules/ROOT/pages/index.adoc index 0e8b029..fd22a75 100644 --- a/docs/modules/ROOT/pages/index.adoc +++ b/docs/modules/ROOT/pages/index.adoc @@ -2,10 +2,19 @@ include::./includes/attributes.adoc[] -TIP: Solace Quarkus Extension for integrating with Solace PubSub+ message brokers. The extension provides the ability to publish or consume events from event mesh. +== Introduction +The https://solace.com/products/platform/[Solace PubSub+ Platform]'s https://solace.com/products/event-broker/software/[software event broker] efficiently streams event-driven information between applications, IoT devices and user interfaces running in the cloud, on-premises, and hybrid environments using open APIs and protocols like AMQP, JMS, MQTT, REST and WebSocket. It can be installed into a variety of public and private clouds, PaaS, and on-premises environments, and brokers in multiple locations can be linked together in an https://solace.com/what-is-an-event-mesh/[event mesh] to dynamically share events across the distributed enterprise. -== Installation +== Quarkus Extension for Solace + +Solace Quarkus Extension for integrating with Solace PubSub+ message brokers. The extension provides the ability to publish or consume events from event mesh. + +Users have the choice to use the extension in two ways + +{empty}1. `com.solace.quarkus:quarkus-solace-client` + +This extension provides only Solace Java Messaging API and users need to have their own implementation and configuration to interact with Solace PubSub+ broker. If you want to use this extension, you need to add the `com.solace.quarkus:quarkus-solace-client` extension first to your build file. @@ -20,6 +29,23 @@ For instance, with Maven, add the following dependency to your POM file: ---- +{empty}2. `com.solace.quarkus:quarkus-solace-messaging-connector` + +This extension is based on reactive messaging framework and provides pre-defined configurations for incoming and outgoing channels. + +If you want to use this extension, you need to add the `com.solace.quarkus:quarkus-solace-messaging-connector` extension first to your build file. + +For instance, with Maven, add the following dependency to your POM file: + +[source,xml,subs=attributes+] +---- + + com.solace.quarkus + quarkus-solace-messaging-connector + {project-version} + +---- + [[extension-configuration-reference]] == Extension Configuration Reference @@ -38,4 +64,328 @@ include::includes/quarkus-solace-extension-outgoing.adoc[leveloffset=+1, opts=op [[extension-common-configuration-reference]] == Common Configuration Reference -include::includes/quarkus-solace-extension-common.adoc[leveloffset=+1, opts=optional] \ No newline at end of file +include::includes/quarkus-solace-extension-common.adoc[leveloffset=+1, opts=optional] + +[[configuring-quarkus-solace-messaging-connector]] +== Configuring Quarkus Solace Messaging Connector + +Reactive Messaging framework supports different messaging backends it employs a generic vocabulary: + +* Applications send and receive messages. A message wraps a payload and can be extended with some metadata. With the Solace connector, a message corresponds to Inbound or Outbound Message. + +* Messages transit on channels. Application components connect to channels to publish and consume messages. The Solace connector maps channels to Solace queues and topics. + +* Channels are connected to message backends using connectors. Connectors are configured to map incoming messages to a specific channel (consumed by the application) and collect outgoing messages sent to a specific channel. Each connector is dedicated to a specific messaging technology. For example, the connector dealing with Solace is named `quarkus-solace`. + +A minimal configuration for the Solace connector with an incoming channel looks like the following: + +The following lines of configuration assumes that a exclusive queue is already provisioned on the broker +[source,properties] +---- +quarkus.solace.host=tcp://localhost:55555 +quarkus.solace.vpn=default +quarkus.solace.authentication.basic.username=basic +quarkus.solace.authentication.basic.password=basic + +mp.messaging.incoming.temperatures.connector=quarkus-solace +mp.messaging.incoming.temperatures.consumer.queue.name=temperatures +---- + +The extension also supports provisioning queues and subscriptions on broker given that the user has role access to create queues with subscriptions. Configuration is as follows + +[source,properties] +---- +quarkus.solace.host=tcp://localhost:55555 +quarkus.solace.vpn=default +quarkus.solace.authentication.basic.username=basic +quarkus.solace.authentication.basic.password=basic + +mp.messaging.incoming.temperatures.connector=quarkus-solace +mp.messaging.incoming.temperatures.consumer.queue.missing-resource-creation-strategy=create-on-start +mp.messaging.incoming.temperatures.consumer.queue.add-additional-subscriptions=true +mp.messaging.incoming.temperatures.consumer.queue.subscriptions=hello/foobar +---- + +1. When running in dev mode or tests dev services will automatically start a Solace PubSub+ broker and if broker configuration details are not provided the extension automatically picks up the details of broker started by dev services. + +2. If `consumer.queue.name` property is not specified, channel name will be used as queue name. + +[[receiving-messages-from-solace]] +== Receiving messages from Solace + +Using the previous configuration, Quarkus application can receive message in several possible ways. + +__Direct Payload__ +[source,java] +---- +import org.eclipse.microprofile.reactive.messaging.Incoming; +import jakarta.enterprise.context.ApplicationScoped; +@ApplicationScoped +public class TemperaturesConsumer { + @Incoming("temperatures") + public void consume(byte[] temperature) { + // process. + } +} +---- + +__Message__ +[source,java] +---- +@ApplicationScoped +public class TemperaturesConsumer { + @Incoming("temperatures") + public CompletionStage consume(Message msg) { + // access record metadata + SolaceInboundMetadata metadata = msg.getMetadata(SolaceInboundMetadata.class).orElseThrow(); + // process the message payload. + double temperature = Double.parseDouble(new String(p.getPayload())); + // Acknowledge the incoming message + return msg.ack(); + } +} +---- + +__SolaceInboundMessage__ This is a wrapper to incoming Inbound Message from Solace Messaging API + +[source,java] +---- +import com.solace.messaging.receiver.InboundMessage;@ApplicationScoped +public class TemperaturesConsumer { + @Incoming("temperatures") + public void consume(InboundMessage inboundMessage) { + // process the message payload. + String temperature = inboundMessage.getPayloadAsString(); + } +} +---- + +[[acknowledgement-handling]] +== Acknowledgment Handling + +By default, acknowledgement strategy is set to client acknowledgement. This gives greater control over acknowledgement and ensures that messages are acknowledged only after successful processing. + +[source,java] +---- +@ApplicationScoped +public class TemperaturesConsumer { + @Incoming("temperatures") + public CompletionStage consume(Message msg) { + // access record metadata + SolaceInboundMetadata metadata = msg.getMetadata(SolaceInboundMetadata.class).orElseThrow(); + // process the message payload. + double temperature = Double.parseDouble(new String(p.getPayload())); + // Acknowledge the incoming message + return msg.ack(); + } +} +---- + +[[failure-strategies]] +== Failure Strategies + +If a message is nacked, a failure strategy is applied. Refer to <><>. The default strategy is set to `ignore` and move on to next message. Following are the strategies supported by Quarkus Solace Messaging Connector extension. + +`ignore` - Mark the message as IGNORED, will continue processing with next message. It TTL and DMQ are configured on the queue message will be moved to DMQ once TTL is reached. If no DMQ is configured but TTL is set message will be lost. + +`fail` - Mark the message as FAILED, broker will redeliver the message. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on broker version. + +`discard` - Mark the message as REJECTED, broker will discard the message. The message will be moved to DMQ if DMQ is configured for queue and DMQ Eligible is set on message otherwise message will be lost. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on broker version. + +`error_topic` - Will publish the message to configured error topic, on success the message will be acknowledged in the queue. + +[[sending-messages-to-solace]] +== Sending messages to Solace + +Outgoing channel configuration to publish messages to Solace. + +[source,properties] +---- +quarkus.solace.host=tcp://localhost:55555 +quarkus.solace.vpn=default +quarkus.solace.authentication.basic.username=basic +quarkus.solace.authentication.basic.password=basic + +mp.messaging.incoming.temperatures-out.connector=quarkus-solace +mp.messaging.incoming.temperatures-out.producer.topic=temperatures +---- + +1. When running in dev mode or tests dev services will automatically start a Solace PubSub+ broker and if broker configuration details are not provided the extension automatically picks up the details of broker started by dev services. + +2. If `producer.topic` property is not specified, channel name will be used as topic name. + +Using the previous configuration Quarkus application can publish messages as follows + +[source,java] +---- +import io.smallrye.mutiny.Multi; +import org.eclipse.microprofile.reactive.messaging.Outgoing; + +import jakarta.enterprise.context.ApplicationScoped; +import java.time.Duration; +import java.util.Random; + +@ApplicationScoped +public class TemperaturesProducer { + + private final Random random = new Random(); + + @Outgoing("temperatures-out") + public Multi generate() { + // Emit 1000 records + return Multi.createFrom().range(0, 1000) + .map(x -> random.nextDouble()); + } + +} +---- + +You can also generate a `org.eclipse.microprofile.reactive.messaging.Message` with required metadata and publish to Solace. + +[source,java] +---- +@ApplicationScoped +public class TemperaturesProducer { + private final Random random = new Random(); + + @Outgoing("temperatures-out") + Multi> publishTemperatures() { + return Multi.createFrom().range(0, 1000) + .map(i -> { + SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() + .setApplicationMessageId(Integer.toString(i)).createPubSubOutboundMetadata(); + return Message.of(random.nextDouble(), Metadata.of(outboundMetadata)); + }); + } +} +---- + +*SolaceOutboundMetadata* allows to configure metadata for the message. It supports all the headers supported by Solace and custom user properties. In addition to this it also supports configuring dynamic topic which overrides the default topic in application configuration file. + +Generating `org.eclipse.microprofile.reactive.messaging.Message` with dynamic topic and publish to Solace. + +[source,java] +---- +@ApplicationScoped +public class TemperaturesProducer { + private final Random random = new Random(); + + @Outgoing("temperatures-out") + Multi> publishTemperatures() { + return Multi.createFrom().range(0, 1000) + .map(i -> { + SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() + .setApplicationMessageId(Integer.toString(i)) + .setDynamicDestination("device/" + Integer.toString(i) + "/temperature").createPubSubOutboundMetadata(); + return Message.of(random.nextDouble(), Metadata.of(outboundMetadata)); + }); + } +} +---- + +Generating `org.eclipse.microprofile.reactive.messaging.Message` with partition key and publish to Solace. + +[source,java] +---- +@ApplicationScoped +public class TemperaturesProducer { + private final Random random = new Random(); + + @Outgoing("temperatures-out") + Multi> publishTemperatures() { + return Multi.createFrom().range(0, 1000) + .map(i -> { + String partitionKey = "Group-1"; + if(i % 2 == 0) { + partitionKey = "Group-2"; + } + SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() + .setApplicationMessageId(Integer.toString(i)) + .setPartitionKey(partitionKey).createPubSubOutboundMetadata(); + return Message.of(random.nextDouble(), Metadata.of(outboundMetadata)); + }); + } +} +---- + +Sending messages with __@Emitter__ + +[source,java] +---- +@Path("/temperatures") +public class PublisherResource { + + @Channel("temperatures-out") + MutinyEmitter temperatureEmitter; + + @POST + @Path("/publish") + public Uni publish(Temperature temperature) { + return temperatureEmitter.send(temperature); + } +} +---- + +== Producer Back-Pressure strategies + +Quarkus Solace Messaging connector provides three different strategies to handle back-pressure when publishing messages + +{empty}1.Reject - Publisher will start rejecting messages once specified limit is reached + +{empty}2.Wait - Publisher is throttled when a specified limit is reached + +{empty}3.Elastic - Use an unlimited internal buffer (default) + +CAUTION: In the current version we don't recommend to use back-pressure strategy `Reject` as it is in evolving phase. + +Refer to <><> and <><> on how to configure back-pressure for producer. + +[[processing-messages]] +== Processing Messages + +Applications streaming data often need to consume some events from a topic, process them and publish the result to a different topic. A processor method can be simply implemented using both the *@Incoming* and *@Outgoing* annotations: + +[source,java] +---- +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; + +import jakarta.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class TemperaturesProcessor { + + @Incoming("temperatures-in") + @Outgoing("temperatures-out") + public double process(byte[] temperature) { + return (Double.parseDouble(new String(p.getPayload())) - 32) * 5 / 9; + } + +} +---- + +[[health-checks]] +== Health Checks + +Quarkus provides several health checks for Solace. These checks are used in combination with the *quarkus-smallrye-health* extension. + +=== Reactive Messaging Health Checks + +When using Reactive Messaging and the Quarkus Solace Messaging Connector, each configured channel (incoming or outgoing) provides startup, liveness and readiness checks. + +The startup check verifies that the communication with Solace Broker is established. + +The liveness check captures any unrecoverable failure happening during the communication with Solace. + +The readiness check verifies that the Quarkus Solace Messaging Connector is ready to consume/produce messages to the configured Solace queues/topics. + +[[dev-services]] +Dev Services + +Solace Dev Services for Quarkus will spin up latest version of Solace PubSub standard with label `solace` when running tests or in dev mode. Solace Dev Services are enabled by default and will check for any existing containers with same label to reuse. If none is present a new container is started. + +[[metrics]] +== Metrics + +Quarkus Solace Messaging Connector exposes different metrics provided by Solace Java Messaging API. The metrics are enabled by default and can be accessed at `http://localhost:8080/q/dev-ui/io.quarkus.quarkus-micrometer/prometheus` + diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/SolaceConnector.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/SolaceConnector.java index f9386cf..6badc57 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/SolaceConnector.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/SolaceConnector.java @@ -59,7 +59,7 @@ @ConnectorAttribute(name = "producer.waitForPublishReceipt", type = "boolean", direction = OUTGOING, description = "Whether the client waits to receive the publish receipt from Solace broker before acknowledging the message", defaultValue = "true") @ConnectorAttribute(name = "producer.delivery.ack.timeout", type = "int", direction = OUTGOING, description = "Timeout to receive the publish receipt from broker.") @ConnectorAttribute(name = "producer.delivery.ack.window.size", type = "int", direction = OUTGOING, description = "Publish Window will determine the maximum number of messages the application can send before the Solace API must receive an acknowledgment from the Solace.") -@ConnectorAttribute(name = "producer.back-pressure.strategy", type = "string", direction = OUTGOING, description = "It is possible for the client application to publish messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages.", defaultValue = "wait") +@ConnectorAttribute(name = "producer.back-pressure.strategy", type = "string", direction = OUTGOING, description = "It is possible for the client application to publish messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages.", defaultValue = "elastic") @ConnectorAttribute(name = "producer.back-pressure.buffer-capacity", type = "int", direction = OUTGOING, description = "Outgoing messages backpressure buffer capacity", defaultValue = "1024") public class SolaceConnector implements InboundConnector, OutboundConnector, HealthReporter { diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java index c39582a..ca01133 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java @@ -55,14 +55,14 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o this.channel = oc.getChannel(); PersistentMessagePublisherBuilder builder = solace.createPersistentMessagePublisherBuilder(); switch (oc.getProducerBackPressureStrategy()) { - case "elastic": - builder.onBackPressureElastic(); + case "wait": + builder.onBackPressureWait(oc.getProducerBackPressureBufferCapacity()); break; case "reject": builder.onBackPressureReject(oc.getProducerBackPressureBufferCapacity()); break; default: - builder.onBackPressureWait(oc.getProducerBackPressureBufferCapacity()); + builder.onBackPressureElastic(); break; } this.gracefulShutdown = oc.getClientGracefulShutdown(); diff --git a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java index b835438..03ddc10 100644 --- a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java +++ b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java @@ -1,5 +1,7 @@ package com.solace.quarkus.samples; +import java.util.concurrent.CompletionStage; + import jakarta.enterprise.context.ApplicationScoped; import org.eclipse.microprofile.reactive.messaging.*; @@ -22,9 +24,11 @@ public class HelloConsumer { */ @Outgoing("hello-out") Multi> publishMessage() { - SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() - .setApplicationMessageId("1").createPubSubOutboundMetadata(); - return Multi.createFrom().items("1").map(m -> Message.of(m, Metadata.of(outboundMetadata))); + return Multi.createFrom().items("1", "2", "3", "4").map(m -> { + SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() + .setApplicationMessageId(m).createPubSubOutboundMetadata(); + return Message.of(m, Metadata.of(outboundMetadata)); + }); } /** @@ -37,6 +41,28 @@ void consumeMessage(InboundMessage p) { Log.infof("Received message: %s from topic: %s", p.getPayloadAsString(), p.getDestinationName()); } + /** + * Receives message from queue - queue.foobar + * + * @param p + */ + @Incoming("hello-plain-message-in") + void consumePayload(String p) { + Log.infof("Received message: %s", p); + } + + /** + * Receives message from queue - queue.foobar + * + * @param p + */ + @Incoming("hello-reactive-message-in") + CompletionStage consumeMessage(Message p) { + Log.infof("Received message: %s from topic: %s", p.getPayload(), + p.getMetadata(SolaceInboundMetadata.class).get().getDestinationName()); + return p.ack(); + } + /** * Receives message from queue - queue.dynamic.topic and overwrites the topic configured in outgoing channel * dynamic-destination-out diff --git a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/PublisherResource.java b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/PublisherResource.java index 601ce63..69bbd3f 100644 --- a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/PublisherResource.java +++ b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/PublisherResource.java @@ -15,7 +15,7 @@ @Path("/hello") public class PublisherResource { - @Channel("hello-out") + @Channel("publisher-out") MutinyEmitter foobar; /** diff --git a/samples/hello-connector-solace/src/main/resources/application.properties b/samples/hello-connector-solace/src/main/resources/application.properties index c0d8f33..fa3c93a 100644 --- a/samples/hello-connector-solace/src/main/resources/application.properties +++ b/samples/hello-connector-solace/src/main/resources/application.properties @@ -3,18 +3,28 @@ #quarkus.solace.authentication.basic.username= #quarkus.solace.authentication.basic.password= +mp.messaging.outgoing.publisher-out.connector=quarkus-solace +mp.messaging.outgoing.publisher-out.producer.topic=hello/person + mp.messaging.outgoing.hello-out.connector=quarkus-solace mp.messaging.outgoing.hello-out.producer.topic=hello/foobar -mp.messaging.outgoing.hello-out.merge=true mp.messaging.incoming.hello-in.connector=quarkus-solace mp.messaging.incoming.hello-in.consumer.queue.supports-nacks=true mp.messaging.incoming.hello-in.consumer.queue.name=queue.foobar mp.messaging.incoming.hello-in.consumer.queue.missing-resource-creation-strategy=create-on-start -mp.messaging.incoming.hello-in.consumer.queue.type=durable-exclusive +mp.messaging.incoming.hello-in.consumer.queue.type=durable-non-exclusive mp.messaging.incoming.hello-in.consumer.queue.add-additional-subscriptions=true mp.messaging.incoming.hello-in.consumer.queue.subscriptions=hello/foobar +mp.messaging.incoming.hello-plain-message-in.connector=quarkus-solace +mp.messaging.incoming.hello-plain-message-in.consumer.queue.supports-nacks=true +mp.messaging.incoming.hello-plain-message-in.consumer.queue.name=queue.foobar + +mp.messaging.incoming.hello-reactive-message-in.connector=quarkus-solace +mp.messaging.incoming.hello-reactive-message-in.consumer.queue.supports-nacks=true +mp.messaging.incoming.hello-reactive-message-in.consumer.queue.name=queue.foobar + mp.messaging.incoming.dynamic-destination-in.connector=quarkus-solace mp.messaging.incoming.dynamic-destination-in.consumer.queue.supports-nacks=true mp.messaging.incoming.dynamic-destination-in.consumer.queue.name=queue.dynamic.topic