Skip to content

Commit

Permalink
Solace Direct Messaging Capability
Browse files Browse the repository at this point in the history
Removed Unused config

fix failing test

Fix failing tests

Review comments and Documentation update

Fixed failing test and updated documentation

Direct Messaging: Publish failed listener to track failures and update health status

Updated property names which are common to direct and persistent messaging

Solace Direct Messaging capability
  • Loading branch information
SravanThotakura05 committed Jul 18, 2024
1 parent 6e4c350 commit 4484bac
Show file tree
Hide file tree
Showing 29 changed files with 1,838 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,26 @@ h|[[quarkus-solace_configuration_common]]link:#quarkus-solace_configuration_comm
h|Type
h|Default

a| [[quarkus-solace_quarkus.client.type]]`link:#quarkus-solace_quarkus.client.type[client.type]`


[.description]
--
The type of client when establishing connection to Solace. Solace supports two types of client Direct and Persistent.

Use Direct client where message loss can be tolerated. The publisher publishes the event and the broker doesn't send any acknowledgement back to publisher for guaranteed delivery.

Use Persistent client where message loss cannot be tolerated. The publisher publishes the event and the broker sends an acknowledgement that message is guaranteed for delivery.

// ifdef::add-copy-button-to-env-var[]
// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE_METRICS_ENABLED+++[]
// endif::add-copy-button-to-env-var[]
// ifndef::add-copy-button-to-env-var[]
// Environment variable: `+++QUARKUS_SOLACE_METRICS_ENABLED+++`
// endif::add-copy-button-to-env-var[]
--|string
| `persistent`

a| [[quarkus-solace_quarkus.client.lazy.start]]`link:#quarkus-solace_quarkus.client.lazy.start[client.lazy.start]`


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,43 @@ h|[[quarkus-solace_configuration_incoming]]link:#quarkus-solace_configuration_in
h|Type
h|Default

a| [[quarkus-solace_quarkus.client.type.direct.back-pressure.strategy]]`link:#quarkus-solace_quarkus.client.type.direct.back-pressure.strategy[client.type.direct.back-pressure.strategy]`


[.description]
--
The back-pressure strategy to be applied for direct message consumer.

Supported values are `oldest`, `latest` & `elastic`.

Refer to https://docs.solace.com/API/API-Developer-Guide-Java/Java-DM-Subscribe.htm#Configuring-Back-Pressure[Handling Back-Pressure When Subscribing to Direct Messages]

// ifdef::add-copy-button-to-env-var[]
// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE_METRICS_ENABLED+++[]
// endif::add-copy-button-to-env-var[]
// ifndef::add-copy-button-to-env-var[]
// Environment variable: `+++QUARKUS_SOLACE_METRICS_ENABLED+++`
// endif::add-copy-button-to-env-var[]
--|string
| `elastic`

a| [[quarkus-solace_quarkus.client.type.direct.back-pressure.buffer-capacity]]`link:#quarkus-solace_quarkus.client.type.direct.back-pressure.buffer-capacity[client.type.direct.back-pressure.buffer-capacity]`


[.description]
--

It is possible for the client application to consume messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages.

// ifdef::add-copy-button-to-env-var[]
// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE_METRICS_ENABLED+++[]
// endif::add-copy-button-to-env-var[]
// ifndef::add-copy-button-to-env-var[]
// Environment variable: `+++QUARKUS_SOLACE_METRICS_ENABLED+++`
// endif::add-copy-button-to-env-var[]
--|int
| `1024`

a| [[quarkus-solace_quarkus.consumer.queue.name]]`link:#quarkus-solace_quarkus.consumer.queue.name[consumer.queue.name]`


Expand Down Expand Up @@ -77,12 +114,16 @@ Whether to add configured subscriptions to queue. Will fail if permissions to co
|`false`


a| [[quarkus-solace_quarkus.consumer.queue.subscriptions]]`link:#quarkus-solace_quarkus.consumer.queue.subscriptions[consumer.queue.subscriptions]`
a| [[quarkus-solace_quarkus.consumer.subscriptions]]`link:#quarkus-solace_quarkus.consumer.subscriptions[consumer.subscriptions]`


[.description]
--
The comma separated list of subscriptions, the channel name if empty. This configuration is considered if `consumer.queue.add-additional-subscriptions` is set to true.
The comma separated list of subscriptions, the channel name if empty.

If `client.type` is `persistent` this configuration is considered only if `consumer.queue.add-additional-subscriptions` is set to true.

If `client.type` is `direct` this is used by default.

// ifdef::add-copy-button-to-env-var[]
// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE_DEVSERVICES_SHARED+++[]
Expand All @@ -91,7 +132,7 @@ The comma separated list of subscriptions, the channel name if empty. This confi
// Environment variable: `+++QUARKUS_SOLACE_DEVSERVICES_SHARED+++`
// endif::add-copy-button-to-env-var[]
--|string
|
| required icon:exclamation-circle[title=Configuration property is required]


a| [[quarkus-solace_quarkus.consumer.queue.selector-query]]`link:#quarkus-solace_quarkus.consumer.queue.selector-query[quarkus.consumer.queue.selector-query]`
Expand Down Expand Up @@ -158,16 +199,18 @@ The receiver replay replication group message id.
// ifndef::add-copy-button-to-env-var[]
// Environment variable: `+++QUARKUS_SOLACE_DEVSERVICES_CONTAINER_ENV+++`
// endif::add-copy-button-to-env-var[]
--|`string`
--|string
|

a| [[quarkus-solace_quarkus.consumer.queue.failure-strategy]]`link:#quarkus-solace_quarkus.consumer.queue.failure-strategy[consumer.queue.failure-strategy]`
a| [[quarkus-solace_quarkus.consumer.failure-strategy]]`link:#quarkus-solace_quarkus.consumer.failure-strategy[consumer.failure-strategy]`


[.description]
--
Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.

Following are the failure strategies supported when `client.type` is `persistent`.

`ignore` - Mark the message as IGNORED, will continue processing with next message.

`fail` - Mark the message as FAILED, broker will redeliver the message. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on broker version.
Expand All @@ -176,16 +219,22 @@ Specify the failure strategy to apply when a message consumed from Solace broker

`error_topic` - Will publish the message to configured error topic, on success the message will be acknowledged in the queue.

Following are the failure strategies supported when `client.type` is `direct`.

`ignore` - Mark the message as IGNORED, will continue processing with next message.

`error_topic` - Will publish the message to configured error topic, on success the message will be acknowledged in the queue.

// ifdef::add-copy-button-to-env-var[]
// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE+++[]
// endif::add-copy-button-to-env-var[]
// ifndef::add-copy-button-to-env-var[]
// Environment variable: `+++QUARKUS_SOLACE+++`
// endif::add-copy-button-to-env-var[]
--|`string`
--|string
| ignore

a| [[quarkus-solace_quarkus.consumer.queue.error.topic]]`link:#quarkus-solace_quarkus.consumer.queue.error.topic[consumer.queue.error.topic]`
a| [[quarkus-solace_quarkus.consumer.error.topic]]`link:#quarkus-solace_quarkus.consumer.error.topic[consumer.error.topic]`


[.description]
Expand All @@ -198,10 +247,10 @@ The error topic where message should be published in case of error.
// ifndef::add-copy-button-to-env-var[]
// Environment variable: `+++QUARKUS_SOLACE+++`
// endif::add-copy-button-to-env-var[]
--|`string`
--|string
|

a| [[quarkus-solace_quarkus.consumer.queue.error.message.dmq-eligible]]`link:#quarkus-solace_quarkus.consumer.queue.error.message.dmq-eligible[consumer.queue.error.message.dmq-eligible]`
a| [[quarkus-solace_quarkus.consumer.error.message.dmq-eligible]]`link:#quarkus-solace_quarkus.consumer.error.message.dmq-eligible[consumer.error.message.dmq-eligible]`


[.description]
Expand All @@ -214,10 +263,10 @@ Whether error message is eligible to move to dead message queue.
// ifndef::add-copy-button-to-env-var[]
// Environment variable: `+++QUARKUS_SOLACE+++`
// endif::add-copy-button-to-env-var[]
--|`boolean`
--|boolean
| `false`

a| [[quarkus-solace_quarkus.consumer.queue.error.message.ttl]]`link:#quarkus-solace_quarkus.consumer.queue.error.message.ttl[consumer.queue.error.message.ttl]`
a| [[quarkus-solace_quarkus.consumer.error.message.ttl]]`link:#quarkus-solace_quarkus.consumer.error.message.ttl[consumer.error.message.ttl]`


[.description]
Expand All @@ -230,10 +279,10 @@ TTL for Error message before moving to dead message queue.
// ifndef::add-copy-button-to-env-var[]
// Environment variable: `+++QUARKUS_SOLACE+++`
// endif::add-copy-button-to-env-var[]
--|`long`
--|long
| `null`

a| [[quarkus-solace_quarkus.consumer.queue.error.message.max-delivery-attempts]]`link:#quarkus-solace_quarkus.consumer.queue.error.message.max-delivery-attempts[consumer.queue.error.message.max-delivery-attempts]`
a| [[quarkus-solace_quarkus.consumer.error.message.max-delivery-attempts]]`link:#quarkus-solace_quarkus.consumer.error.message.max-delivery-attempts[consumer.error.message.max-delivery-attempts]`


[.description]
Expand All @@ -246,7 +295,7 @@ Maximum number of attempts to send a failed message to the error topic in case o
// ifndef::add-copy-button-to-env-var[]
// Environment variable: `+++QUARKUS_SOLACE+++`
// endif::add-copy-button-to-env-var[]
--|`int`
--|int
| `3`

a| [[quarkus-solace_quarkus.consumer.queue.supports-nacks]]`link:#quarkus-solace_quarkus.consumer.queue.supports-nacks[consumer.queue.supports-nacks]`
Expand All @@ -262,7 +311,7 @@ Whether to enable negative acknowledgments on failed messages. Nacks are support
// ifndef::add-copy-button-to-env-var[]
// Environment variable: `+++QUARKUS_SOLACE+++`
// endif::add-copy-button-to-env-var[]
--|`boolean`
--|boolean
| `false`

|===
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,24 @@ h|[[quarkus-solace_configuration_outgoing]]link:#quarkus-solace_configuration_ou
h|Type
h|Default

a| [[quarkus-solace_quarkus.client.type.direct.waitForPublishReceipt.timeout]]`link:#quarkus-solace_quarkus.client.type.direct.waitForPublishReceipt.timeout[client.type.direct.waitForPublishReceipt.timeout]`


[.description]
--
In case of direct messaging broker will not send any acknowledgement for published messages.

We can configure a timeout in milliseconds to check for any publish failures. If no failed event is received during this timeout published message is assumed to be successful.

// ifdef::add-copy-button-to-env-var[]
// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE_METRICS_ENABLED+++[]
// endif::add-copy-button-to-env-var[]
// ifndef::add-copy-button-to-env-var[]
// Environment variable: `+++QUARKUS_SOLACE_METRICS_ENABLED+++`
// endif::add-copy-button-to-env-var[]
--|long
| `30`

a| [[quarkus-solace_quarkus.producer.topic]]`link:#quarkus-solace_quarkus.producer.topic[producer.topic]`


Expand Down Expand Up @@ -48,7 +66,9 @@ a| [[quarkus-solace_quarkus.producer.waitForPublishReceipt]]`link:#quarkus-solac

[.description]
--
Whether the client waits to receive the publish receipt from Solace broker before acknowledging the message.
Whether the client waits to receive publish receipt from Solace broker before sending acknowledgment. This property is considered only when `client.type` is `persistent`.

In case of `client.type` is `direct` publish receipt is not sent by broker and extension returns success acknowledgment by default. However, the extension will log if there is any failure when publishing the event.

// ifdef::add-copy-button-to-env-var[]
// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE_DEVSERVICES_ENABLED+++[]
Expand Down
55 changes: 47 additions & 8 deletions docs/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,6 @@ quarkus.solace.authentication.basic.password=test

In similar way other authentication mechanisms can be enabled

CAUTION: In the current version we don't recommend to use OAuth as it in evolving phase.


[[configuring-quarkus-solace-messaging-connector]]
== Configuring Quarkus Solace Messaging Connector
Expand All @@ -184,7 +182,9 @@ Reactive Messaging framework supports different messaging backends it employs a

* Channels are connected to message backends using connectors. Connectors are configured to map incoming messages to a specific channel (consumed by the application) and collect outgoing messages sent to a specific channel. Each connector is dedicated to a specific messaging technology. For example, the connector dealing with Solace is named `quarkus-solace`.

A minimal configuration for the Solace connector with an incoming channel looks like the following:
The extension supports two types of messaging client `direct` and `persistent`. By default `persistent` client is enabled.

A minimal configuration for the Solace connector with an incoming channel and `persistent` client looks like the following:

The following lines of configuration assumes that a exclusive queue is already provisioned on the broker
[source,properties]
Expand All @@ -210,7 +210,21 @@ quarkus.solace.authentication.basic.password=basic
mp.messaging.incoming.temperatures.connector=quarkus-solace
mp.messaging.incoming.temperatures.consumer.queue.missing-resource-creation-strategy=create-on-start
mp.messaging.incoming.temperatures.consumer.queue.add-additional-subscriptions=true
mp.messaging.incoming.temperatures.consumer.queue.subscriptions=hello/foobar
mp.messaging.incoming.temperatures.consumer.subscriptions=hello/foobar
----

A minimal configuration for the Solace connector with an incoming channel and `direct` client looks like the following:

[source,properties]
----
quarkus.solace.host=tcp://localhost:55555
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=basic
quarkus.solace.authentication.basic.password=basic
mp.messaging.incoming.temperatures.client.type=direct
mp.messaging.incoming.temperatures.connector=quarkus-solace
mp.messaging.incoming.temperatures.consumer.subscriptions=sensor/temperatures
----

1. When running in dev mode or tests dev services will automatically start a Solace PubSub+ broker and if broker configuration details are not provided the extension automatically picks up the details of broker started by dev services.
Expand Down Expand Up @@ -257,7 +271,8 @@ __SolaceInboundMessage__ This is a wrapper to incoming Inbound Message from Sola

[source,java]
----
import com.solace.messaging.receiver.InboundMessage;@ApplicationScoped
import com.solace.messaging.receiver.InboundMessage;
@ApplicationScoped
public class TemperaturesConsumer {
@Incoming("temperatures")
public void consume(InboundMessage inboundMessage) {
Expand All @@ -270,7 +285,9 @@ public class TemperaturesConsumer {
[[acknowledgement-handling]]
== Acknowledgment Handling

By default, acknowledgement strategy is set to client acknowledgement. This gives greater control over acknowledgement and ensures that messages are acknowledged only after successful processing.
By default, for `persistent` client acknowledgement strategy is set to client acknowledgement. This gives greater control over acknowledgement and ensures that messages are acknowledged only after successful processing.

In case of `direct` client no acknowledgement is sent to the broker.

[source,java]
----
Expand All @@ -291,7 +308,7 @@ public class TemperaturesConsumer {
[[failure-strategies]]
== Failure Strategies

If a message is nacked, a failure strategy is applied. Refer to <<extension-incoming-configuration-reference>><<quarkus-solace_quarkus.consumer.queue.failure-strategy>>. The default strategy is set to `ignore` and move on to next message. Following are the strategies supported by Quarkus Solace Messaging Connector extension.
If a message is nacked, a failure strategy is applied. Refer to <<extension-incoming-configuration-reference>><<quarkus-solace_quarkus.consumer.failure-strategy>>. The default strategy is set to `ignore` and move on to next message. Following are the strategies supported by Quarkus Solace Messaging Connector extension.

`ignore` - Mark the message as IGNORED, will continue processing with next message. It TTL and DMQ are configured on the queue message will be moved to DMQ once TTL is reached. If no DMQ is configured but TTL is set message will be lost.

Expand All @@ -304,7 +321,7 @@ If a message is nacked, a failure strategy is applied. Refer to <<extension-inco
[[sending-messages-to-solace]]
== Sending messages to Solace

Outgoing channel configuration to publish messages to Solace.
Outgoing channel configuration to publish `persistent` messages to Solace.

[source,properties]
----
Expand All @@ -317,6 +334,20 @@ mp.messaging.outgoing.temperatures-out.connector=quarkus-solace
mp.messaging.outgoing.temperatures-out.producer.topic=temperatures
----

Outgoing channel configuration to publish `direct` messages to Solace.

[source,properties]
----
quarkus.solace.host=tcp://localhost:55555
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=basic
quarkus.solace.authentication.basic.password=basic
mp.messaging.outgoing.temperatures-out.client.type=direct
mp.messaging.outgoing.temperatures-out.connector=quarkus-solace
mp.messaging.outgoing.temperatures-out.producer.topic=temperatures
----

1. When running in dev mode or tests dev services will automatically start a Solace PubSub+ broker and if broker configuration details are not provided the extension automatically picks up the details of broker started by dev services.

2. If `producer.topic` property is not specified, channel name will be used as topic name.
Expand Down Expand Up @@ -433,6 +464,14 @@ public class PublisherResource {
}
----

== Producer Acknowledgement

Producer can return successful acknowledgement when <<quarkus-solace_quarkus.producer.waitForPublishReceipt>> is enabled.

This property is considered only when <<quarkus-solace_quarkus.client.type>> is set to `persistent`. The connector will wait for response from broker and will return success or failed acknowledgement.

In case of <<quarkus-solace_quarkus.client.type>> is set to `direct` this property is ignored as broker will not send any response. By default, success acknowledgement is returned and any failures during publish are logged as exceptions.

== Producer Back-Pressure strategies

Quarkus Solace Messaging connector provides three different strategies to handle back-pressure when publishing messages
Expand Down
Loading

0 comments on commit 4484bac

Please sign in to comment.