Skip to content

Commit

Permalink
feat: add JSON topic subscription to MQTT client
Browse files Browse the repository at this point in the history
  • Loading branch information
andrekir committed Jan 28, 2024
1 parent 9194386 commit f8a7596
Showing 1 changed file with 38 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,16 @@ class MQTTRepository @Inject constructor(

private var mqttClient: MqttAsyncClient? = null

suspend fun connect(callback: MqttCallbackExtended) {
fun disconnect() {
info("MQTT Disconnected")
mqttClient?.apply {
ignoreException { disconnect() }
close(true)
mqttClient = null
}
}

val proxyMessageFlow: Flow<MqttClientProxyMessage> = callbackFlow {
val ownerId = radioConfigRepository.nodeDB.myId.value ?: generateClientId()
val channelSet = radioConfigRepository.channelSetFlow.first()
val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt
Expand All @@ -57,16 +66,17 @@ class MQTTRepository @Inject constructor(
// Create a custom SSLContext that trusts all certificates
sslContext.init(null, arrayOf<TrustManager>(TrustAllX509TrustManager()), SecureRandom())

val stat = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + STAT_TOPIC_LEVEL + ownerId
val rootTopic = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT }
val statTopic = "$rootTopic$STAT_TOPIC_LEVEL$ownerId"

val connectOptions = MqttConnectOptions().apply {
userName = mqttConfig.username
password = mqttConfig.password.toCharArray()
isCleanSession = false // must be false to auto subscribe on reconnects
isAutomaticReconnect = true
if (mqttConfig.tlsEnabled) {
socketFactory = sslContext.socketFactory
}
setWill(stat, "offline".encodeToByteArray(), DEFAULT_QOS, true)
setWill(statTopic, "offline".encodeToByteArray(), DEFAULT_QOS, true)
}

val bufferOptions = DisconnectedBufferOptions().apply {
Expand All @@ -76,46 +86,14 @@ class MQTTRepository @Inject constructor(
isDeleteOldestMessages = true
}

val scheme = if (mqttConfig.tlsEnabled) "ssl" else "tcp"
val (host, port) = mqttConfig.address.ifEmpty { DEFAULT_SERVER_ADDRESS }
.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: -1) }

val serverURI: String = URI(scheme, null, host, port, "", "", "").toString()

val topic = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } +
DEFAULT_TOPIC_LEVEL // FIXME if (mqttConfig.jsonEnabled) JSON_TOPIC_LEVEL else DEFAULT_TOPIC_LEVEL

mqttClient = MqttAsyncClient(
serverURI,
ownerId,
MemoryPersistence(),
)
mqttClient?.apply {
setCallback(callback)
setBufferOpts(bufferOptions)
connect(connectOptions).waitForCompletion()

channelSet.subscribeList.forEach { globalId ->
val topicFilter = "$topic$globalId/#"
subscribe(topicFilter, DEFAULT_QOS).waitForCompletion()
info("MQTT Subscribed to topic: $topicFilter")
}
}
}

fun disconnect() {
info("MQTT Disconnected")
mqttClient?.apply {
ignoreException { disconnect() }
close(true)
mqttClient = null
}
}

val proxyMessageFlow: Flow<MqttClientProxyMessage> = callbackFlow {
val callback = object : MqttCallbackExtended {
override fun connectComplete(reconnect: Boolean, serverURI: String) {
info("MQTT connectComplete: $serverURI reconnect: $reconnect ")
channelSet.subscribeList.forEach { globalId ->
subscribe("$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/#")
if (mqttConfig.jsonEnabled) subscribe("$rootTopic$JSON_TOPIC_LEVEL$globalId/#")
}
// publish(statTopic, "online".encodeToByteArray(), DEFAULT_QOS, true)
}

override fun connectionLost(cause: Throwable) {
Expand All @@ -135,11 +113,29 @@ class MQTTRepository @Inject constructor(
info("MQTT deliveryComplete messageId: ${token?.messageId}")
}
}
connect(callback)

val scheme = if (mqttConfig.tlsEnabled) "ssl" else "tcp"
val (host, port) = mqttConfig.address.ifEmpty { DEFAULT_SERVER_ADDRESS }
.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: -1) }

mqttClient = MqttAsyncClient(
URI(scheme, null, host, port, "", "", "").toString(),
ownerId,
MemoryPersistence(),
).apply {
setCallback(callback)
setBufferOpts(bufferOptions)
connect(connectOptions)
}

awaitClose { disconnect() }
}

private fun subscribe(topic: String) {
mqttClient?.subscribe(topic, DEFAULT_QOS)
info("MQTT Subscribed to topic: $topic")
}

fun publish(topic: String, data: ByteArray, retained: Boolean) {
try {
val token = mqttClient?.publish(topic, data, DEFAULT_QOS, retained)
Expand Down

0 comments on commit f8a7596

Please sign in to comment.