Skip to content

Commit

Permalink
GH-9597: Fix Mqttv5PahoMessageHandler.publish() for concurrency
Browse files Browse the repository at this point in the history
Fixes: #9597
Issue link: #9597

When input channel for the endpoint with a `Mqttv5PahoMessageHandler` is an `ExecutorChannel` and a lot of concurrent messages are sent,
the `Connect already in progress (32110)` exception can be thrown

* Wrap `this.mqttClient.connect()` in the `Mqttv5PahoMessageHandler.publish()` into a `Lock`

(cherry picked from commit 334620f)
  • Loading branch information
artembilan authored and spring-builds committed Oct 25, 2024
1 parent 7170f27 commit 0f96549
Showing 1 changed file with 9 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,15 @@ protected void publish(String topic, Object mqttMessage, Message<?> message) {
long completionTimeout = getCompletionTimeout();
try {
if (!this.mqttClient.isConnected()) {
this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
this.lock.lock();
try {
if (!this.mqttClient.isConnected()) {
this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
}
}
finally {
this.lock.unlock();
}
}
IMqttToken token = this.mqttClient.publish(topic, (MqttMessage) mqttMessage);
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
Expand Down

0 comments on commit 0f96549

Please sign in to comment.