Skip to content

Commit

Permalink
Add support for mqtt 5 and improve configuration property
Browse files Browse the repository at this point in the history
  • Loading branch information
rubengees committed Jul 9, 2021
1 parent 5c2d3bb commit 388acd5
Show file tree
Hide file tree
Showing 17 changed files with 608 additions and 151 deletions.
30 changes: 14 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# :honeybee: HiveMQ Spring Boot Starter

Use an automatically configured mqtt client in your Spring Boot project.
Use an automatically configured mqtt 3 or 5 client in your Spring Boot project.

## Getting Started

Expand All @@ -20,36 +20,34 @@ dependencies {

### Application Properties

The main configuration mechanism is via properties. All arguments are optional (the default being an anonymous broker on
localhost:1883 with mqtt version 3).

```properties
# The host to connect to.
mqtt.host=test.mosquitto.org

# The port to connect to.
mqtt.port=1883

# The clientId to use when connecting (optional, random by default).
# The clientId to use when connecting (random by default).
mqtt.client-id=test

# The username to use when connecting.
mqtt.username=admin

# The password to use when connecting.
mqtt.password=test

# If the connection should be encrypted.
mqtt.ssl=false

# If the session should be clean (optional, true by default).
# If the session should be clean (true by default).
mqtt.clean=false

# The group to use for shared subscriptions (optional).
# The group to use for shared subscriptions.
mqtt.group=group
# The mqtt protocol version to use. 3 and 5 are supported.
mqtt.version=3
```

### Advanced

It is possible to additionally configure the client programmatically by implementing the `MqttClientConfigurer`
interface and exposing it as a bean.
It is possible to additionally configure the client programmatically by implementing either the `Mqtt3ClientConfigurer`
or `Mqtt5ClientConfigurer` interface and exposing it as a bean.

```kotlin
@Component
Expand Down Expand Up @@ -112,14 +110,14 @@ class TestConsumer {

### Publisher

Messages cann be published via the `MqttPublisher`.
Messages cann be published via the `Mqtt3Publisher` or `Mqtt5Publisher`.

```kotlin
import com.hivemq.client.mqtt.datatypes.MqttQos.AT_LEAST_ONCE
import org.springframework.stereotype.Component

@Component
class TestPublisher(private val mqttPublisher: MqttPublisher) {
class TestPublisher(private val mqttPublisher: Mqtt3Publisher) {

fun publish(payload: TemperaturePayload) {
mqttPublisher.publish("/home/temperature", AT_LEAST_ONCE, payload)
Expand All @@ -131,7 +129,7 @@ class TestPublisher(private val mqttPublisher: MqttPublisher) {

### Direct usage

The `MqttClient` is also exposed and can be used directly.
Depending on the version, an `Mqtt3Client` or `Mqtt5Client` is also exposed and can be used directly.

```kotlin
import com.hivemq.client.mqtt.mqtt3.Mqtt3Client
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package de.smartsquare.starter.mqtt

import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck

/**
* Exception thrown when the connection to mqtt broker fails.
Expand All @@ -10,4 +11,9 @@ class BrokerConnectException(message: String, cause: Throwable? = null) : Runtim
"Unable to connect to broker. Return Code: ${acknowledgement.returnCode.code}",
cause
)

constructor(acknowledgement: Mqtt5ConnAck, cause: Throwable? = null) : this(
"Unable to connect to broker. Return code: ${acknowledgement.reasonCode.code}",
cause
)
}
101 changes: 80 additions & 21 deletions src/main/kotlin/de/smartsquare/starter/mqtt/MqttAutoConfiguration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package de.smartsquare.starter.mqtt

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.hivemq.client.mqtt.MqttClient
import com.hivemq.client.mqtt.MqttClientBuilder
import com.hivemq.client.mqtt.mqtt3.Mqtt3Client
import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3Connect
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
Expand All @@ -22,18 +26,64 @@ class MqttAutoConfiguration {
private val logger = LoggerFactory.getLogger(javaClass)

/**
* Returns a configured and ready to use mqtt client.
* Returns a configured and ready to use mqtt 3 client.
*/
@Bean
fun mqttClient(config: MqttProperties, configurers: List<MqttClientConfigurer>): Mqtt3Client {
val clientBuilder = Mqtt3Client.builder()
@ConditionalOnProperty("mqtt.version", havingValue = "3", matchIfMissing = true)
fun mqtt3Client(config: MqttProperties, configurers: List<Mqtt3ClientConfigurer>): Mqtt3Client {
val clientBuilder = configureCommon(config)
.useMqttVersion3()
.apply {
config.username?.let { username ->
config.password?.let { password ->
simpleAuth()
.username(username)
.password(password.toByteArray())
.applySimpleAuth()
}
}
}
.apply { configurers.forEach { configurer -> configurer.configure(this) } }

val connectOptions = Mqtt3Connect.builder()
.cleanSession(config.clean)
.build()

return SpringAwareMqtt3Client(clientBuilder.build(), connectOptions)
}

/**
* Returns a configured and ready to use mqtt 5 client.
*/
@Bean
@ConditionalOnProperty("mqtt.version", havingValue = "5")
fun mqtt5Client(config: MqttProperties, configurers: List<Mqtt5ClientConfigurer>): Mqtt5Client {
val clientBuilder = configureCommon(config)
.useMqttVersion5()
.apply {
config.username?.let { username ->
config.password?.let { password ->
simpleAuth()
.username(username)
.password(password.toByteArray())
.applySimpleAuth()
}
}
}
.apply { configurers.forEach { configurer -> configurer.configure(this) } }

val connectOptions = Mqtt5Connect.builder()
.cleanStart(config.clean)
.build()

return SpringAwareMqtt5Client(clientBuilder.build(), connectOptions)
}

private fun configureCommon(config: MqttProperties): MqttClientBuilder {
return MqttClient.builder()
.serverHost(config.host)
.serverPort(config.port)
.automaticReconnectWithDefaultConfig()
.simpleAuth()
.username(config.username)
.password(config.password.toByteArray())
.applySimpleAuth()
.addConnectedListener { logger.info("Connected to broker.") }
.addDisconnectedListener {
if (it.reconnector.isReconnect) {
Expand All @@ -44,13 +94,6 @@ class MqttAutoConfiguration {
}
.apply { if (config.ssl) sslWithDefaultConfig() }
.apply { config.clientId?.also { clientId -> identifier(clientId) } }
.apply { configurers.forEach { configurer -> configurer.configure(this) } }

val connectOptions = Mqtt3Connect.builder()
.cleanSession(config.clean)
.build()

return SpringAwareMqtt3Client(clientBuilder.build(), connectOptions)
}

@Bean
Expand All @@ -62,20 +105,36 @@ class MqttAutoConfiguration {
}

@Bean
fun router(
@ConditionalOnProperty("mqtt.version", havingValue = "3", matchIfMissing = true)
fun mqtt3Router(
messageAdapter: MqttMessageAdapter,
collector: AnnotationCollector,
config: MqttProperties,
client: Mqtt3Client
): MqttRouter {
return MqttRouter(collector, messageAdapter, config, client)
): Mqtt3Router {
return Mqtt3Router(collector, messageAdapter, config, client)
}

@Bean
fun publisher(
@ConditionalOnProperty("mqtt.version", havingValue = "5")
fun mqtt5Router(
messageAdapter: MqttMessageAdapter,
client: Mqtt3Client
): MqttPublisher {
return MqttPublisher(messageAdapter, client)
collector: AnnotationCollector,
config: MqttProperties,
client: Mqtt5Client
): Mqtt5Router {
return Mqtt5Router(collector, messageAdapter, config, client)
}

@Bean
@ConditionalOnProperty("mqtt.version", havingValue = "3", matchIfMissing = true)
fun mqtt3Publisher(messageAdapter: MqttMessageAdapter, client: Mqtt3Client): Mqtt3Publisher {
return Mqtt3Publisher(messageAdapter, client)
}

@Bean
@ConditionalOnProperty("mqtt.version", havingValue = "5")
fun mqtt5Publisher(messageAdapter: MqttMessageAdapter, client: Mqtt5Client): Mqtt5Publisher {
return Mqtt5Publisher(messageAdapter, client)
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,30 @@
package de.smartsquare.starter.mqtt

import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder
import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder

interface MqttClientConfigurer {
/**
* Interface to enable more advanced configuration for the [Mqtt3ClientBuilder] than what is possible with the
* properties.
*/
interface Mqtt3ClientConfigurer {

/**
* To be implemented by consumers. Can perform any configuration on the given [builder] in place.
* This is called after all other configuration have been done so this is the final step.
*/
fun configure(builder: Mqtt3ClientBuilder)
}

/**
* Interface to enable more advanced configuration for the [Mqtt5ClientBuilder] than what is possible with the
* properties.
*/
interface Mqtt5ClientConfigurer {

/**
* To be implemented by consumers. Can perform any configuration on the given [builder] in place.
* This is called after all other configuration have been done so this is the final step.
*/
fun configure(builder: Mqtt5ClientBuilder)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,21 @@ package de.smartsquare.starter.mqtt

import com.fasterxml.jackson.databind.ObjectMapper
import com.hivemq.client.mqtt.datatypes.MqttTopic
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish

/**
* Adapter class with methods for converting from and to mqtt payloads.
*/
class MqttMessageAdapter(private val objectMapper: ObjectMapper) {

/**
* Converts the given [message] into the expected [targetType].
* Converts the given [topic] and [payload] into the expected [targetType].
*/
@Suppress("UNCHECKED_CAST")
fun <T> adapt(message: Mqtt3Publish, targetType: Class<T>): T {
fun <T> adapt(topic: MqttTopic, payload: ByteArray, targetType: Class<T>): T {
return when {
targetType.isAssignableFrom(MqttTopic::class.java) -> message.topic as T
targetType.isAssignableFrom(String::class.java) -> message.payloadAsBytes.decodeToString() as T
else -> objectMapper.readValue(message.payloadAsBytes, targetType)
targetType.isAssignableFrom(MqttTopic::class.java) -> topic as T
targetType.isAssignableFrom(String::class.java) -> payload.decodeToString() as T
else -> objectMapper.readValue(payload, targetType)
}
}

Expand Down
31 changes: 19 additions & 12 deletions src/main/kotlin/de/smartsquare/starter/mqtt/MqttProperties.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package de.smartsquare.starter.mqtt
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.ConstructorBinding
import org.springframework.validation.annotation.Validated
import javax.validation.constraints.Max
import javax.validation.constraints.Min
import javax.validation.constraints.NotEmpty

/**
Expand All @@ -14,38 +16,37 @@ import javax.validation.constraints.NotEmpty
data class MqttProperties(

/**
* The port the mqtt broker is available under.
* The host the mqtt broker is available under.
*/
val port: Int,
@get:NotEmpty
val host: String = "localhost",

/**
* The host the mqtt broker is available under.
* The port the mqtt broker is available under.
*/
@NotEmpty
val host: String,
@field:Min(0)
@field:Max(65535)
val port: Int = 1883,

/**
* The client id this component should connect with.
*/
@NotEmpty
val clientId: String? = null,

/**
* The username this component should connect with.
*/
@NotEmpty
val username: String,
val username: String? = null,

/**
* The password this component should connect with.
*/
@NotEmpty
val password: String,
val password: String? = null,

/**
* If ssl should be used for the connection to the mqtt broker.
*/
val ssl: Boolean,
val ssl: Boolean = false,

/**
* If the client should connect with a clean session.
Expand All @@ -55,5 +56,11 @@ data class MqttProperties(
/**
* The optional group subscriptions should be prefixed with.
*/
val group: String? = null
val group: String? = null,

/**
* The mqtt protocol version to use.
*/
@get:MqttVersion
val version: Int = 3
)
Loading

0 comments on commit 388acd5

Please sign in to comment.