From 388acd54535afc6aca21eca5fa4e28a46ba831f3 Mon Sep 17 00:00:00 2001 From: Ruben Gees Date: Fri, 9 Jul 2021 14:14:30 +0200 Subject: [PATCH] Add support for mqtt 5 and improve configuration property --- README.md | 30 ++-- .../starter/mqtt/BrokerConnectException.kt | 6 + .../starter/mqtt/MqttAutoConfiguration.kt | 101 +++++++++--- .../starter/mqtt/MqttClientConfigurer.kt | 25 ++- .../starter/mqtt/MqttMessageAdapter.kt | 11 +- .../starter/mqtt/MqttProperties.kt | 31 ++-- .../smartsquare/starter/mqtt/MqttPublisher.kt | 27 +++- .../de/smartsquare/starter/mqtt/MqttRouter.kt | 79 +++++++--- .../starter/mqtt/MqttVersionValidator.kt | 28 ++++ .../starter/mqtt/SpringAwareMqtt3Client.kt | 51 ------ .../starter/mqtt/SpringAwareMqttClient.kt | 95 ++++++++++++ ...itional-spring-configuration-metadata.json | 19 +++ ...nTest.kt => Mqtt3AutoConfigurationTest.kt} | 10 +- .../mqtt/Mqtt5AutoConfigurationTest.kt | 74 +++++++++ .../starter/mqtt/MqttClientConfigurerTest.kt | 2 +- .../starter/mqtt/MqttMessageAdapterTest.kt | 24 +-- .../starter/mqtt/MqttPropertiesTest.kt | 146 ++++++++++++++++++ 17 files changed, 608 insertions(+), 151 deletions(-) create mode 100644 src/main/kotlin/de/smartsquare/starter/mqtt/MqttVersionValidator.kt delete mode 100644 src/main/kotlin/de/smartsquare/starter/mqtt/SpringAwareMqtt3Client.kt create mode 100644 src/main/kotlin/de/smartsquare/starter/mqtt/SpringAwareMqttClient.kt rename src/test/kotlin/de/smartsquare/starter/mqtt/{MqttAutoConfigurationTest.kt => Mqtt3AutoConfigurationTest.kt} (91%) create mode 100644 src/test/kotlin/de/smartsquare/starter/mqtt/Mqtt5AutoConfigurationTest.kt create mode 100644 src/test/kotlin/de/smartsquare/starter/mqtt/MqttPropertiesTest.kt diff --git a/README.md b/README.md index 3b3e6ca..2a10d63 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # :honeybee: HiveMQ Spring Boot Starter -Use an automatically configured mqtt client in your Spring Boot project. +Use an automatically configured mqtt 3 or 5 client in your Spring Boot project. ## Getting Started @@ -20,36 +20,34 @@ dependencies { ### Application Properties +The main configuration mechanism is via properties. All arguments are optional (the default being an anonymous broker on +localhost:1883 with mqtt version 3). + ```properties # The host to connect to. mqtt.host=test.mosquitto.org - # The port to connect to. mqtt.port=1883 - -# The clientId to use when connecting (optional, random by default). +# The clientId to use when connecting (random by default). mqtt.client-id=test - # The username to use when connecting. mqtt.username=admin - # The password to use when connecting. mqtt.password=test - # If the connection should be encrypted. mqtt.ssl=false - -# If the session should be clean (optional, true by default). +# If the session should be clean (true by default). mqtt.clean=false - -# The group to use for shared subscriptions (optional). +# The group to use for shared subscriptions. mqtt.group=group +# The mqtt protocol version to use. 3 and 5 are supported. +mqtt.version=3 ``` ### Advanced -It is possible to additionally configure the client programmatically by implementing the `MqttClientConfigurer` -interface and exposing it as a bean. +It is possible to additionally configure the client programmatically by implementing either the `Mqtt3ClientConfigurer` +or `Mqtt5ClientConfigurer` interface and exposing it as a bean. ```kotlin @Component @@ -112,14 +110,14 @@ class TestConsumer { ### Publisher -Messages cann be published via the `MqttPublisher`. +Messages cann be published via the `Mqtt3Publisher` or `Mqtt5Publisher`. ```kotlin import com.hivemq.client.mqtt.datatypes.MqttQos.AT_LEAST_ONCE import org.springframework.stereotype.Component @Component -class TestPublisher(private val mqttPublisher: MqttPublisher) { +class TestPublisher(private val mqttPublisher: Mqtt3Publisher) { fun publish(payload: TemperaturePayload) { mqttPublisher.publish("/home/temperature", AT_LEAST_ONCE, payload) @@ -131,7 +129,7 @@ class TestPublisher(private val mqttPublisher: MqttPublisher) { ### Direct usage -The `MqttClient` is also exposed and can be used directly. +Depending on the version, an `Mqtt3Client` or `Mqtt5Client` is also exposed and can be used directly. ```kotlin import com.hivemq.client.mqtt.mqtt3.Mqtt3Client diff --git a/src/main/kotlin/de/smartsquare/starter/mqtt/BrokerConnectException.kt b/src/main/kotlin/de/smartsquare/starter/mqtt/BrokerConnectException.kt index ade82f9..8b32678 100644 --- a/src/main/kotlin/de/smartsquare/starter/mqtt/BrokerConnectException.kt +++ b/src/main/kotlin/de/smartsquare/starter/mqtt/BrokerConnectException.kt @@ -1,6 +1,7 @@ package de.smartsquare.starter.mqtt import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck +import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck /** * Exception thrown when the connection to mqtt broker fails. @@ -10,4 +11,9 @@ class BrokerConnectException(message: String, cause: Throwable? = null) : Runtim "Unable to connect to broker. Return Code: ${acknowledgement.returnCode.code}", cause ) + + constructor(acknowledgement: Mqtt5ConnAck, cause: Throwable? = null) : this( + "Unable to connect to broker. Return code: ${acknowledgement.reasonCode.code}", + cause + ) } diff --git a/src/main/kotlin/de/smartsquare/starter/mqtt/MqttAutoConfiguration.kt b/src/main/kotlin/de/smartsquare/starter/mqtt/MqttAutoConfiguration.kt index d5af360..cba3447 100644 --- a/src/main/kotlin/de/smartsquare/starter/mqtt/MqttAutoConfiguration.kt +++ b/src/main/kotlin/de/smartsquare/starter/mqtt/MqttAutoConfiguration.kt @@ -2,10 +2,14 @@ package de.smartsquare.starter.mqtt import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.hivemq.client.mqtt.MqttClient +import com.hivemq.client.mqtt.MqttClientBuilder import com.hivemq.client.mqtt.mqtt3.Mqtt3Client import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3Connect +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client +import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect import org.slf4j.LoggerFactory import org.springframework.boot.autoconfigure.condition.ConditionalOnClass +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration @@ -22,18 +26,64 @@ class MqttAutoConfiguration { private val logger = LoggerFactory.getLogger(javaClass) /** - * Returns a configured and ready to use mqtt client. + * Returns a configured and ready to use mqtt 3 client. */ @Bean - fun mqttClient(config: MqttProperties, configurers: List): Mqtt3Client { - val clientBuilder = Mqtt3Client.builder() + @ConditionalOnProperty("mqtt.version", havingValue = "3", matchIfMissing = true) + fun mqtt3Client(config: MqttProperties, configurers: List): Mqtt3Client { + val clientBuilder = configureCommon(config) + .useMqttVersion3() + .apply { + config.username?.let { username -> + config.password?.let { password -> + simpleAuth() + .username(username) + .password(password.toByteArray()) + .applySimpleAuth() + } + } + } + .apply { configurers.forEach { configurer -> configurer.configure(this) } } + + val connectOptions = Mqtt3Connect.builder() + .cleanSession(config.clean) + .build() + + return SpringAwareMqtt3Client(clientBuilder.build(), connectOptions) + } + + /** + * Returns a configured and ready to use mqtt 5 client. + */ + @Bean + @ConditionalOnProperty("mqtt.version", havingValue = "5") + fun mqtt5Client(config: MqttProperties, configurers: List): Mqtt5Client { + val clientBuilder = configureCommon(config) + .useMqttVersion5() + .apply { + config.username?.let { username -> + config.password?.let { password -> + simpleAuth() + .username(username) + .password(password.toByteArray()) + .applySimpleAuth() + } + } + } + .apply { configurers.forEach { configurer -> configurer.configure(this) } } + + val connectOptions = Mqtt5Connect.builder() + .cleanStart(config.clean) + .build() + + return SpringAwareMqtt5Client(clientBuilder.build(), connectOptions) + } + + private fun configureCommon(config: MqttProperties): MqttClientBuilder { + return MqttClient.builder() .serverHost(config.host) .serverPort(config.port) .automaticReconnectWithDefaultConfig() - .simpleAuth() - .username(config.username) - .password(config.password.toByteArray()) - .applySimpleAuth() .addConnectedListener { logger.info("Connected to broker.") } .addDisconnectedListener { if (it.reconnector.isReconnect) { @@ -44,13 +94,6 @@ class MqttAutoConfiguration { } .apply { if (config.ssl) sslWithDefaultConfig() } .apply { config.clientId?.also { clientId -> identifier(clientId) } } - .apply { configurers.forEach { configurer -> configurer.configure(this) } } - - val connectOptions = Mqtt3Connect.builder() - .cleanSession(config.clean) - .build() - - return SpringAwareMqtt3Client(clientBuilder.build(), connectOptions) } @Bean @@ -62,20 +105,36 @@ class MqttAutoConfiguration { } @Bean - fun router( + @ConditionalOnProperty("mqtt.version", havingValue = "3", matchIfMissing = true) + fun mqtt3Router( messageAdapter: MqttMessageAdapter, collector: AnnotationCollector, config: MqttProperties, client: Mqtt3Client - ): MqttRouter { - return MqttRouter(collector, messageAdapter, config, client) + ): Mqtt3Router { + return Mqtt3Router(collector, messageAdapter, config, client) } @Bean - fun publisher( + @ConditionalOnProperty("mqtt.version", havingValue = "5") + fun mqtt5Router( messageAdapter: MqttMessageAdapter, - client: Mqtt3Client - ): MqttPublisher { - return MqttPublisher(messageAdapter, client) + collector: AnnotationCollector, + config: MqttProperties, + client: Mqtt5Client + ): Mqtt5Router { + return Mqtt5Router(collector, messageAdapter, config, client) + } + + @Bean + @ConditionalOnProperty("mqtt.version", havingValue = "3", matchIfMissing = true) + fun mqtt3Publisher(messageAdapter: MqttMessageAdapter, client: Mqtt3Client): Mqtt3Publisher { + return Mqtt3Publisher(messageAdapter, client) + } + + @Bean + @ConditionalOnProperty("mqtt.version", havingValue = "5") + fun mqtt5Publisher(messageAdapter: MqttMessageAdapter, client: Mqtt5Client): Mqtt5Publisher { + return Mqtt5Publisher(messageAdapter, client) } } diff --git a/src/main/kotlin/de/smartsquare/starter/mqtt/MqttClientConfigurer.kt b/src/main/kotlin/de/smartsquare/starter/mqtt/MqttClientConfigurer.kt index 1d9e48d..8ab0a78 100644 --- a/src/main/kotlin/de/smartsquare/starter/mqtt/MqttClientConfigurer.kt +++ b/src/main/kotlin/de/smartsquare/starter/mqtt/MqttClientConfigurer.kt @@ -1,7 +1,30 @@ package de.smartsquare.starter.mqtt import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder +import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder -interface MqttClientConfigurer { +/** + * Interface to enable more advanced configuration for the [Mqtt3ClientBuilder] than what is possible with the + * properties. + */ +interface Mqtt3ClientConfigurer { + + /** + * To be implemented by consumers. Can perform any configuration on the given [builder] in place. + * This is called after all other configuration have been done so this is the final step. + */ fun configure(builder: Mqtt3ClientBuilder) } + +/** + * Interface to enable more advanced configuration for the [Mqtt5ClientBuilder] than what is possible with the + * properties. + */ +interface Mqtt5ClientConfigurer { + + /** + * To be implemented by consumers. Can perform any configuration on the given [builder] in place. + * This is called after all other configuration have been done so this is the final step. + */ + fun configure(builder: Mqtt5ClientBuilder) +} diff --git a/src/main/kotlin/de/smartsquare/starter/mqtt/MqttMessageAdapter.kt b/src/main/kotlin/de/smartsquare/starter/mqtt/MqttMessageAdapter.kt index 690255f..d19c81d 100644 --- a/src/main/kotlin/de/smartsquare/starter/mqtt/MqttMessageAdapter.kt +++ b/src/main/kotlin/de/smartsquare/starter/mqtt/MqttMessageAdapter.kt @@ -2,7 +2,6 @@ package de.smartsquare.starter.mqtt import com.fasterxml.jackson.databind.ObjectMapper import com.hivemq.client.mqtt.datatypes.MqttTopic -import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish /** * Adapter class with methods for converting from and to mqtt payloads. @@ -10,14 +9,14 @@ import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish class MqttMessageAdapter(private val objectMapper: ObjectMapper) { /** - * Converts the given [message] into the expected [targetType]. + * Converts the given [topic] and [payload] into the expected [targetType]. */ @Suppress("UNCHECKED_CAST") - fun adapt(message: Mqtt3Publish, targetType: Class): T { + fun adapt(topic: MqttTopic, payload: ByteArray, targetType: Class): T { return when { - targetType.isAssignableFrom(MqttTopic::class.java) -> message.topic as T - targetType.isAssignableFrom(String::class.java) -> message.payloadAsBytes.decodeToString() as T - else -> objectMapper.readValue(message.payloadAsBytes, targetType) + targetType.isAssignableFrom(MqttTopic::class.java) -> topic as T + targetType.isAssignableFrom(String::class.java) -> payload.decodeToString() as T + else -> objectMapper.readValue(payload, targetType) } } diff --git a/src/main/kotlin/de/smartsquare/starter/mqtt/MqttProperties.kt b/src/main/kotlin/de/smartsquare/starter/mqtt/MqttProperties.kt index cfdee90..0848841 100644 --- a/src/main/kotlin/de/smartsquare/starter/mqtt/MqttProperties.kt +++ b/src/main/kotlin/de/smartsquare/starter/mqtt/MqttProperties.kt @@ -3,6 +3,8 @@ package de.smartsquare.starter.mqtt import org.springframework.boot.context.properties.ConfigurationProperties import org.springframework.boot.context.properties.ConstructorBinding import org.springframework.validation.annotation.Validated +import javax.validation.constraints.Max +import javax.validation.constraints.Min import javax.validation.constraints.NotEmpty /** @@ -14,38 +16,37 @@ import javax.validation.constraints.NotEmpty data class MqttProperties( /** - * The port the mqtt broker is available under. + * The host the mqtt broker is available under. */ - val port: Int, + @get:NotEmpty + val host: String = "localhost", /** - * The host the mqtt broker is available under. + * The port the mqtt broker is available under. */ - @NotEmpty - val host: String, + @field:Min(0) + @field:Max(65535) + val port: Int = 1883, /** * The client id this component should connect with. */ - @NotEmpty val clientId: String? = null, /** * The username this component should connect with. */ - @NotEmpty - val username: String, + val username: String? = null, /** * The password this component should connect with. */ - @NotEmpty - val password: String, + val password: String? = null, /** * If ssl should be used for the connection to the mqtt broker. */ - val ssl: Boolean, + val ssl: Boolean = false, /** * If the client should connect with a clean session. @@ -55,5 +56,11 @@ data class MqttProperties( /** * The optional group subscriptions should be prefixed with. */ - val group: String? = null + val group: String? = null, + + /** + * The mqtt protocol version to use. + */ + @get:MqttVersion + val version: Int = 3 ) diff --git a/src/main/kotlin/de/smartsquare/starter/mqtt/MqttPublisher.kt b/src/main/kotlin/de/smartsquare/starter/mqtt/MqttPublisher.kt index 3cf6b01..d948fcb 100644 --- a/src/main/kotlin/de/smartsquare/starter/mqtt/MqttPublisher.kt +++ b/src/main/kotlin/de/smartsquare/starter/mqtt/MqttPublisher.kt @@ -3,12 +3,15 @@ package de.smartsquare.starter.mqtt import com.hivemq.client.mqtt.datatypes.MqttQos import com.hivemq.client.mqtt.mqtt3.Mqtt3Client import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult import java.util.concurrent.CompletableFuture /** * Class with convenience methods for publishing a message on the mqtt broker. */ -class MqttPublisher(private val adapter: MqttMessageAdapter, client: Mqtt3Client) { +class Mqtt3Publisher(private val adapter: MqttMessageAdapter, client: Mqtt3Client) { private val asyncClient = client.toAsync() @@ -26,3 +29,25 @@ class MqttPublisher(private val adapter: MqttMessageAdapter, client: Mqtt3Client ) } } + +/** + * Class with convenience methods for publishing a message on the mqtt broker. + */ +class Mqtt5Publisher(private val adapter: MqttMessageAdapter, client: Mqtt5Client) { + + private val asyncClient = client.toAsync() + + /** + * Publishes the given [payload] on [topic] with quality of service level [qos]. + * Returns a [CompletableFuture] that is completed once the broker has accepted the message. + */ + fun publish(topic: String, qos: MqttQos, payload: Any): CompletableFuture { + return asyncClient.publish( + Mqtt5Publish.builder() + .topic(topic) + .qos(qos) + .payload(adapter.adapt(payload)) + .build() + ) + } +} diff --git a/src/main/kotlin/de/smartsquare/starter/mqtt/MqttRouter.kt b/src/main/kotlin/de/smartsquare/starter/mqtt/MqttRouter.kt index 0a4c027..941703c 100644 --- a/src/main/kotlin/de/smartsquare/starter/mqtt/MqttRouter.kt +++ b/src/main/kotlin/de/smartsquare/starter/mqtt/MqttRouter.kt @@ -1,53 +1,96 @@ package de.smartsquare.starter.mqtt +import com.hivemq.client.mqtt.datatypes.MqttQos +import com.hivemq.client.mqtt.datatypes.MqttTopic import com.hivemq.client.mqtt.mqtt3.Mqtt3Client -import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client import org.slf4j.LoggerFactory import org.springframework.beans.factory.InitializingBean import java.lang.reflect.InvocationTargetException import java.lang.reflect.Method /** - * Helper class that subscribes to the mqtt broker and routes received messages to the configured subscribers (methods - * annotated with [MqttSubscribe]). + * Abstract base class for all routers that handles the common implementation. */ -class MqttRouter( +abstract class MqttRouter( private val collector: AnnotationCollector, private val adapter: MqttMessageAdapter, - private val config: MqttProperties, - client: Mqtt3Client + private val config: MqttProperties ) : InitializingBean { - private val logger = LoggerFactory.getLogger(javaClass) - - private val asyncClient = client.toAsync() + private val logger = LoggerFactory.getLogger(this::class.java) override fun afterPropertiesSet() { for ((bean, subscribers) in collector.subscribers) { for (subscriber in subscribers) { val annotation = subscriber.getAnnotation(MqttSubscribe::class.java) - val topic = if (annotation.shared && config.group != null) { + + val subscribeTopic = if (annotation.shared && config.group != null) { "\$share/${config.group}/${annotation.topic}" } else { annotation.topic } - asyncClient.subscribeWith() - .topicFilter(topic) - .qos(annotation.qos) - .callback { message -> deliver(subscriber, bean, message) } - .send() + subscribe(subscribeTopic, annotation.qos) { topic, payload -> + deliver(subscriber, bean, topic, payload) + } } } } - private fun deliver(subscriber: Method, bean: Any, message: Mqtt3Publish) { + protected abstract fun subscribe(topic: String, qos: MqttQos, subscribe: (MqttTopic, ByteArray) -> Unit) + + private fun deliver(subscriber: Method, bean: Any, topic: MqttTopic, payload: ByteArray) { try { - val parameters = subscriber.parameterTypes.map { adapter.adapt(message, it) }.toTypedArray() + val parameters = subscriber.parameterTypes.map { adapter.adapt(topic, payload, it) }.toTypedArray() subscriber.invoke(bean, *parameters) } catch (e: InvocationTargetException) { - logger.error("Error while delivering mqtt message.", e) + logger.error("Error while delivering mqtt message.", e.cause) } } } + +/** + * Helper class that subscribes to the mqtt broker and routes received messages to the configured subscribers (methods + * annotated with [MqttSubscribe]). + */ +class Mqtt3Router( + collector: AnnotationCollector, + adapter: MqttMessageAdapter, + config: MqttProperties, + client: Mqtt3Client +) : MqttRouter(collector, adapter, config) { + + private val asyncClient = client.toAsync() + + override fun subscribe(topic: String, qos: MqttQos, subscribe: (MqttTopic, ByteArray) -> Unit) { + asyncClient.subscribeWith() + .topicFilter(topic) + .qos(qos) + .callback { subscribe(it.topic, it.payloadAsBytes) } + .send() + } +} + +/** + * Helper class that subscribes to the mqtt broker and routes received messages to the configured subscribers (methods + * annotated with [MqttSubscribe]). + */ +class Mqtt5Router( + collector: AnnotationCollector, + adapter: MqttMessageAdapter, + config: MqttProperties, + client: Mqtt5Client +) : MqttRouter(collector, adapter, config) { + + private val asyncClient = client.toAsync() + + override fun subscribe(topic: String, qos: MqttQos, subscribe: (MqttTopic, ByteArray) -> Unit) { + asyncClient.subscribeWith() + .topicFilter(topic) + .qos(qos) + .callback { subscribe(it.topic, it.payloadAsBytes) } + .send() + } +} diff --git a/src/main/kotlin/de/smartsquare/starter/mqtt/MqttVersionValidator.kt b/src/main/kotlin/de/smartsquare/starter/mqtt/MqttVersionValidator.kt new file mode 100644 index 0000000..bbeb962 --- /dev/null +++ b/src/main/kotlin/de/smartsquare/starter/mqtt/MqttVersionValidator.kt @@ -0,0 +1,28 @@ +package de.smartsquare.starter.mqtt + +import javax.validation.Constraint +import javax.validation.ConstraintValidator +import javax.validation.ConstraintValidatorContext +import kotlin.reflect.KClass + +/** + * Annotation to mark properties or fields to be validated as an mqtt version. + */ +@Constraint(validatedBy = [MqttVersionValidator::class]) +@Target(AnnotationTarget.PROPERTY, AnnotationTarget.PROPERTY_GETTER, AnnotationTarget.FIELD) +@Retention(AnnotationRetention.RUNTIME) +annotation class MqttVersion( + val message: String = "Invalid mqtt version. Allowed are 3 and 5.", + val groups: Array> = [], + val payload: Array> = [] +) + +/** + * Custom validator for the mqtt version, enabled by the [MqttVersion] annotation. + */ +class MqttVersionValidator : ConstraintValidator { + + override fun isValid(value: Int?, context: ConstraintValidatorContext): Boolean { + return value == null || value == 3 || value == 5 + } +} diff --git a/src/main/kotlin/de/smartsquare/starter/mqtt/SpringAwareMqtt3Client.kt b/src/main/kotlin/de/smartsquare/starter/mqtt/SpringAwareMqtt3Client.kt deleted file mode 100644 index 1726cbe..0000000 --- a/src/main/kotlin/de/smartsquare/starter/mqtt/SpringAwareMqtt3Client.kt +++ /dev/null @@ -1,51 +0,0 @@ -package de.smartsquare.starter.mqtt - -import com.hivemq.client.mqtt.mqtt3.Mqtt3Client -import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3Connect -import org.slf4j.LoggerFactory -import org.springframework.beans.factory.DisposableBean -import org.springframework.beans.factory.InitializingBean -import java.util.concurrent.TimeUnit -import java.util.concurrent.TimeoutException - -/** - * Subclass of the mqtt client to expose with awareness of the Spring lifecycle. Connects on creation and disconnects on - * destruction. All other methods are delegated to an internal instance. - * - * @property delegate The internal mqtt client. - * @property connectOptions Options to use when connecting. - */ -class SpringAwareMqtt3Client( - private val delegate: Mqtt3Client, - private val connectOptions: Mqtt3Connect = Mqtt3Connect.builder().build() -) : Mqtt3Client by delegate, InitializingBean, DisposableBean { - - private val logger = LoggerFactory.getLogger(this::class.java) - - override fun afterPropertiesSet() { - val host = delegate.config.serverHost - val port = delegate.config.serverPort - val username = delegate.config.simpleAuth.orElseGet { null }?.username?.toString() - - logger.info("Connecting to ${if (username != null) "$username@" else ""}$host:$port...") - - try { - val acknowledgement = delegate.toAsync() - .connect(connectOptions) - .get(10, TimeUnit.SECONDS) - - if (acknowledgement.returnCode.isError) { - throw BrokerConnectException(acknowledgement) - } - } catch (error: TimeoutException) { - throw BrokerConnectException( - "Failed to connect: Broker $host:$port did not respond within 10 seconds.", - error - ) - } - } - - override fun destroy() { - delegate.toAsync().disconnect().get(10, TimeUnit.SECONDS) - } -} diff --git a/src/main/kotlin/de/smartsquare/starter/mqtt/SpringAwareMqttClient.kt b/src/main/kotlin/de/smartsquare/starter/mqtt/SpringAwareMqttClient.kt new file mode 100644 index 0000000..766e45f --- /dev/null +++ b/src/main/kotlin/de/smartsquare/starter/mqtt/SpringAwareMqttClient.kt @@ -0,0 +1,95 @@ +package de.smartsquare.starter.mqtt + +import com.hivemq.client.mqtt.mqtt3.Mqtt3Client +import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3Connect +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client +import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.DisposableBean +import org.springframework.beans.factory.InitializingBean +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException + +/** + * Subclass of the mqtt 3 client to expose with awareness of the Spring lifecycle. + * Connects on creation and disconnects on destruction. All other methods are delegated to an internal instance. + * + * @property delegate The internal mqtt client. + * @property connectOptions Options to use when connecting. + */ +class SpringAwareMqtt3Client( + private val delegate: Mqtt3Client, + private val connectOptions: Mqtt3Connect = Mqtt3Connect.builder().build() +) : Mqtt3Client by delegate, InitializingBean, DisposableBean { + + private val logger = LoggerFactory.getLogger(this::class.java) + + override fun afterPropertiesSet() { + val host = delegate.config.serverHost + val port = delegate.config.serverPort + val username = delegate.config.simpleAuth.orElseGet { null }?.username?.toString() + + logger.info("Connecting to ${if (username != null) "$username@" else ""}$host:$port using mqtt 3...") + + try { + val acknowledgement = delegate.toAsync() + .connect(connectOptions) + .get(10, TimeUnit.SECONDS) + + if (acknowledgement.returnCode.isError) { + throw BrokerConnectException(acknowledgement) + } + } catch (error: TimeoutException) { + throw BrokerConnectException( + "Failed to connect: Broker $host:$port did not respond within 10 seconds.", + error + ) + } + } + + override fun destroy() { + delegate.toAsync().disconnect().get(10, TimeUnit.SECONDS) + } +} + +/** + * Subclass of the mqtt 5 client to expose with awareness of the Spring lifecycle. + * Connects on creation and disconnects on destruction. All other methods are delegated to an internal instance. + * + * @property delegate The internal mqtt client. + * @property connectOptions Options to use when connecting. + */ +class SpringAwareMqtt5Client( + private val delegate: Mqtt5Client, + private val connectOptions: Mqtt5Connect = Mqtt5Connect.builder().build() +) : Mqtt5Client by delegate, InitializingBean, DisposableBean { + + private val logger = LoggerFactory.getLogger(this::class.java) + + override fun afterPropertiesSet() { + val host = delegate.config.serverHost + val port = delegate.config.serverPort + val username = delegate.config.simpleAuth.flatMap { it.username }.orElseGet { null }?.toString() + + logger.info("Connecting to ${if (username != null) "$username@" else ""}$host:$port using mqtt 5...") + + try { + val acknowledgement = delegate.toAsync() + .connect(connectOptions) + .get(10, TimeUnit.SECONDS) + + if (acknowledgement.reasonCode.isError) { + throw BrokerConnectException(acknowledgement) + } + } catch (error: TimeoutException) { + throw BrokerConnectException( + "Failed to connect: Broker $host:$port did not respond within 10 seconds.", + error + ) + } + } + + override fun destroy() { + delegate.toAsync().disconnect().get(10, TimeUnit.SECONDS) + } +} diff --git a/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/src/main/resources/META-INF/additional-spring-configuration-metadata.json index fa83710..f59246d 100644 --- a/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -11,6 +11,25 @@ { "name": "mqtt.ssl", "defaultValue": false + }, + { + "name": "mqtt.version", + "defaultValue": 3 + } + ], + "hints": [ + { + "name": "mqtt.version", + "values": [ + { + "value": 3, + "description": "Use version 3.1.1." + }, + { + "value": 5, + "description": "Use version 5.0." + } + ] } ] } diff --git a/src/test/kotlin/de/smartsquare/starter/mqtt/MqttAutoConfigurationTest.kt b/src/test/kotlin/de/smartsquare/starter/mqtt/Mqtt3AutoConfigurationTest.kt similarity index 91% rename from src/test/kotlin/de/smartsquare/starter/mqtt/MqttAutoConfigurationTest.kt rename to src/test/kotlin/de/smartsquare/starter/mqtt/Mqtt3AutoConfigurationTest.kt index d21ec15..de2b761 100644 --- a/src/test/kotlin/de/smartsquare/starter/mqtt/MqttAutoConfigurationTest.kt +++ b/src/test/kotlin/de/smartsquare/starter/mqtt/Mqtt3AutoConfigurationTest.kt @@ -3,9 +3,9 @@ package de.smartsquare.starter.mqtt import com.hivemq.client.mqtt.datatypes.MqttQos import com.hivemq.client.mqtt.mqtt3.Mqtt3Client import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish -import de.smartsquare.starter.mqtt.MqttAutoConfigurationTest.IntSubscriber -import de.smartsquare.starter.mqtt.MqttAutoConfigurationTest.ObjectSubscriber -import de.smartsquare.starter.mqtt.MqttAutoConfigurationTest.StringSubscriber +import de.smartsquare.starter.mqtt.Mqtt3AutoConfigurationTest.IntSubscriber +import de.smartsquare.starter.mqtt.Mqtt3AutoConfigurationTest.ObjectSubscriber +import de.smartsquare.starter.mqtt.Mqtt3AutoConfigurationTest.StringSubscriber import org.awaitility.kotlin.await import org.awaitility.kotlin.has import org.awaitility.kotlin.untilCallTo @@ -24,13 +24,13 @@ import org.springframework.stereotype.Component ObjectSubscriber::class ] ) -class MqttAutoConfigurationTest { +class Mqtt3AutoConfigurationTest { @Autowired private lateinit var client: Mqtt3Client @Autowired - private lateinit var publisher: MqttPublisher + private lateinit var publisher: Mqtt3Publisher @Autowired private lateinit var intSubscriber: IntSubscriber diff --git a/src/test/kotlin/de/smartsquare/starter/mqtt/Mqtt5AutoConfigurationTest.kt b/src/test/kotlin/de/smartsquare/starter/mqtt/Mqtt5AutoConfigurationTest.kt new file mode 100644 index 0000000..376030a --- /dev/null +++ b/src/test/kotlin/de/smartsquare/starter/mqtt/Mqtt5AutoConfigurationTest.kt @@ -0,0 +1,74 @@ +package de.smartsquare.starter.mqtt + +import com.hivemq.client.mqtt.datatypes.MqttQos +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish +import org.awaitility.kotlin.await +import org.awaitility.kotlin.has +import org.awaitility.kotlin.untilCallTo +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.stereotype.Component + +@ExtendWith(EmqxExtension::class) +@SpringBootTest( + classes = [ + MqttAutoConfiguration::class, + Mqtt5AutoConfigurationTest.IntSubscriber::class, + ] +) +class Mqtt5AutoConfigurationTest { + + companion object { + + @JvmStatic + @BeforeAll + fun setUp() { + System.setProperty("mqtt.version", "5") + } + } + + @Autowired + private lateinit var client: Mqtt5Client + + @Autowired + private lateinit var publisher: Mqtt5Publisher + + @Autowired + private lateinit var intSubscriber: IntSubscriber + + @Test + fun `receives int message`() { + client.toBlocking() + .publish( + Mqtt5Publish.builder() + .topic("int") + .payload("2".toByteArray()) + .qos(MqttQos.EXACTLY_ONCE).build() + ) + + await untilCallTo { intSubscriber.receivedPayload } has { this == 2 } + } + + @Test + fun `publishes message`() { + publisher.publish("int", MqttQos.EXACTLY_ONCE, 1) + + await untilCallTo { intSubscriber.receivedPayload } has { this == 1 } + } + + @Component + class IntSubscriber { + + val receivedPayload get() = _receivedPayload + private var _receivedPayload: Int? = null + + @MqttSubscribe(topic = "int", qos = MqttQos.EXACTLY_ONCE) + fun onMessage(payload: Int) { + _receivedPayload = payload + } + } +} diff --git a/src/test/kotlin/de/smartsquare/starter/mqtt/MqttClientConfigurerTest.kt b/src/test/kotlin/de/smartsquare/starter/mqtt/MqttClientConfigurerTest.kt index fdaee51..0c3cd6f 100644 --- a/src/test/kotlin/de/smartsquare/starter/mqtt/MqttClientConfigurerTest.kt +++ b/src/test/kotlin/de/smartsquare/starter/mqtt/MqttClientConfigurerTest.kt @@ -27,7 +27,7 @@ class MqttClientConfigurerTest { } @Component - class IdentifierConfigurer : MqttClientConfigurer { + class IdentifierConfigurer : Mqtt3ClientConfigurer { override fun configure(builder: Mqtt3ClientBuilder) { builder.identifier("test") diff --git a/src/test/kotlin/de/smartsquare/starter/mqtt/MqttMessageAdapterTest.kt b/src/test/kotlin/de/smartsquare/starter/mqtt/MqttMessageAdapterTest.kt index 89971b5..7c25d91 100644 --- a/src/test/kotlin/de/smartsquare/starter/mqtt/MqttMessageAdapterTest.kt +++ b/src/test/kotlin/de/smartsquare/starter/mqtt/MqttMessageAdapterTest.kt @@ -1,7 +1,7 @@ package de.smartsquare.starter.mqtt import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish +import com.hivemq.client.mqtt.datatypes.MqttTopic import org.amshove.kluent.shouldBeEqualTo import org.amshove.kluent.shouldBeInstanceOf import org.junit.jupiter.api.Test @@ -12,24 +12,14 @@ class MqttMessageAdapterTest { @Test fun `should adapt int message`() { - val message = Mqtt3Publish.builder() - .topic("test") - .payload("1".encodeToByteArray()) - .build() - - val result = adapter.adapt(message, Int::class.java) + val result = adapter.adapt(MqttTopic.of("test"), "1".encodeToByteArray(), Int::class.java) result.shouldBeInstanceOf() result shouldBeEqualTo 1 } @Test fun `should adapt string message`() { - val message = Mqtt3Publish.builder() - .topic("test") - .payload("1".encodeToByteArray()) - .build() - - val result = adapter.adapt(message, String::class.java) + val result = adapter.adapt(MqttTopic.of("test"), "1".encodeToByteArray(), String::class.java) result.shouldBeInstanceOf() result shouldBeEqualTo "1" } @@ -37,13 +27,9 @@ class MqttMessageAdapterTest { @Test fun `should adapt object message`() { val obj = TemperatureMessage(1) + val payload = jacksonObjectMapper().writeValueAsBytes(obj) - val message = Mqtt3Publish.builder() - .topic("test") - .payload(jacksonObjectMapper().writeValueAsBytes(obj)) - .build() - - val result = adapter.adapt(message, TemperatureMessage::class.java) + val result = adapter.adapt(MqttTopic.of("test"), payload, TemperatureMessage::class.java) result.shouldBeInstanceOf() result shouldBeEqualTo obj } diff --git a/src/test/kotlin/de/smartsquare/starter/mqtt/MqttPropertiesTest.kt b/src/test/kotlin/de/smartsquare/starter/mqtt/MqttPropertiesTest.kt new file mode 100644 index 0000000..32775e7 --- /dev/null +++ b/src/test/kotlin/de/smartsquare/starter/mqtt/MqttPropertiesTest.kt @@ -0,0 +1,146 @@ +package de.smartsquare.starter.mqtt + +import org.amshove.kluent.AnyException +import org.amshove.kluent.invoking +import org.amshove.kluent.shouldBeEqualTo +import org.amshove.kluent.shouldNotThrow +import org.amshove.kluent.shouldStartWith +import org.amshove.kluent.shouldThrow +import org.junit.jupiter.api.Test +import org.springframework.boot.autoconfigure.AutoConfigurations +import org.springframework.boot.autoconfigure.context.ConfigurationPropertiesAutoConfiguration +import org.springframework.boot.autoconfigure.validation.ValidationAutoConfiguration +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.boot.context.properties.bind.validation.BindValidationException +import org.springframework.boot.test.context.assertj.AssertableApplicationContext +import org.springframework.boot.test.context.runner.ApplicationContextRunner +import org.springframework.core.NestedExceptionUtils + +class MqttPropertiesTest { + + @EnableConfigurationProperties(MqttProperties::class) + private class TestConfiguration + + @Test + fun `allows empty configuration`() { + ApplicationContextRunner() + .withConfiguration( + AutoConfigurations.of( + ConfigurationPropertiesAutoConfiguration::class.java, + ValidationAutoConfiguration::class.java + ) + ) + .withUserConfiguration(TestConfiguration::class.java) + .run { context: AssertableApplicationContext -> + invoking { context.getBean(MqttProperties::class.java) } shouldNotThrow AnyException + } + } + + @Test + fun `allows valid configuration`() { + ApplicationContextRunner() + .withConfiguration( + AutoConfigurations.of( + ConfigurationPropertiesAutoConfiguration::class.java, + ValidationAutoConfiguration::class.java + ) + ) + .withPropertyValues( + "mqtt.host=localhost", + "mqtt.port=10000", + "mqtt.client-id=clientId", + "mqtt.username=user", + "mqtt.password=pass", + "mqtt.ssl=true", + "mqtt.clean=false", + "mqtt.group=group", + "mqtt.version=5" + ) + .withUserConfiguration(TestConfiguration::class.java) + .run { context: AssertableApplicationContext -> + val bean = context.getBean(MqttProperties::class.java) + + bean shouldBeEqualTo MqttProperties( + host = "localhost", + port = 10000, + clientId = "clientId", + username = "user", + password = "pass", + ssl = true, + clean = false, + group = "group", + version = 5 + ) + } + } + + @Test + fun `validates non empty fields`() { + ApplicationContextRunner() + .withConfiguration( + AutoConfigurations.of( + ConfigurationPropertiesAutoConfiguration::class.java, + ValidationAutoConfiguration::class.java + ) + ) + .withPropertyValues("mqtt.host=") + .withUserConfiguration(TestConfiguration::class.java) + .run { context: AssertableApplicationContext -> + val error = invoking { + context.getBean(MqttProperties::class.java) + } shouldThrow IllegalStateException::class + + val rootError = NestedExceptionUtils.getRootCause(error.exception) + rootError as BindValidationException + + rootError.validationErrors.allErrors.size shouldBeEqualTo 1 + } + } + + @Test + fun `validates port range`() { + ApplicationContextRunner() + .withConfiguration( + AutoConfigurations.of( + ConfigurationPropertiesAutoConfiguration::class.java, + ValidationAutoConfiguration::class.java + ) + ) + .withPropertyValues("mqtt.port=65536") + .withUserConfiguration(TestConfiguration::class.java) + .run { context: AssertableApplicationContext -> + val error = invoking { + context.getBean(MqttProperties::class.java) + } shouldThrow IllegalStateException::class + + val rootError = NestedExceptionUtils.getRootCause(error.exception) + rootError as BindValidationException + + rootError.validationErrors.allErrors.size shouldBeEqualTo 1 + } + } + + @Test + fun `validates mqtt version`() { + ApplicationContextRunner() + .withConfiguration( + AutoConfigurations.of( + ConfigurationPropertiesAutoConfiguration::class.java, + ValidationAutoConfiguration::class.java + ) + ) + .withPropertyValues("mqtt.version=2") + .withUserConfiguration(TestConfiguration::class.java) + .run { context: AssertableApplicationContext -> + val error = invoking { + context.getBean(MqttProperties::class.java) + } shouldThrow IllegalStateException::class + + val rootError = NestedExceptionUtils.getRootCause(error.exception) + rootError as BindValidationException + + rootError.validationErrors.allErrors.size shouldBeEqualTo 1 + rootError.validationErrors.allErrors[0].defaultMessage shouldStartWith "Invalid mqtt version" + } + } +}