Skip to content

Commit

Permalink
fix(mqtt): ignore user requested stops in mqtt 5 client (#1511)
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeDombo authored and junfuchen99 committed Aug 3, 2023
1 parent 57cc468 commit 2fd7645
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,10 @@ public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn on
public void onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn) {
int errorCode = onDisconnectionReturn.getErrorCode();
DisconnectPacket packet = onDisconnectionReturn.getDisconnectPacket();
// Error code 0 means that the disconnection was intentional. We do not need to run callbacks when we
// purposely interrupt a connection.
if (errorCode == 0 || packet != null && packet.getReasonCode()
// Error AWS_ERROR_MQTT5_USER_REQUESTED_STOP means that the disconnection was intentional.
// We do not need to run callbacks when we purposely interrupt a connection.
if ("AWS_ERROR_MQTT5_USER_REQUESTED_STOP".equals(CRT.awsErrorName(errorCode))
|| packet != null && packet.getReasonCode()
.equals(DisconnectPacket.DisconnectReasonCode.NORMAL_DISCONNECTION)) {
logger.atInfo().log("Connection purposefully interrupted");
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package com.aws.greengrass.mqttclient;

import com.aws.greengrass.config.Topics;
import com.aws.greengrass.testcommons.testutilities.GGExtension;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.crt.mqtt.MqttClientConnectionEvents;
import software.amazon.awssdk.crt.mqtt5.OnDisconnectionReturn;
import software.amazon.awssdk.iot.AwsIotMqtt5ClientBuilder;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@SuppressWarnings("PMD.CloseResource")
@ExtendWith({GGExtension.class, MockitoExtension.class})
public class AwsIotMqtt5ClientTest {
@Mock
AwsIotMqtt5ClientBuilder builder;

@Mock
MqttClientConnectionEvents mockCallback1;

@Mock
MqttClientConnectionEvents mockCallback2;

CallbackEventManager callbackEventManager;
Topics mockTopic;

// same as what we use in Kernel
private ExecutorService executorService;
private ScheduledExecutorService ses;

@BeforeEach
void beforeEach() {
callbackEventManager = spy(new CallbackEventManager());
callbackEventManager.addToCallbackEvents(mockCallback1);
callbackEventManager.addToCallbackEvents(mockCallback2);
mockTopic = mock(Topics.class);
executorService = Executors.newCachedThreadPool();
ses = new ScheduledThreadPoolExecutor(4);
}

@Test
void GIVEN_multiple_callbacks_in_callbackEventManager_WHEN_connections_are_interrupted_purposely_THEN_no_callbacks_are_called() {
AwsIotMqtt5Client client1 = new AwsIotMqtt5Client(() -> builder, (x) -> null, "A", 0, mockTopic,
callbackEventManager, executorService, ses);
client1.disableRateLimiting();
AwsIotMqtt5Client client2 = new AwsIotMqtt5Client(() -> builder, (x) -> null, "B", 0, mockTopic,
callbackEventManager, executorService, ses);
client2.disableRateLimiting();
callbackEventManager.runOnConnectionResumed(false);
assertTrue(callbackEventManager.hasCallbacked());
int errorCode = 5153;

OnDisconnectionReturn disconnectEvent = mock(OnDisconnectionReturn.class);
when(disconnectEvent.getErrorCode()).thenReturn(errorCode);
client1.getConnectionEventCallback().onDisconnection(null, disconnectEvent);
verify(callbackEventManager, never()).runOnConnectionInterrupted(anyInt());
verify(mockCallback1, never()).onConnectionInterrupted(anyInt());
verify(mockCallback2, never()).onConnectionInterrupted(anyInt());

client2.getConnectionEventCallback().onDisconnection(null, disconnectEvent);
verify(callbackEventManager, never()).runOnConnectionInterrupted(anyInt());
verify(mockCallback1, never()).onConnectionInterrupted(anyInt());
verify(mockCallback2, never()).onConnectionInterrupted(anyInt());

assertTrue(callbackEventManager.hasCallbacked());
}
}

0 comments on commit 2fd7645

Please sign in to comment.