From 00eaed4906a1bb9611ccfe059f6bf4045ea3f201 Mon Sep 17 00:00:00 2001 From: Junfu Chen Date: Fri, 13 May 2022 14:08:45 -0700 Subject: [PATCH] Revert "fix: reconfigure MQTT client better when things change (#1197)" This reverts commit cddcc5c167db29975c55b60c6f16f38a7fca16f7. --- .../telemetry/TelemetryAgentTest.java | 3 - .../aws/greengrass/mqttclient/MqttClient.java | 148 ++++++++---------- .../greengrass/telemetry/TelemetryAgent.java | 28 ++-- .../greengrass/util/BatchedSubscriber.java | 17 +- .../telemetry/TelemetryAgentTest.java | 10 +- 5 files changed, 83 insertions(+), 123 deletions(-) diff --git a/src/integrationtests/java/com/aws/greengrass/integrationtests/telemetry/TelemetryAgentTest.java b/src/integrationtests/java/com/aws/greengrass/integrationtests/telemetry/TelemetryAgentTest.java index 914ec226fd..f0789a81a4 100644 --- a/src/integrationtests/java/com/aws/greengrass/integrationtests/telemetry/TelemetryAgentTest.java +++ b/src/integrationtests/java/com/aws/greengrass/integrationtests/telemetry/TelemetryAgentTest.java @@ -37,7 +37,6 @@ import java.io.IOException; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -57,7 +56,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.internal.verification.VerificationModeFactory.atLeast; @@ -91,7 +89,6 @@ void before() { // handlers here TestFeatureParameters.clearHandlerCallbacks(); TestFeatureParameters.internalEnableTestingFeatureParameters(DEFAULT_HANDLER); - lenient().when(mqttClient.publish(any())).thenReturn(CompletableFuture.completedFuture(0)); } @AfterEach diff --git a/src/main/java/com/aws/greengrass/mqttclient/MqttClient.java b/src/main/java/com/aws/greengrass/mqttclient/MqttClient.java index 6195285851..988c341740 100644 --- a/src/main/java/com/aws/greengrass/mqttclient/MqttClient.java +++ b/src/main/java/com/aws/greengrass/mqttclient/MqttClient.java @@ -15,7 +15,6 @@ import com.aws.greengrass.mqttclient.spool.SpoolerStoreException; import com.aws.greengrass.security.SecurityService; import com.aws.greengrass.security.exceptions.MqttConnectionProviderException; -import com.aws.greengrass.util.BatchedSubscriber; import com.aws.greengrass.util.Coerce; import com.aws.greengrass.util.LockScope; import com.aws.greengrass.util.ProxyUtils; @@ -39,7 +38,6 @@ import java.time.Duration; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -118,7 +116,6 @@ public class MqttClient implements Closeable { private final AtomicInteger connectionRoundRobin = new AtomicInteger(0); @Getter private final AtomicBoolean mqttOnline = new AtomicBoolean(false); - private final Object httpProxyLock = new Object(); private final EventLoopGroup eventLoopGroup; private final HostResolver hostResolver; @@ -127,9 +124,8 @@ public class MqttClient implements Closeable { private final Spool spool; private final ExecutorService executorService; - private TlsContextOptions proxyTlsOptions; - private ClientTlsContext proxyTlsContext; - private String rootCaPath; + private final TlsContextOptions proxyTlsOptions; + private final ClientTlsContext proxyTlsContext; private ScheduledExecutorService ses; private final AtomicReference> spoolingFuture = new AtomicReference<>(); @@ -193,12 +189,9 @@ public MqttClient(DeviceConfiguration deviceConfiguration, ScheduledExecutorServ Coerce.toInt(mqttTopics.findOrDefault(DEFAULT_MQTT_PING_TIMEOUT, MQTT_PING_TIMEOUT_KEY))) .withSocketOptions(new SocketOptions()).withTimeoutMs(Coerce.toInt( mqttTopics.findOrDefault(DEFAULT_MQTT_SOCKET_TIMEOUT, MQTT_SOCKET_TIMEOUT_KEY))); - synchronized (httpProxyLock) { - HttpProxyOptions httpProxyOptions = - ProxyUtils.getHttpProxyOptions(deviceConfiguration, proxyTlsContext); - if (httpProxyOptions != null) { - builder.withHttpProxyOptions(httpProxyOptions); - } + HttpProxyOptions httpProxyOptions = ProxyUtils.getHttpProxyOptions(deviceConfiguration, proxyTlsContext); + if (httpProxyOptions != null) { + builder.withHttpProxyOptions(httpProxyOptions); } return builder; }; @@ -210,8 +203,8 @@ protected MqttClient(DeviceConfiguration deviceConfiguration, this.deviceConfiguration = deviceConfiguration; this.executorService = executorService; this.ses = ses; - rootCaPath = Coerce.toString(deviceConfiguration.getRootCAFilePath()); - this.proxyTlsOptions = getTlsContextOptions(rootCaPath); + + this.proxyTlsOptions = getTlsContextOptions(deviceConfiguration); this.proxyTlsContext = new ClientTlsContext(proxyTlsOptions); mqttTopics = this.deviceConfiguration.getMQTTNamespace(); @@ -232,76 +225,67 @@ protected MqttClient(DeviceConfiguration deviceConfiguration, deviceConfiguration.getSpoolerNamespace(); deviceConfiguration.getAWSRegion(); + // Skip the reconnect logic below if device is running offline + if (!deviceConfiguration.isDeviceConfiguredToTalkToCloud()) { + return; + } + // If anything in the device configuration changes, then we will need to reconnect to the cloud // using the new settings. We do this by calling reconnect() on all of our connections - this.deviceConfiguration.onAnyChange(new BatchedSubscriber((what, node) -> { - // Skip events that don't change anything - if (WhatHappened.timestampUpdated.equals(what) || WhatHappened.interiorAdded.equals(what) || node == null) { - return true; - } - - // List of configuration nodes that we need to reconfigure for if they change - if (!(node.childOf(DEVICE_MQTT_NAMESPACE) || node.childOf(DEVICE_PARAM_THING_NAME) || node.childOf( - DEVICE_PARAM_IOT_DATA_ENDPOINT) || node.childOf(DEVICE_PARAM_PRIVATE_KEY_PATH) || node.childOf( - DEVICE_PARAM_CERTIFICATE_FILE_PATH) || node.childOf(DEVICE_PARAM_ROOT_CA_PATH) || node.childOf( - DEVICE_PARAM_AWS_REGION))) { - return true; + this.deviceConfiguration.onAnyChange((what, node) -> { + if (connections.isEmpty()) { + return; } + if (WhatHappened.childChanged.equals(what) && node != null) { + // List of configuration nodes that we need to reconfigure for if they change + if (!(node.childOf(DEVICE_MQTT_NAMESPACE) || node.childOf(DEVICE_PARAM_THING_NAME) || node + .childOf(DEVICE_PARAM_IOT_DATA_ENDPOINT) || node.childOf(DEVICE_PARAM_PRIVATE_KEY_PATH) || node + .childOf(DEVICE_PARAM_CERTIFICATE_FILE_PATH) || node.childOf(DEVICE_PARAM_ROOT_CA_PATH) || node + .childOf(DEVICE_PARAM_AWS_REGION))) { + return; + } - // Only reconnect when the region changed if the proxy exists - if (node.childOf(DEVICE_PARAM_AWS_REGION) && !ProxyUtils.isProxyConfigured(deviceConfiguration)) { - return true; - } + if (node.childOf(DEVICE_MQTT_NAMESPACE)) { + validateAndSetMqttPublishConfiguration(); + } - logger.atDebug().kv("modifiedNode", node.getFullName()).kv("changeType", what) - .log("Reconfiguring MQTT clients"); - return false; - }, (what) -> { - validateAndSetMqttPublishConfiguration(); - - // Reconnect in separate thread to not block publish thread - // Schedule the reconnection for slightly in the future to de-dupe multiple changes - Future oldFuture = reconfigureFuture.getAndSet(ses.schedule(() -> { - // If the rootCa path changed, then we need to update the TLS options - String newRootCaPath = Coerce.toString(deviceConfiguration.getRootCAFilePath()); - synchronized (httpProxyLock) { - if (!Objects.equals(rootCaPath, newRootCaPath)) { - if (proxyTlsOptions != null) { - proxyTlsOptions.close(); - } - if (proxyTlsContext != null) { - proxyTlsContext.close(); - } - rootCaPath = newRootCaPath; - proxyTlsOptions = getTlsContextOptions(rootCaPath); - proxyTlsContext = new ClientTlsContext(proxyTlsOptions); - } + // Only reconnect when the region changed if the proxy exists + if (node.childOf(DEVICE_PARAM_AWS_REGION) + && !ProxyUtils.isProxyConfigured(deviceConfiguration)) { + return; } - // Continually try to reconnect until all the connections are reconnected - Set brokenConnections = new CopyOnWriteArraySet<>(connections); - do { - for (AwsIotMqttClient connection : brokenConnections) { - if (Thread.currentThread().isInterrupted()) { - return; - } + logger.atDebug().kv("modifiedNode", node.getFullName()).kv("changeType", what) + .log("Reconfiguring MQTT clients"); + + // Reconnect in separate thread to not block publish thread + // Schedule the reconnection for slightly in the future to de-dupe multiple changes + Future oldFuture = reconfigureFuture.getAndSet(ses.schedule(() -> { + // Continually try to reconnect until all the connections are reconnected + Set brokenConnections = new CopyOnWriteArraySet<>(connections); + do { + for (AwsIotMqttClient connection : brokenConnections) { + if (Thread.currentThread().isInterrupted()) { + return; + } - try { - connection.reconnect(); - brokenConnections.remove(connection); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - logger.atError().setCause(e).kv(CLIENT_ID_KEY, connection.getClientId()) - .log("Error while reconnecting MQTT client"); + try { + connection.reconnect(); + brokenConnections.remove(connection); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + logger.atError().setCause(e).kv(CLIENT_ID_KEY, connection.getClientId()) + .log("Error while reconnecting MQTT client"); + } } - } - } while (!brokenConnections.isEmpty()); - }, 1, TimeUnit.SECONDS)); + } while (!brokenConnections.isEmpty()); + }, 1, TimeUnit.SECONDS)); - // If a reconfiguration task already existed, then kill it and create a new one - if (oldFuture != null) { - oldFuture.cancel(true); + // If a reconfiguration task already existed, then kill it and create a new one + if (oldFuture != null) { + oldFuture.cancel(true); + } } - })); + }); } /** @@ -325,14 +309,16 @@ public MqttClient(DeviceConfiguration deviceConfiguration, Spool spool, boolean this.builderProvider = builderProvider; this.spool = spool; this.mqttOnline.set(mqttOnline); + this.builderProvider = builderProvider; this.executorService = executorService; - rootCaPath = Coerce.toString(deviceConfiguration.getRootCAFilePath()); - this.proxyTlsOptions = getTlsContextOptions(rootCaPath); - this.proxyTlsContext = new ClientTlsContext(proxyTlsOptions); validateAndSetMqttPublishConfiguration(); + + this.proxyTlsOptions = getTlsContextOptions(deviceConfiguration); + this.proxyTlsContext = new ClientTlsContext(proxyTlsOptions); } - private TlsContextOptions getTlsContextOptions(String rootCaPath) { + private TlsContextOptions getTlsContextOptions(DeviceConfiguration deviceConfiguration) { + String rootCaPath = Coerce.toString(deviceConfiguration.getRootCAFilePath()); return Utils.isNotEmpty(rootCaPath) ? TlsContextOptions.createDefaultClient().withCertificateAuthorityFromPath(null, rootCaPath) : TlsContextOptions.createDefaultClient(); @@ -793,12 +779,8 @@ public synchronized void close() { } connections.forEach(AwsIotMqttClient::close); - if (proxyTlsOptions != null) { - proxyTlsOptions.close(); - } - if (proxyTlsContext != null) { - proxyTlsContext.close(); - } + proxyTlsOptions.close(); + proxyTlsContext.close(); clientBootstrap.close(); hostResolver.close(); eventLoopGroup.close(); diff --git a/src/main/java/com/aws/greengrass/telemetry/TelemetryAgent.java b/src/main/java/com/aws/greengrass/telemetry/TelemetryAgent.java index acbb8468a0..389ceb25cf 100644 --- a/src/main/java/com/aws/greengrass/telemetry/TelemetryAgent.java +++ b/src/main/java/com/aws/greengrass/telemetry/TelemetryAgent.java @@ -246,25 +246,19 @@ void aggregatePeriodicMetrics() { /** * Helper for metrics uploader. Also used in tests. */ - @SuppressWarnings("PMD.AvoidCatchingThrowable") void publishPeriodicMetrics() { - try { - if (!isConnected.get()) { - logger.atDebug().log("Cannot publish the metrics. MQTT connection interrupted."); - return; - } - long timestamp = Instant.now().toEpochMilli(); - long lastPublish = Coerce.toLong(getPeriodicPublishTimeTopic()); - Map> metricsToPublishMap = - metricsAggregator.getMetricsToPublish(lastPublish, timestamp); - getPeriodicPublishTimeTopic().withValue(timestamp); - if (metricsToPublishMap != null && metricsToPublishMap.containsKey(timestamp)) { - publisher.publish(MetricsPayload.builder().build(), metricsToPublishMap.get(timestamp)); - logger.atInfo().event("telemetry-metrics-published").log("Telemetry metrics update published."); - } - } catch (Throwable t) { - logger.atWarn().log("Error collecting telemetry. Will retry.", t); + if (!isConnected.get()) { + logger.atDebug().log("Cannot publish the metrics. MQTT connection interrupted."); + return; } + long timestamp = Instant.now().toEpochMilli(); + long lastPublish = Coerce.toLong(getPeriodicPublishTimeTopic()); + Map> metricsToPublishMap = + metricsAggregator.getMetricsToPublish(lastPublish, timestamp); + getPeriodicPublishTimeTopic().withValue(timestamp); + // TODO: [P41214679] Do not publish if the metrics are empty. + publisher.publish(MetricsPayload.builder().build(), metricsToPublishMap.get(timestamp)); + logger.atInfo().event("telemetry-metrics-published").log("Telemetry metrics update published."); } private Topic getPeriodicPublishTimeTopic() { diff --git a/src/main/java/com/aws/greengrass/util/BatchedSubscriber.java b/src/main/java/com/aws/greengrass/util/BatchedSubscriber.java index 99cbc65ebb..1a21c7f7d1 100644 --- a/src/main/java/com/aws/greengrass/util/BatchedSubscriber.java +++ b/src/main/java/com/aws/greengrass/util/BatchedSubscriber.java @@ -96,17 +96,6 @@ public BatchedSubscriber(Topics topics, this((Node) topics, exclusions, callback); } - /** - * Constructs a new BatchedSubscriber. - * - * @param exclusions predicate for ignoring a subset topic(s) changes - * @param callback action to perform after a batch of changes and on initialization - */ - public BatchedSubscriber(BiPredicate exclusions, - Consumer callback) { - this((Node) null, exclusions, callback); - } - /** * Constructs a new BatchedSubscriber. * @@ -114,7 +103,7 @@ public BatchedSubscriber(BiPredicate exclusions, * @param exclusions predicate for ignoring a subset topic(s) changes * @param callback action to perform after a batch of changes and on initialization */ - private BatchedSubscriber(Node node, + private BatchedSubscriber(@NonNull Node node, BiPredicate exclusions, @NonNull Consumer callback) { this.node = node; @@ -138,9 +127,7 @@ public void subscribe() { * Unsubscribe from the topic(s). */ public void unsubscribe() { - if (node != null) { - node.remove(this); - } + node.remove(this); } @Override diff --git a/src/test/java/com/aws/greengrass/telemetry/TelemetryAgentTest.java b/src/test/java/com/aws/greengrass/telemetry/TelemetryAgentTest.java index d37a57860b..d1f4d25024 100644 --- a/src/test/java/com/aws/greengrass/telemetry/TelemetryAgentTest.java +++ b/src/test/java/com/aws/greengrass/telemetry/TelemetryAgentTest.java @@ -63,10 +63,10 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -125,6 +125,7 @@ void setup(ExtensionContext ec) { configurationTopics.createLeafChild("periodicPublishMetricsIntervalSeconds").withValue(300); lenient().when(mockDeviceConfiguration.getTelemetryConfigurationTopics()).thenReturn(configurationTopics); lenient().when(mockMqttClient.publish(any())).thenReturn(CompletableFuture.completedFuture(0)); + lenient().doNothing().when(mockMqttClient).addToCallbackEvents(mqttClientConnectionEventsArgumentCaptor.capture()); telemetryAgent = new TelemetryAgent(config, mockMqttClient, mockDeviceConfiguration, ma, sme, kme, ses, executorService, 3, 1); } @@ -134,6 +135,7 @@ void cleanUp() throws IOException, InterruptedException { TelemetryConfig.getInstance().closeContext(); telemetryAgent.shutdown(); context.waitForPublishQueueToClear(); + Thread.sleep(1000); ses.shutdownNow(); executorService.shutdownNow(); context.close(); @@ -234,17 +236,15 @@ void GIVEN_Telemetry_Agent_WHEN_mqtt_is_interrupted_THEN_aggregation_continues_b }); telemetryAgent.postInject(); - long timeoutMs = 10_000; + long timeoutMs = 10000; verify(mockMqttClient, timeout(timeoutMs).atLeastOnce()).publish(publishRequestArgumentCaptor.capture()); PublishRequest request = publishRequestArgumentCaptor.getValue(); assertEquals(QualityOfService.AT_LEAST_ONCE, request.getQos()); assertEquals("$aws/things/testThing/greengrass/health/json", request.getTopic()); - verify(mockMqttClient, timeout(timeoutMs).atLeastOnce()) - .addToCallbackEvents(mqttClientConnectionEventsArgumentCaptor.capture()); reset(mockMqttClient); mqttClientConnectionEventsArgumentCaptor.getValue().onConnectionInterrupted(500); //verify that nothing is published when mqtt is interrupted - verify(mockMqttClient, never()).publish(publishRequestArgumentCaptor.capture()); + verify(mockMqttClient, times(0)).publish(publishRequestArgumentCaptor.capture()); // aggregation is continued irrespective of the mqtt connection verify(ma, timeout(timeoutMs).atLeastOnce()).aggregateMetrics(anyLong(), anyLong()); }