diff --git a/src/main/java/com/aws/greengrass/mqttclient/MqttClient.java b/src/main/java/com/aws/greengrass/mqttclient/MqttClient.java index bc73038654..69b039891d 100644 --- a/src/main/java/com/aws/greengrass/mqttclient/MqttClient.java +++ b/src/main/java/com/aws/greengrass/mqttclient/MqttClient.java @@ -286,76 +286,80 @@ protected MqttClient(DeviceConfiguration deviceConfiguration, deviceConfiguration.getSpoolerNamespace(); deviceConfiguration.getAWSRegion(); - // If anything in the device configuration changes, then we will need to reconnect to the cloud - // using the new settings. We do this by calling reconnect() on all of our connections - this.deviceConfiguration.onAnyChange(new BatchedSubscriber((what, node) -> { - // Skip events that don't change anything - if (WhatHappened.timestampUpdated.equals(what) || WhatHappened.interiorAdded.equals(what) || node == null) { - return true; - } + // Setup change subscriber from the publish queue so that all pending events are cleared before we subscribe + mqttTopics.context.runOnPublishQueue(() -> { + // If anything in the device configuration changes, then we will need to reconnect to the cloud + // using the new settings. We do this by calling reconnect() on all of our connections + this.deviceConfiguration.onAnyChange(new BatchedSubscriber((what, node) -> { + // Skip events that don't change anything + if (WhatHappened.timestampUpdated.equals(what) || WhatHappened.interiorAdded.equals(what) + || node == null) { + return true; + } - // List of configuration nodes that we need to reconfigure for if they change - if (!(node.childOf(DEVICE_MQTT_NAMESPACE) || node.childOf(DEVICE_PARAM_THING_NAME) || node.childOf( - DEVICE_PARAM_IOT_DATA_ENDPOINT) || node.childOf(DEVICE_PARAM_PRIVATE_KEY_PATH) || node.childOf( - DEVICE_PARAM_CERTIFICATE_FILE_PATH) || node.childOf(DEVICE_PARAM_ROOT_CA_PATH) || node.childOf( - DEVICE_PARAM_AWS_REGION))) { - return true; - } + // List of configuration nodes that we need to reconfigure for if they change + if (!(node.childOf(DEVICE_MQTT_NAMESPACE) || node.childOf(DEVICE_PARAM_THING_NAME) || node.childOf( + DEVICE_PARAM_IOT_DATA_ENDPOINT) || node.childOf(DEVICE_PARAM_PRIVATE_KEY_PATH) || node.childOf( + DEVICE_PARAM_CERTIFICATE_FILE_PATH) || node.childOf(DEVICE_PARAM_ROOT_CA_PATH) || node.childOf( + DEVICE_PARAM_AWS_REGION))) { + return true; + } - // Only reconnect when the region changed if the proxy exists - if (node.childOf(DEVICE_PARAM_AWS_REGION) && !ProxyUtils.isProxyConfigured(deviceConfiguration)) { - return true; - } + // Only reconnect when the region changed if the proxy exists + if (node.childOf(DEVICE_PARAM_AWS_REGION) && !ProxyUtils.isProxyConfigured(deviceConfiguration)) { + return true; + } - logger.atDebug().kv("modifiedNode", node.getFullName()).kv("changeType", what) - .log("Reconfiguring MQTT clients"); - return false; - }, (what) -> { - validateAndSetMqttPublishConfiguration(); - - // Reconnect in separate thread to not block publish thread - // Schedule the reconnection for slightly in the future to de-dupe multiple changes - Future oldFuture = reconfigureFuture.getAndSet(ses.schedule(() -> { - // If the rootCa path changed, then we need to update the TLS options - String newRootCaPath = Coerce.toString(deviceConfiguration.getRootCAFilePath()); - synchronized (httpProxyLock) { - if (!Objects.equals(rootCaPath, newRootCaPath)) { - if (proxyTlsOptions != null) { - proxyTlsOptions.close(); - } - if (proxyTlsContext != null) { - proxyTlsContext.close(); + logger.atDebug().kv("modifiedNode", node.getFullName()).kv("changeType", what) + .log("Reconfiguring MQTT clients"); + return false; + }, (what) -> { + validateAndSetMqttPublishConfiguration(); + + // Reconnect in separate thread to not block publish thread + // Schedule the reconnection for slightly in the future to de-dupe multiple changes + Future oldFuture = reconfigureFuture.getAndSet(ses.schedule(() -> { + // If the rootCa path changed, then we need to update the TLS options + String newRootCaPath = Coerce.toString(deviceConfiguration.getRootCAFilePath()); + synchronized (httpProxyLock) { + if (!Objects.equals(rootCaPath, newRootCaPath)) { + if (proxyTlsOptions != null) { + proxyTlsOptions.close(); + } + if (proxyTlsContext != null) { + proxyTlsContext.close(); + } + rootCaPath = newRootCaPath; + proxyTlsOptions = getTlsContextOptions(rootCaPath); + proxyTlsContext = new ClientTlsContext(proxyTlsOptions); } - rootCaPath = newRootCaPath; - proxyTlsOptions = getTlsContextOptions(rootCaPath); - proxyTlsContext = new ClientTlsContext(proxyTlsOptions); } - } - // Continually try to reconnect until all the connections are reconnected - Set brokenConnections = new CopyOnWriteArraySet<>(connections); - do { - for (IndividualMqttClient connection : brokenConnections) { - if (Thread.currentThread().isInterrupted()) { - return; - } + // Continually try to reconnect until all the connections are reconnected + Set brokenConnections = new CopyOnWriteArraySet<>(connections); + do { + for (IndividualMqttClient connection : brokenConnections) { + if (Thread.currentThread().isInterrupted()) { + return; + } - try { - connection.reconnect(getMqttOperationTimeoutMillis()); - brokenConnections.remove(connection); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - logger.atError().setCause(e).kv(CLIENT_ID_KEY, connection.getClientId()) - .log("Error while reconnecting MQTT client"); + try { + connection.reconnect(getMqttOperationTimeoutMillis()); + brokenConnections.remove(connection); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + logger.atError().setCause(e).kv(CLIENT_ID_KEY, connection.getClientId()) + .log("Error while reconnecting MQTT client"); + } } - } - } while (!brokenConnections.isEmpty()); - }, 1, TimeUnit.SECONDS)); + } while (!brokenConnections.isEmpty()); + }, 1, TimeUnit.SECONDS)); - // If a reconfiguration task already existed, then kill it and create a new one - if (oldFuture != null) { - oldFuture.cancel(true); - } - })); + // If a reconfiguration task already existed, then kill it and create a new one + if (oldFuture != null) { + oldFuture.cancel(true); + } + })); + }); } /** diff --git a/src/test/java/com/aws/greengrass/mqttclient/MqttClientTest.java b/src/test/java/com/aws/greengrass/mqttclient/MqttClientTest.java index 9b6984d3c8..1526e8c34c 100644 --- a/src/test/java/com/aws/greengrass/mqttclient/MqttClientTest.java +++ b/src/test/java/com/aws/greengrass/mqttclient/MqttClientTest.java @@ -299,6 +299,7 @@ void GIVEN_connection_WHEN_settings_change_THEN_reconnects_on_valid_changes() ArgumentCaptor cc = ArgumentCaptor.forClass(ChildChanged.class); doNothing().when(deviceConfiguration).onAnyChange(cc.capture()); MqttClient client = spy(new MqttClient(deviceConfiguration, (c) -> builder, ses, executorService)); + mqttNamespace.context.waitForPublishQueueToClear(); AwsIotMqttClient iClient1 = mock(AwsIotMqttClient.class); when(iClient1.subscribe(any())).thenReturn(CompletableFuture.completedFuture(null));