Skip to content

Commit

Permalink
* improved MQTT reconnect on WiFi disconnect
Browse files Browse the repository at this point in the history
* handling error messages on MQTT disconnect or error only on init
* we lost our Crashlytics reporting, adding back older method for Crashlytics
  • Loading branch information
thanksmister committed Nov 14, 2019
1 parent 2da06a4 commit eb5a62b
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 113 deletions.
12 changes: 11 additions & 1 deletion WallPanelApp/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@ apply plugin: 'kotlin-kapt'
apply plugin: 'io.fabric'
apply plugin: 'com.google.gms.google-services'

repositories {
mavenCentral()
maven { url 'https://maven.fabric.io/public' }
}


def versionMajor = 0
def versionMinor = 8
def versionPatch = 9
def versionBuild = 1 // bump for dog food builds, public betas, etc.
def versionBuild = 3 // bump for dog food builds, public betas, etc.

android {
kapt {
Expand Down Expand Up @@ -138,6 +144,10 @@ dependencies {

// (Recommended) Add Analytics
implementation 'com.crashlytics.sdk.android:crashlytics:2.10.1'

compile('com.crashlytics.sdk.android:crashlytics:2.10.1@aar') {
transitive = true;
}
}

configurations.all {
Expand Down
4 changes: 4 additions & 0 deletions WallPanelApp/src/main/AndroidManifest.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@
<meta-data android:name="com.google.android.gms.version"
android:value="@integer/google_play_services_version"/>

<meta-data
android:name="io.fabric.ApiKey"
android:value="d328872540a331f2245f3d7b842d06bbb7ac1c60" />

</application>

</manifest>
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ class MQTTModule (base: Context?, var mqttOptions: MQTTOptions, private val list

private var mqttService: MQTTService? = null

init {
}

@OnLifecycleEvent(Lifecycle.Event.ON_START)
private fun start() {
Timber.d("start")
Expand All @@ -46,15 +43,22 @@ class MQTTModule (base: Context?, var mqttOptions: MQTTOptions, private val list
// TODO should we loop back and try again?
Timber.e("Could not create MQTTPublisher: " + t.message)
}
} else {
try {
mqttService?.reconfigure(applicationContext, mqttOptions, this)
} catch (t: Throwable) {
// TODO should we loop back and try again?
Timber.e("Could not create MQTTPublisher: " + t.message)
}
}
}

@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
private fun stop() {
Timber.d("stop")
if (mqttService != null) {
mqttService?.let {
try {
mqttService!!.close()
it.close()
} catch (e: MqttException) {
e.printStackTrace()
}
Expand All @@ -76,23 +80,9 @@ class MQTTModule (base: Context?, var mqttOptions: MQTTOptions, private val list
fun publish(command: String, message : String) {
Timber.d("command: " + command)
Timber.d("message: " + message)
if(mqttService != null) {
mqttService!!.publish(command, message)
}
mqttService?.publish(command, message)
}

/*fun resetMQttOptions(mqttOptions: MQTTOptions) {
this.mqttOptions = mqttOptions
if (mqttService != null) {
try {
mqttService!!.reconfigure(applicationContext, mqttOptions, this)
} catch (t: Throwable) {
// TODO should we loop back and try again?
Timber.e("Could not create MQTTPublisher: " + t.message)
}
}
}*/

override fun subscriptionMessage(id: String, topic: String, payload: String) {
Timber.d("topic: " + topic)
listener.onMQTTMessage(id, topic, payload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,35 +77,43 @@ class MQTTService(private var context: Context, options: MQTTOptions,
if (it.isConnected) {
it.disconnect(0)
}
mqttClient = null
listener = null
mqttOptions = null
}
mReady.set(false)
}

override fun publish(command: String, payload: String) {
try {
if (isReady) {
if (mqttClient != null && !mqttClient!!.isConnected) {
// if for some reason the mqtt client has disconnected, we should try to connect
// it again.
try {
initializeMqttClient()
} catch (e: MqttException) {
listener?.handleMqttException("Could not initialize MQTT")
} catch (e: IOException) {
listener?.handleMqttException("Could not initialize MQTT")
} catch (e: GeneralSecurityException) {
listener?.handleMqttException("Could not initialize MQTT")
mqttClient?.let {
if (!it.isConnected) {
// if for some reason the mqtt client has disconnected, we should try to connect
// it again.
try {
initializeMqttClient()
} catch (e: MqttException) {
if (listener != null) {
listener!!.handleMqttException("Could not initialize MQTT: " + e.message)
}
} catch (e: IOException) {
if (listener != null) {
listener!!.handleMqttException("Could not initialize MQTT: " + e.message)
}
} catch (e: GeneralSecurityException) {
if (listener != null) {
listener!!.handleMqttException("Could not initialize MQTT: " + e.message)
}
}
}
}
// TODO append the "command" part
Timber.d("Publishing: $payload")
Timber.d("Base Topic: ${mqttOptions?.getBaseTopic()}")
Timber.d("Command Topic: $command")
Timber.d("Payload: $payload")
val mqttMessage = MqttMessage()
mqttMessage.payload = payload.toByteArray()
mqttMessage.isRetained = SHOULD_RETAIN
sendMessage(mqttOptions?.getBaseTopic() + command, mqttMessage)
mqttOptions?.let {
mqttMessage.isRetained = SHOULD_RETAIN
sendMessage(mqttOptions?.getBaseTopic() + command, mqttMessage)
}
}
} catch (e: MqttException) {
listener?.handleMqttException("Exception while publishing command $command and it's payload to the MQTT broker.")
Expand All @@ -130,10 +138,14 @@ class MQTTService(private var context: Context, options: MQTTOptions,
Timber.i("Broker: " + mqttOptions?.brokerUrl)
Timber.i("Subscribed to state topics: " + StringUtils.convertArrayToString(mqttOptions!!.getStateTopics()))
Timber.i("Publishing to base topic: " + mqttOptions!!.getBaseTopic())
if (mqttOptions!!.isValid) {
initializeMqttClient()
} else {
listener?.handleMqttDisconnected()
mqttOptions?.let {
if (it.isValid) {
initializeMqttClient()
} else {
if (listener != null) {
listener!!.handleMqttDisconnected()
}
}
}
} catch (e: MqttException) {
listener?.handleMqttException(context.getString(R.string.error_mqtt_connection))
Expand All @@ -148,11 +160,11 @@ class MQTTService(private var context: Context, options: MQTTOptions,
private fun initializeMqttClient() {
Timber.d("initializeMqttClient")
try {
mqttOptions?.let {mqttOptions ->
mqttOptions?.let { mqttOptions ->
mqttClient = MqttAndroidClient(context, mqttOptions.brokerUrl, mqttOptions.getClientId(), MemoryPersistence())

mqttClient?.setCallback(object : MqttCallbackExtended {
override fun connectComplete(reconnect: Boolean, serverURI: String?) {
Timber.d("connect to broker completed, reconnected: $reconnect")
subscribeToTopics(mqttOptions.getStateTopics())
}
override fun connectionLost(cause: Throwable?) {}
Expand All @@ -168,6 +180,12 @@ class MQTTService(private var context: Context, options: MQTTOptions,
options.password = mqttOptions.getPassword().toCharArray()
}

val isConnected = mqttClient?.isConnected?:false
if(isConnected) {
mReady.set(true)
return
}

try {
mqttClient?.connect(options, null, object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken) {
Expand All @@ -176,80 +194,82 @@ class MQTTService(private var context: Context, options: MQTTOptions,
disconnectedBufferOptions.bufferSize = 100
disconnectedBufferOptions.isPersistBuffer = false
disconnectedBufferOptions.isDeleteOldestMessages = false
try {
mqttClient?.setBufferOpts(disconnectedBufferOptions)
listener?.handleMqttConnected()
mReady.set(true)
} catch (e: NullPointerException) {
listener?.handleMqttException("Error establishing MQTT Client, was null at the time of initialization.")
mReady.set(false)
}
mqttClient?.setBufferOpts(disconnectedBufferOptions)
listener?.handleMqttConnected()
mReady.set(true)
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
//Timber.e("Failed to connect to: " + mqttOptions.brokerUrl + " exception: " + exception)
listener?.handleMqttException("Error establishing MQTT connection to MQTT broker with address ${mqttOptions.brokerUrl}.")
mReady.set(false)
if(exception is MqttException) {
if(exception.reasonCode == 32100 || exception.reasonCode == 32110) {
listener?.handleMqttConnected()
mReady.set(true)
return // we have a connection established or is establishing
}
}
mqttOptions.let {
Timber.e("Failed to connect to: " + it.brokerUrl + " exception: " + exception)
listener?.handleMqttException("Error establishing MQTT connection to MQTT broker with address ${mqttOptions.brokerUrl}.")
}
}
})
} catch (e: NullPointerException) {
Timber.e(e, e.message)
mReady.set(false)
} catch (e: MqttException) {
mReady.set(false)
listener?.handleMqttException("Error establishing MQTT connection to MQTT broker with address ${mqttOptions?.brokerUrl}.")
listener?.handleMqttException("" + e.message)
}

}
} catch (e: IllegalArgumentException) {
e.printStackTrace()
} catch (e: NullPointerException) {
e.printStackTrace()
} catch (e: Exception) {
e.printStackTrace()
listener?.handleMqttException("Error establishing MQTT connection to MQTT broker with address ${mqttOptions?.brokerUrl}.")
listener?.handleMqttException("" + e.message)
}
}

@Throws(MqttException::class)
private fun sendMessage(mqttTopic: String?, mqttMessage: MqttMessage) {
Timber.d("sendMessage")
try {
mqttClient?.let {
if (isReady && it.isConnected) {
mqttClient?.let {
if (isReady && it.isConnected) {
try {
it.publish(mqttTopic, mqttMessage)
Timber.d("Command Topic: $mqttTopic Payload: $message")
} catch (e: NullPointerException) {
Timber.e(e.message)
} catch (e: MqttException) {
Timber.e("Error Sending Command: " + e.message)
e.printStackTrace()
listener?.handleMqttException("Couldn't send message to the MQTT broker for topic $mqttTopic, check the MQTT client settings or your connection to the broker.")

}
}
} catch (e: NullPointerException) {
Timber.e(e.message)
} catch (e: MqttException) {
Timber.e("Error Sending Command: " + e.message)
listener?.handleMqttException("Couldn't send message to the MQTT broker for topic $mqttTopic, check the MQTT client settings or your connection to the broker.")
}
}

private fun subscribeToTopics(topicFilters: Array<String>?) {
Timber.d("Subscribe to Topics: " + StringUtils.convertArrayToString(topicFilters))
try {
topicFilters?.let {
Timber.d("Subscribe to Topics: " + StringUtils.convertArrayToString(topicFilters))
mqttClient?.let {
if(it.isConnected && isReady) {
it.subscribe(topicFilters, MqttUtils.getQos(topicFilters!!.size),
MqttUtils.getMqttMessageListeners(topicFilters.size, listener))
if (isReady) {
try {
it.subscribe(topicFilters, MqttUtils.getQos(topicFilters.size), MqttUtils.getMqttMessageListeners(topicFilters.size, listener))
} catch (e: NullPointerException) {
e.printStackTrace()
Timber.e(e.message)
} catch (e: MqttException) {
e.printStackTrace()
listener?.handleMqttException("Exception while subscribing: " + e.message)
}
}
}
} catch (e: NullPointerException) {
if(!BuildConfig.DEBUG) {
Crashlytics.logException(e)
}
} catch (e: MqttException) {
if(!BuildConfig.DEBUG) {
Crashlytics.logException(e)
}
listener?.handleMqttException("Exception while subscribing to topics [${topicFilters.toString()}], the connection to the MQTT broker could have been disconnected.")
}
}

companion object {
// TODO make this optional
private val SHOULD_RETAIN = false
private val MQTT_QOS = 0
}
}
Loading

0 comments on commit eb5a62b

Please sign in to comment.