diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 157eb33..d6704f2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,3 +31,22 @@ jobs: pip install mock python3 -m pytest test + integration-tests: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + test-type: [ MutualAuth, MutualAuthT , Websocket, ALPN, ALPNT] + python-version: [ '2.x', '3.x' ] + #[MutualAuth, Websocket, ALPN] + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Integration tests + run: | + pip install pytest + pip install mock + pip install boto3 + ./test-integration/run/run.sh ${{ matrix.test-type }} 1000 100 7 \ No newline at end of file diff --git a/test-integration/Credentials/.gitignore b/test-integration/Credentials/.gitignore new file mode 100644 index 0000000..94548af --- /dev/null +++ b/test-integration/Credentials/.gitignore @@ -0,0 +1,3 @@ +* +*/ +!.gitignore diff --git a/test-integration/IntegrationTests/IntegrationTestAsyncAPIGeneralNotificationCallbacks.py b/test-integration/IntegrationTests/IntegrationTestAsyncAPIGeneralNotificationCallbacks.py new file mode 100644 index 0000000..b69fdcd --- /dev/null +++ b/test-integration/IntegrationTests/IntegrationTestAsyncAPIGeneralNotificationCallbacks.py @@ -0,0 +1,159 @@ +# This integration test verifies the functionality of asynchronous API for plain MQTT operations, as well as general +# notification callbacks. There are 2 phases for this test: +# a) Testing async APIs + onMessage general notification callback +# b) Testing onOnline, onOffline notification callbacks +# To achieve test goal a) and b), the client will follow the routine described below: +# 1. Client does async connect to AWS IoT and captures the CONNACK event and onOnline callback event in the record +# 2. Client does async subscribe to a topic and captures the SUBACK event in the record +# 3. Client does several async publish (QoS1) to the same topic and captures the PUBACK event in the record +# 4. Since client subscribes and publishes to the same topic, onMessage callback should be triggered. We capture these +# events as well in the record. +# 5. Client does async disconnect. This would trigger the offline callback and disconnect event callback. We capture +# them in the record. +# We should be able to receive all ACKs for all operations and corresponding general notification callback triggering +# events. + + +import random +import string +import time +import sys +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary") +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary/SDKPackage") + +from TestToolLibrary.checkInManager import checkInManager +from TestToolLibrary.MQTTClientManager import MQTTClientManager +from TestToolLibrary.skip import skip_when_match +from TestToolLibrary.skip import ModeIsALPN +from TestToolLibrary.skip import Python2VersionLowerThan +from TestToolLibrary.skip import Python3VersionLowerThan + + +TOPIC = "topic/test/async_cb/" +MESSAGE_PREFIX = "MagicMessage-" +NUMBER_OF_PUBLISHES = 3 +HOST = "ajje7lpljulm4-ats.iot.us-east-1.amazonaws.com" +ROOT_CA = "./test-integration/Credentials/rootCA.crt" +CERT = "./test-integration/Credentials/certificate.pem.crt" +KEY = "./test-integration/Credentials/privateKey.pem.key" +CLIENT_ID = "PySdkIntegTest_AsyncAPI_Callbacks" + +KEY_ON_ONLINE = "OnOnline" +KEY_ON_OFFLINE = "OnOffline" +KEY_ON_MESSAGE = "OnMessage" +KEY_CONNACK = "Connack" +KEY_DISCONNECT = "Disconnect" +KEY_PUBACK = "Puback" +KEY_SUBACK = "Suback" +KEY_UNSUBACK = "Unsuback" + + +class CallbackManager(object): + + def __init__(self): + self.callback_invocation_record = { + KEY_ON_ONLINE : 0, + KEY_ON_OFFLINE : 0, + KEY_ON_MESSAGE : 0, + KEY_CONNACK : 0, + KEY_DISCONNECT : 0, + KEY_PUBACK : 0, + KEY_SUBACK : 0, + KEY_UNSUBACK : 0 + } + + def on_online(self): + print("OMG, I am online!") + self.callback_invocation_record[KEY_ON_ONLINE] += 1 + + def on_offline(self): + print("OMG, I am offline!") + self.callback_invocation_record[KEY_ON_OFFLINE] += 1 + + def on_message(self, message): + print("OMG, I got a message!") + self.callback_invocation_record[KEY_ON_MESSAGE] += 1 + + def connack(self, mid, data): + print("OMG, I got a connack!") + self.callback_invocation_record[KEY_CONNACK] += 1 + + def disconnect(self, mid, data): + print("OMG, I got a disconnect!") + self.callback_invocation_record[KEY_DISCONNECT] += 1 + + def puback(self, mid): + print("OMG, I got a puback!") + self.callback_invocation_record[KEY_PUBACK] += 1 + + def suback(self, mid, data): + print("OMG, I got a suback!") + self.callback_invocation_record[KEY_SUBACK] += 1 + + def unsuback(self, mid): + print("OMG, I got an unsuback!") + self.callback_invocation_record[KEY_UNSUBACK] += 1 + + +def get_random_string(length): + return "".join(random.choice(string.ascii_lowercase) for i in range(length)) + + +############################################################################ +# Main # +# Check inputs +my_check_in_manager = checkInManager(1) +my_check_in_manager.verify(sys.argv) +mode = my_check_in_manager.mode + +skip_when_match(ModeIsALPN(mode).And( + Python2VersionLowerThan((2, 7, 10)).Or(Python3VersionLowerThan((3, 5, 0))) +), "This test is not applicable for mode %s and Python verison %s. Skipping..." % (mode, sys.version_info[:3])) + +# Performing +############ +print("Connecting...") +callback_manager = CallbackManager() +sdk_mqtt_client = MQTTClientManager()\ + .create_nonconnected_mqtt_client(mode, CLIENT_ID, HOST, (ROOT_CA, CERT, KEY), callback_manager) +sdk_mqtt_client.connectAsync(keepAliveIntervalSecond=1, ackCallback=callback_manager.connack) # Add callback +print("Wait some time to make sure we are connected...") +time.sleep(10) # 10 sec + +topic = TOPIC + get_random_string(4) +print("Subscribing to topic: " + topic) +sdk_mqtt_client.subscribeAsync(topic, 1, ackCallback=callback_manager.suback, messageCallback=None) +print("Wait some time to make sure we are subscribed...") +time.sleep(3) # 3 sec + +print("Publishing...") +for i in range(NUMBER_OF_PUBLISHES): + sdk_mqtt_client.publishAsync(topic, MESSAGE_PREFIX + str(i), 1, ackCallback=callback_manager.puback) + time.sleep(1) +print("Wait sometime to make sure we finished with publishing...") +time.sleep(2) + +print("Unsubscribing...") +sdk_mqtt_client.unsubscribeAsync(topic, ackCallback=callback_manager.unsuback) +print("Wait sometime to make sure we finished with unsubscribing...") +time.sleep(2) + +print("Disconnecting...") +sdk_mqtt_client.disconnectAsync(ackCallback=callback_manager.disconnect) + +print("Wait sometime to let the test result sync...") +time.sleep(3) + +print("Verifying...") +try: + assert callback_manager.callback_invocation_record[KEY_ON_ONLINE] == 1 + assert callback_manager.callback_invocation_record[KEY_CONNACK] == 1 + assert callback_manager.callback_invocation_record[KEY_SUBACK] == 1 + assert callback_manager.callback_invocation_record[KEY_PUBACK] == NUMBER_OF_PUBLISHES + assert callback_manager.callback_invocation_record[KEY_ON_MESSAGE] == NUMBER_OF_PUBLISHES + assert callback_manager.callback_invocation_record[KEY_UNSUBACK] == 1 + assert callback_manager.callback_invocation_record[KEY_DISCONNECT] == 1 + assert callback_manager.callback_invocation_record[KEY_ON_OFFLINE] == 1 +except BaseException as e: + print("Failed! %s" % e.message) +print("Pass!") diff --git a/test-integration/IntegrationTests/IntegrationTestAutoReconnectResubscribe.py b/test-integration/IntegrationTests/IntegrationTestAutoReconnectResubscribe.py new file mode 100644 index 0000000..83b66c2 --- /dev/null +++ b/test-integration/IntegrationTests/IntegrationTestAutoReconnectResubscribe.py @@ -0,0 +1,202 @@ +# This integration test verifies the functionality in the Python core of Yun/Python SDK +# for auto-reconnect and auto-resubscribe. +# It starts two threads using two different connections to AWS IoT: +# Thread A publishes 10 messages to topicB first, then quiet for a while, and finally +# publishes another 10 messages to topicB. +# Thread B subscribes to topicB and waits to receive messages. Once it receives the first +# 10 messages. It simulates a network error, disconnecting from the broker. In a short time, +# it should automatically reconnect and resubscribe to the previous topic and be able to +# receive the next 10 messages from thread A. +# Because of auto-reconnect/resubscribe, thread B should be able to receive all of the +# messages from topicB published by thread A without calling subscribe again in user code +# explicitly. + + +import random +import string +import sys +import time +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary") +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary/SDKPackage") + +import TestToolLibrary.checkInManager as checkInManager +import TestToolLibrary.MQTTClientManager as MQTTClientManager +from TestToolLibrary import simpleThreadManager +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import publishError +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeTimeoutException +from TestToolLibrary.skip import skip_when_match +from TestToolLibrary.skip import ModeIsALPN +from TestToolLibrary.skip import Python2VersionLowerThan +from TestToolLibrary.skip import Python3VersionLowerThan + +CLIENT_ID_PUB = "integrationTestMQTT_ClientPub" + "".join(random.choice(string.ascii_lowercase) for i in range(4)) +CLIENT_ID_SUB = "integrationTestMQTT_ClientSub" + "".join(random.choice(string.ascii_lowercase) for i in range(4)) + +# Callback unit +class callbackUnit: + def __init__(self): + self._internalSet = set() + + # Callback fro clientSub + def messageCallback(self, client, userdata, message): + print("Received a new message: " + str(message.payload)) + self._internalSet.add(message.payload.decode('utf-8')) + + def getInternalSet(self): + return self._internalSet + + +# Simulate a network error +def manualNetworkError(srcPyMQTTCore): + # Ensure we close the socket + if srcPyMQTTCore._internal_async_client._paho_client._sock: + srcPyMQTTCore._internal_async_client._paho_client._sock.close() + srcPyMQTTCore._internal_async_client._paho_client._sock = None + if srcPyMQTTCore._internal_async_client._paho_client._ssl: + srcPyMQTTCore._internal_async_client._paho_client._ssl.close() + srcPyMQTTCore._internal_async_client._paho_client._ssl = None + # Fake that we have detected the disconnection + srcPyMQTTCore._internal_async_client._paho_client.on_disconnect(None, None, 0) + + +# runFunctionUnit +class runFunctionUnit(): + def __init__(self): + self._messagesPublished = set() + self._topicB = "topicB/" + "".join(random.choice(string.ascii_lowercase) for i in range(4)) + + # ThreadA runtime function: + # 1. Publish 10 messages to topicB. + # 2. Take a nap: 20 sec + # 3. Publish another 10 messages to topicB. + def threadARuntime(self, pyCoreClient): + time.sleep(3) # Ensure a valid subscription + messageCount = 0 + # First 10 messages + while messageCount < 10: + try: + pyCoreClient.publish(self._topicB, str(messageCount), 1, False) + self._messagesPublished.add(str(messageCount)) + except publishError: + print("Publish error!") + except Exception as e: + print("Unknown exception!") + print("Type: " + str(type(e))) + print("Message: " + str(e.message)) + messageCount += 1 + time.sleep(0.5) # TPS = 2 + # Take a nap + time.sleep(20) + # Second 10 messages + while messageCount < 20: + try: + pyCoreClient.publish(self._topicB, str(messageCount), 1, False) + self._messagesPublished.add(str(messageCount)) + except publishError: + print("Publish Error!") + except Exception as e: + print("Unknown exception!") + print("Type: " + str(type(e))) + print("Message: " + str(e.message)) + messageCount += 1 + time.sleep(0.5) + print("Publish thread terminated.") + + # ThreadB runtime function: + # 1. Subscribe to topicB + # 2. Wait for a while + # 3. Create network blocking, triggering auto-reconnect and auto-resubscribe + # 4. On connect, wait for another while + def threadBRuntime(self, pyCoreClient, callback): + try: + # Subscribe to topicB + pyCoreClient.subscribe(self._topicB, 1, callback) + except subscribeTimeoutException: + print("Subscribe timeout!") + except subscribeError: + print("Subscribe error!") + except Exception as e: + print("Unknown exception!") + print("Type: " + str(type(e))) + print("Message: " + str(e.message)) + # Wait to get the first 10 messages from thread A + time.sleep(10) + # Block the network for 3 sec + print("Block the network for 3 sec...") + blockingTimeTenMs = 300 + while blockingTimeTenMs != 0: + manualNetworkError(pyCoreClient) + blockingTimeTenMs -= 1 + time.sleep(0.01) + print("Leave it to the main thread to keep waiting...") + + +############################################################################ +# Main # +# Check inputs +myCheckInManager = checkInManager.checkInManager(1) +myCheckInManager.verify(sys.argv) + +host = "ajje7lpljulm4-ats.iot.us-east-1.amazonaws.com" +rootCA = "./test-integration/Credentials/rootCA.crt" +certificate = "./test-integration/Credentials/certificate.pem.crt" +privateKey = "./test-integration/Credentials/privateKey.pem.key" +mode = myCheckInManager.mode + +skip_when_match(ModeIsALPN(mode).And( + Python2VersionLowerThan((2, 7, 10)).Or(Python3VersionLowerThan((3, 5, 0))) +), "This test is not applicable for mode %s and Python verison %s. Skipping..." % (mode, sys.version_info[:3])) + +# Init Python core and connect +myMQTTClientManager = MQTTClientManager.MQTTClientManager() +clientPub = myMQTTClientManager.create_connected_mqtt_core(CLIENT_ID_PUB, host, rootCA, + certificate, privateKey, mode=mode) +clientSub = myMQTTClientManager.create_connected_mqtt_core(CLIENT_ID_SUB, host, rootCA, + certificate, privateKey, mode=mode) + +if clientPub is None or clientSub is None: + print("Clients not init!") + exit(4) + +print("Two clients are connected!") + +# Configurations +################ +# Callback unit +subCallbackUnit = callbackUnit() +# Threads +mySimpleThreadManager = simpleThreadManager.simpleThreadManager() +myRunFunctionUnit = runFunctionUnit() +publishThreadID = mySimpleThreadManager.createOneTimeThread(myRunFunctionUnit.threadARuntime, [clientPub]) +subscribeThreadID = mySimpleThreadManager.createOneTimeThread(myRunFunctionUnit.threadBRuntime, + [clientSub, subCallbackUnit.messageCallback]) + +# Performing +############ +mySimpleThreadManager.startThreadWithID(subscribeThreadID) +mySimpleThreadManager.startThreadWithID(publishThreadID) +mySimpleThreadManager.joinOneTimeThreadWithID(subscribeThreadID) +mySimpleThreadManager.joinOneTimeThreadWithID(publishThreadID) +time.sleep(3) # Just in case messages arrive slowly + +# Verifying +########### +# Length +print("Check if the length of the two sets are equal...") +print("Received from subscription: " + str(len(subCallbackUnit.getInternalSet()))) +print("Sent through publishes: " + str(len(myRunFunctionUnit._messagesPublished))) +if len(myRunFunctionUnit._messagesPublished) != len(subCallbackUnit.getInternalSet()): + print("Number of messages not equal!") + exit(4) +# Content +print("Check if the content if the two sets are equivalent...") +if myRunFunctionUnit._messagesPublished != subCallbackUnit.getInternalSet(): + print("Sent through publishes:") + print(myRunFunctionUnit._messagesPublished) + print("Received from subscription:") + print(subCallbackUnit.getInternalSet()) + print("Set content not equal!") + exit(4) +else: + print("Yes!") diff --git a/test-integration/IntegrationTests/IntegrationTestClientReusability.py b/test-integration/IntegrationTests/IntegrationTestClientReusability.py new file mode 100644 index 0000000..f747e19 --- /dev/null +++ b/test-integration/IntegrationTests/IntegrationTestClientReusability.py @@ -0,0 +1,128 @@ +# This integration test verifies the re-usability of SDK MQTT client. +# By saying re-usability, we mean that users should be able to reuse +# the same SDK MQTT client object to connect and invoke other APIs +# after a disconnect API call has been invoked on that client object. +# This test contains 2 clients living 2 separate threads: +# 1. Thread publish: In this thread, a MQTT client will do the following +# in a loop: +# a. Connect to AWS IoT +# b. Publish several messages to a dedicated topic +# c. Disconnect from AWS IoT +# d. Sleep for a while +# 2. Thread subscribe: In this thread, a MQTT client will do nothing +# other than subscribing to a dedicated topic and counting the incoming +# messages. +# Assuming the client is reusable, the subscriber should be able to +# receive all the messages published by the publisher from the same +# client object in different connect sessions. + + +import uuid +import time +import sys +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary") +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary/SDKPackage") + +from threading import Event +from TestToolLibrary.checkInManager import checkInManager +from TestToolLibrary.simpleThreadManager import simpleThreadManager +from TestToolLibrary.MQTTClientManager import MQTTClientManager +from TestToolLibrary.skip import skip_when_match +from TestToolLibrary.skip import ModeIsALPN +from TestToolLibrary.skip import Python2VersionLowerThan +from TestToolLibrary.skip import Python3VersionLowerThan + + +TOPIC = "topic/" + str(uuid.uuid1()) +CLIENT_ID_PUB = "publisher" + str(uuid.uuid1()) +CLIENT_ID_SUB = "subscriber" + str(uuid.uuid1()) +MESSAGE_PREFIX = "Message-" +NUMBER_OF_MESSAGES_PER_LOOP = 3 +NUMBER_OF_LOOPS = 3 +SUB_WAIT_TIME_OUT_SEC = 20 +HOST = "ajje7lpljulm4-ats.iot.us-east-1.amazonaws.com" +ROOT_CA = "./test-integration/Credentials/rootCA.crt" +CERT = "./test-integration/Credentials/certificate.pem.crt" +KEY = "./test-integration/Credentials/privateKey.pem.key" + + +class ClientTwins(object): + + def __init__(self, client_pub, client_sub): + self._client_pub = client_pub + self._client_sub = client_sub + self._message_publish_set = set() + self._message_receive_set = set() + self._publish_done = Event() + + def run_publisher(self, *params): + self._publish_done.clear() + time.sleep(3) + for i in range(NUMBER_OF_LOOPS): + self._single_publish_loop(i) + time.sleep(2) + self._publish_done.set() + + def _single_publish_loop(self, iteration_count): + print("In loop %d: " % iteration_count) + self._client_pub.connect() + print("Publisher connected!") + for i in range(NUMBER_OF_MESSAGES_PER_LOOP): + message = MESSAGE_PREFIX + str(iteration_count) + "_" + str(i) + self._client_pub.publish(TOPIC, message, 1) + print("Publisher published %s to topic %s" % (message, TOPIC)) + self._message_publish_set.add(message.encode("utf-8")) + time.sleep(1) + self._client_pub.disconnect() + print("Publisher disconnected!\n\n") + + def run_subscriber(self, *params): + self._client_sub.connect() + self._client_sub.subscribe(TOPIC, 1, self._callback) + self._publish_done.wait(20) + self._client_sub.disconnect() + + def _callback(self, client, user_data, message): + self._message_receive_set.add(message.payload) + print("Subscriber received %s from topic %s" % (message.payload, message.topic)) + + def verify(self): + assert len(self._message_receive_set) != 0 + assert len(self._message_publish_set) != 0 + assert self._message_publish_set == self._message_receive_set + + +############################################################################ +# Main # +my_check_in_manager = checkInManager(1) +my_check_in_manager.verify(sys.argv) +mode = my_check_in_manager.mode + +skip_when_match(ModeIsALPN(mode).And( + Python2VersionLowerThan((2, 7, 10)).Or(Python3VersionLowerThan((3, 5, 0))) +), "This test is not applicable for mode %s and Python verison %s. Skipping..." % (mode, sys.version_info[:3])) + +simple_thread_manager = simpleThreadManager() + +client_pub = MQTTClientManager().create_nonconnected_mqtt_client(mode, CLIENT_ID_PUB, HOST, (ROOT_CA, CERT, KEY)) +print("Client publisher initialized.") +client_sub = MQTTClientManager().create_nonconnected_mqtt_client(mode, CLIENT_ID_SUB, HOST, (ROOT_CA, CERT, KEY)) +print("Client subscriber initialized.") +client_twins = ClientTwins(client_pub, client_sub) +print("Client twins initialized.") + +publisher_thread_id = simple_thread_manager.createOneTimeThread(client_twins.run_publisher, []) +subscriber_thread_id = simple_thread_manager.createOneTimeThread(client_twins.run_subscriber, []) +simple_thread_manager.startThreadWithID(subscriber_thread_id) +print("Started subscriber thread.") +simple_thread_manager.startThreadWithID(publisher_thread_id) +print("Started publisher thread.") + +print("Main thread starts waiting.") +simple_thread_manager.joinOneTimeThreadWithID(publisher_thread_id) +simple_thread_manager.joinOneTimeThreadWithID(subscriber_thread_id) +print("Main thread waiting is done!") + +print("Verifying...") +client_twins.verify() +print("Pass!") diff --git a/test-integration/IntegrationTests/IntegrationTestConfigurablePublishMessageQueueing.py b/test-integration/IntegrationTests/IntegrationTestConfigurablePublishMessageQueueing.py new file mode 100644 index 0000000..d6bfdc5 --- /dev/null +++ b/test-integration/IntegrationTests/IntegrationTestConfigurablePublishMessageQueueing.py @@ -0,0 +1,305 @@ +# This integration test verifies the functionality in the Python core of Yun SDK +# for configurable offline publish message queueing. +# For each offline publish queue to be tested, it starts two threads using +# different connections to AWS IoT: +# Thread A subscribes to TopicOnly and wait to receive messages published to +# TopicOnly from ThreadB. +# Thread B publishes to TopicOnly with a manual network error which triggers the +# offline publish message queueing. According to different configurations, the +# internal queue should keep as many publish requests as configured and then +# republish them once the connection is back. +# * After the network is down but before the client gets the notification of being +# * disconnected, QoS0 messages in between this "blind-window" will be lost. However, +# * once the client gets the notification, it should start queueing messages up to +# * its queue size limit. +# * Therefore, all published messages are QoS0, we are verifying the total amount. +# * Configuration to be tested: +# 1. Limited queueing section, limited response (in-flight) section, drop oldest +# 2. Limited queueing section, limited response (in-flight) section, drop newest + + +import threading +import sys +import time +import random +import string +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary") +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary/SDKPackage") + +import TestToolLibrary.checkInManager as checkInManager +import TestToolLibrary.MQTTClientManager as MQTTClientManager +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.core.util.enums import DropBehaviorTypes +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import publishError +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import publishQueueFullException +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeTimeoutException +from TestToolLibrary.skip import skip_when_match +from TestToolLibrary.skip import ModeIsALPN +from TestToolLibrary.skip import Python2VersionLowerThan +from TestToolLibrary.skip import Python3VersionLowerThan + +CLIENT_ID_PUB = "integrationTestMQTT_ClientPub" + "".join(random.choice(string.ascii_lowercase) for i in range(4)) +CLIENT_ID_SUB = "integrationTestMQTT_ClientSub" + "".join(random.choice(string.ascii_lowercase) for i in range(4)) + +# Class that implements the publishing thread: Thread A, with network failure +# This thread will publish 3 messages first, and then keep publishing +# with a network failure, and then publish another set of 3 messages +# once the connection is resumed. +# * TPS = 1 +class threadPub: + def __init__(self, pyCoreClient, numberOfOfflinePublish, srcTopic): + self._publishMessagePool = list() + self._pyCoreClient = pyCoreClient + self._numberOfOfflinePublish = numberOfOfflinePublish + self._topic = srcTopic + + # Simulate a network error + def _manualNetworkError(self): + # Ensure we close the socket + if self._pyCoreClient._internal_async_client._paho_client._sock: + self._pyCoreClient._internal_async_client._paho_client._sock.close() + self._pyCoreClient._internal_async_client._paho_client._sock = None + if self._pyCoreClient._internal_async_client._paho_client._ssl: + self._pyCoreClient._internal_async_client._paho_client._ssl.close() + self._pyCoreClient._internal_async_client._paho_client._ssl = None + # Fake that we have detected the disconnection + self._pyCoreClient._internal_async_client._paho_client.on_disconnect(None, None, 0) + + def _runtime(self): + messageCount = 0 + # Publish 3 messages + print("Thread A: Publish 3 messages.") + step1PublishCount = 3 + while step1PublishCount != 0: + currentMessage = str(messageCount) + self._publishMessagePool.append(int(currentMessage)) + try: + self._pyCoreClient.publish(self._topic, currentMessage, 0, False) + print("Thread A: Published a message: " + str(currentMessage)) + step1PublishCount -= 1 + messageCount += 1 + except publishError: + print("Publish Error!") + except publishQueueFullException: + print("Internal Publish Queue is FULL!") + except Exception as e: + print("Unknown exception!") + print("Type: " + str(type(e))) + print("Message: " + str(e.message)) + time.sleep(1) + # Network Failure, publish #numberOfOfflinePublish# messages + # Scanning rate = 100 TPS + print( + "Thread A: Simulate an network error. Keep publishing for " + str(self._numberOfOfflinePublish) + " messages.") + step2LoopCount = self._numberOfOfflinePublish * 100 + while step2LoopCount != 0: + self._manualNetworkError() + if step2LoopCount % 100 == 0: + currentMessage = str(messageCount) + self._publishMessagePool.append(int(currentMessage)) + try: + self._pyCoreClient.publish(self._topic, currentMessage, 0, False) + print("Thread A: Published a message: " + str(currentMessage)) + except publishError: + print("Publish Error!") + except Exception as e: + print("Unknown exception!") + print("Type: " + str(type(e))) + print("Message: " + str(e.message)) + messageCount += 1 + step2LoopCount -= 1 + time.sleep(0.01) + # Reconnecting + reconnectTiming = 0 # count per 0.01 seconds + while reconnectTiming <= 1000: + if reconnectTiming % 100 == 0: + print("Thread A: Counting reconnect time: " + str(reconnectTiming / 100) + "seconds.") + reconnectTiming += 1 + time.sleep(0.01) + print("Thread A: Reconnected!") + # Publish another set of 3 messages + print("Thread A: Publish 3 messages again.") + step3PublishCount = 3 + while step3PublishCount != 0: + currentMessage = str(messageCount) + self._publishMessagePool.append(int(currentMessage)) + try: + self._pyCoreClient.publish(self._topic, currentMessage, 0, False) + print("Thread A: Published a message: " + str(currentMessage)) + step3PublishCount -= 1 + messageCount += 1 + except publishError: + print("Publish Error!") + except Exception as e: + print("Unknown exception!") + print("Type: " + str(type(e))) + print("Message: " + str(e.message)) + time.sleep(1) + # Wrap up: Sleep for extra 5 seconds + time.sleep(5) + + def startThreadAndGo(self): + threadHandler = threading.Thread(target=self._runtime) + threadHandler.start() + return threadHandler + + def getPublishMessagePool(self): + return self._publishMessagePool + + +# Class that implements the subscribing thread: Thread B. +# Basically this thread does nothing but subscribes to TopicOnly and keeps receiving messages. +class threadSub: + def __init__(self, pyCoreClient, srcTopic): + self._keepRunning = True + self._pyCoreClient = pyCoreClient + self._subscribeMessagePool = list() + self._topic = srcTopic + + def _messageCallback(self, client, userdata, message): + print("Thread B: Received a new message from topic: " + str(message.topic)) + print("Thread B: Payload is: " + str(message.payload)) + self._subscribeMessagePool.append(int(message.payload)) + + def _runtime(self): + # Subscribe to self._topic + try: + self._pyCoreClient.subscribe(self._topic, 1, self._messageCallback) + except subscribeTimeoutException: + print("Subscribe timeout!") + except subscribeError: + print("Subscribe error!") + except Exception as e: + print("Unknown exception!") + print("Type: " + str(type(e))) + print("Message: " + str(e.message)) + time.sleep(2.2) + print("Thread B: Subscribed to " + self._topic) + print("Thread B: Now wait for Thread A.") + # Scanning rate is 100 TPS + while self._keepRunning: + time.sleep(0.01) + + def startThreadAndGo(self): + threadHandler = threading.Thread(target=self._runtime) + threadHandler.start() + return threadHandler + + def stopRunning(self): + self._keepRunning = False + + def getSubscribeMessagePool(self): + return self._subscribeMessagePool + + +# Generate answer for this integration test using queue configuration +def generateAnswer(data, queueingSize, srcMode): + dataInWork = sorted(data) + dataHead = dataInWork[:3] + dataTail = dataInWork[-3:] + dataRet = dataHead + dataInWork = dataInWork[3:] + dataInWork = dataInWork[:-3] + if srcMode == 0: # DROP_OLDEST + dataInWork = dataInWork[(-1 * queueingSize):] + dataRet.extend(dataInWork) + dataRet.extend(dataTail) + return sorted(dataRet) + elif srcMode == 1: # DROP_NEWEST + dataInWork = dataInWork[:queueingSize] + dataRet.extend(dataInWork) + dataRet.extend(dataTail) + return sorted(dataRet) + else: + print("Unsupported drop behavior!") + raise ValueError + + +# Create thread object, load in pyCoreClient and perform the set of integration tests +def performConfigurableOfflinePublishQueueTest(clientPub, clientSub): + print("Test DROP_NEWEST....") + clientPub[0].configure_offline_requests_queue(10, DropBehaviorTypes.DROP_NEWEST) # dropNewest + clientSub[0].configure_offline_requests_queue(10, DropBehaviorTypes.DROP_NEWEST) # dropNewest + # Create Topics + TopicOnly = "TopicOnly/" + "".join(random.choice(string.ascii_lowercase) for i in range(4)) + # Create thread object + threadPubObject = threadPub(clientPub[0], 15, TopicOnly) # Configure to publish 15 messages during network outage + threadSubObject = threadSub(clientSub[0], TopicOnly) + threadSubHandler = threadSubObject.startThreadAndGo() + time.sleep(3) + threadPubHandler = threadPubObject.startThreadAndGo() + threadPubHandler.join() + threadSubObject.stopRunning() + threadSubHandler.join() + # Verify result + print("Verify DROP_NEWEST:") + answer = generateAnswer(threadPubObject.getPublishMessagePool(), 10, 1) + print("ANSWER:") + print(answer) + print("ACTUAL:") + print(threadSubObject.getSubscribeMessagePool()) + # We are doing QoS0 publish. We cannot guarantee when the drop will happen since we cannot guarantee a fixed time out + # of disconnect detection. However, once offline requests queue starts involving, it should queue up to its limit, + # thus the total number of received messages after draining should always match. + if len(threadSubObject.getSubscribeMessagePool()) == len(answer): + print("Passed.") + else: + print("Verify DROP_NEWEST failed!!!") + return False + time.sleep(5) + print("Test DROP_OLDEST....") + clientPub[0].configure_offline_requests_queue(10, DropBehaviorTypes.DROP_OLDEST) # dropOldest + clientSub[0].configure_offline_requests_queue(10, DropBehaviorTypes.DROP_OLDEST) # dropOldest + # Create thread object + threadPubObject = threadPub(clientPub[0], 15, TopicOnly) # Configure to publish 15 messages during network outage + threadSubObject = threadSub(clientSub[0], TopicOnly) + threadSubHandler = threadSubObject.startThreadAndGo() + time.sleep(3) + threadPubHandler = threadPubObject.startThreadAndGo() + threadPubHandler.join() + threadSubObject.stopRunning() + threadSubHandler.join() + # Verify result + print("Verify DROP_OLDEST:") + answer = generateAnswer(threadPubObject.getPublishMessagePool(), 10, 0) + print(answer) + print("ACTUAL:") + print(threadSubObject.getSubscribeMessagePool()) + if len(threadSubObject.getSubscribeMessagePool()) == len(answer): + print("Passed.") + else: + print("Verify DROP_OLDEST failed!!!") + return False + return True + + +# Check inputs +myCheckInManager = checkInManager.checkInManager(1) +myCheckInManager.verify(sys.argv) + +host = "ajje7lpljulm4-ats.iot.us-east-1.amazonaws.com" +rootCA = "./test-integration/Credentials/rootCA.crt" +certificate = "./test-integration/Credentials/certificate.pem.crt" +privateKey = "./test-integration/Credentials/privateKey.pem.key" +mode = myCheckInManager.mode + +skip_when_match(ModeIsALPN(mode).And( + Python2VersionLowerThan((2, 7, 10)).Or(Python3VersionLowerThan((3, 5, 0))) +), "This test is not applicable for mode %s and Python verison %s. Skipping..." % (mode, sys.version_info[:3])) + +# Init Python core and connect +myMQTTClientManager = MQTTClientManager.MQTTClientManager() +clientPub = myMQTTClientManager.create_connected_mqtt_core(CLIENT_ID_PUB, host, rootCA, + certificate, privateKey, mode=mode) +clientSub = myMQTTClientManager.create_connected_mqtt_core(CLIENT_ID_SUB, host, rootCA, + certificate, privateKey, mode=mode) + +if clientPub is None or clientSub is None: + exit(4) + +print("Two clients are connected!") + +# Functionality test +if not performConfigurableOfflinePublishQueueTest([clientPub], [clientSub]): + print("The above Drop behavior broken!") + exit(4) diff --git a/test-integration/IntegrationTests/IntegrationTestDiscovery.py b/test-integration/IntegrationTests/IntegrationTestDiscovery.py new file mode 100644 index 0000000..8f23aa9 --- /dev/null +++ b/test-integration/IntegrationTests/IntegrationTestDiscovery.py @@ -0,0 +1,215 @@ +import sys +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary") +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary/SDKPackage") + +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider +from TestToolLibrary.checkInManager import checkInManager +from TestToolLibrary.skip import skip_when_match +from TestToolLibrary.skip import ModeIsWebSocket + + +HOST = "arc9d2oott9lj-ats.iot.us-east-1.amazonaws.com" # 003261610643 +PORT = 8443 +CA = "./test-integration/Credentials/rootCA.crt" +CERT = "./test-integration/Credentials/certificate_drs.pem.crt" +KEY = "./test-integration/Credentials/privateKey_drs.pem.key" +TIME_OUT_SEC = 30 +# This is a pre-generated test data from DRS integration tests +ID_PREFIX = "Id-" +GGC_ARN = "arn:aws:iot:us-east-1:003261610643:thing/DRS_GGC_0kegiNGA_0" +GGC_PORT_NUMBER_BASE = 8080 +GGC_HOST_ADDRESS_PREFIX = "192.168.101." +METADATA_PREFIX = "Description-" +GROUP_ID = "627bf63d-ae64-4f58-a18c-80a44fcf4088" +THING_NAME = "DRS_GGAD_0kegiNGA_0" +EXPECTED_CA_CONTENT = "-----BEGIN CERTIFICATE-----\n" \ + "MIIEFTCCAv2gAwIBAgIVAPZfc4GMLZPmXbnoaZm6jRDqDs4+MA0GCSqGSIb3DQEB\n" \ + "CwUAMIGoMQswCQYDVQQGEwJVUzEYMBYGA1UECgwPQW1hem9uLmNvbSBJbmMuMRww\n" \ + "GgYDVQQLDBNBbWF6b24gV2ViIFNlcnZpY2VzMRMwEQYDVQQIDApXYXNoaW5ndG9u\n" \ + "MRAwDgYDVQQHDAdTZWF0dGxlMTowOAYDVQQDDDEwMDMyNjE2MTA2NDM6NjI3YmY2\n" \ + "M2QtYWU2NC00ZjU4LWExOGMtODBhNDRmY2Y0MDg4MCAXDTE3MDUyNTE4NDI1OVoY\n" \ + "DzIwOTcwNTI1MTg0MjU4WjCBqDELMAkGA1UEBhMCVVMxGDAWBgNVBAoMD0FtYXpv\n" \ + "bi5jb20gSW5jLjEcMBoGA1UECwwTQW1hem9uIFdlYiBTZXJ2aWNlczETMBEGA1UE\n" \ + "CAwKV2FzaGluZ3RvbjEQMA4GA1UEBwwHU2VhdHRsZTE6MDgGA1UEAwwxMDAzMjYx\n" \ + "NjEwNjQzOjYyN2JmNjNkLWFlNjQtNGY1OC1hMThjLTgwYTQ0ZmNmNDA4ODCCASIw\n" \ + "DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKEWtZtKyJUg2VUwZkbkVtltrfam\n" \ + "s9LMIdKNA3Wz4zSLhZjKHiTSkQmpZwKle5ziYs6Q5hfeT8WC0FNAVv1JhnwsuGfT\n" \ + "sG0UO5dSn7wqXOJigKC1CaSGqeFpKB0/a3wR1L6pCGVbLZ86/sPCEPHHJDieQ+Ps\n" \ + "RnOcUGb4CuIBnI2N+lafWNa4F4KRSVJCEeZ6u4iWVVdIEcDLKlakY45jtVvQqwnz\n" \ + "3leFsN7PTLEkVq5u1PXSbT5DWv6p+5NoDnGAT7j7Wbr2yJw7DtpBOL6oWkAdbFAQ\n" \ + "2097e8mIxNYE9xAzRlb5wEr6jZl/8K60v9P83OapMeuOg4JS8FGulHXbDg0CAwEA\n" \ + "AaMyMDAwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQU21ELaPCH9Oh001OS0JMv\n" \ + "n8hU8dYwDQYJKoZIhvcNAQELBQADggEBABW66eH/+/v9Nq5jtJzflfrqAfBOzWLj\n" \ + "UTEv6szkYzV5Crr8vnu2P5OlyA0NdiKGiAm0AgoDkf+n9HU3Hc0zm3G/QaAO2UmN\n" \ + "9MwtIp29BSRf+gd1bX/WZTtl5I5xl290BDfr5o08I6TOf0A4P8IAkGwku5j0IQjM\n" \ + "ns2HH5UVki155dtmWDEGX6q35KABbsmv3tO1+geJVYnd1QkHzR5IXA12gxlMw9GJ\n" \ + "+cOw+rwJJ2ZcXo3HFoXBcsPqPOa1SO3vTl3XWQ+jX3vyDsxh/VGoJ4epsjwmJ+dW\n" \ + "sHJoqsa3ZPDW0LcEuYgdzYWRhumGwH9fJJUx0GS4Tdg4ud+6jpuyflU=\n" \ + "-----END CERTIFICATE-----\n" +# The expected response from DRS should be: +''' +{ + "GGGroups": [ + { + "GGGroupId": "627bf63d-ae64-4f58-a18c-80a44fcf4088", + "Cores": [ + { + "thingArn": "arn:aws:iot:us-east-1:003261610643:thing\/DRS_GGC_0kegiNGA_0", + "Connectivity": [ + { + "Id": "Id-0", + "HostAddress": "192.168.101.0", + "PortNumber": 8080, + "Metadata": "Description-0" + }, + { + "Id": "Id-1", + "HostAddress": "192.168.101.1", + "PortNumber": 8081, + "Metadata": "Description-1" + }, + { + "Id": "Id-2", + "HostAddress": "192.168.101.2", + "PortNumber": 8082, + "Metadata": "Description-2" + } + ] + } + ], + "CAs": [ + "-----BEGIN CERTIFICATE-----\n + MIIEFTCCAv2gAwIBAgIVAPZfc4GMLZPmXbnoaZm6jRDqDs4+MA0GCSqGSIb3DQEB\n + CwUAMIGoMQswCQYDVQQGEwJVUzEYMBYGA1UECgwPQW1hem9uLmNvbSBJbmMuMRww\n + GgYDVQQLDBNBbWF6b24gV2ViIFNlcnZpY2VzMRMwEQYDVQQIDApXYXNoaW5ndG9u\n + MRAwDgYDVQQHDAdTZWF0dGxlMTowOAYDVQQDDDEwMDMyNjE2MTA2NDM6NjI3YmY2\n + M2QtYWU2NC00ZjU4LWExOGMtODBhNDRmY2Y0MDg4MCAXDTE3MDUyNTE4NDI1OVoY\n + DzIwOTcwNTI1MTg0MjU4WjCBqDELMAkGA1UEBhMCVVMxGDAWBgNVBAoMD0FtYXpv\n + bi5jb20gSW5jLjEcMBoGA1UECwwTQW1hem9uIFdlYiBTZXJ2aWNlczETMBEGA1UE\n + CAwKV2FzaGluZ3RvbjEQMA4GA1UEBwwHU2VhdHRsZTE6MDgGA1UEAwwxMDAzMjYx\n + NjEwNjQzOjYyN2JmNjNkLWFlNjQtNGY1OC1hMThjLTgwYTQ0ZmNmNDA4ODCCASIw\n + DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKEWtZtKyJUg2VUwZkbkVtltrfam\n + s9LMIdKNA3Wz4zSLhZjKHiTSkQmpZwKle5ziYs6Q5hfeT8WC0FNAVv1JhnwsuGfT\n + sG0UO5dSn7wqXOJigKC1CaSGqeFpKB0\/a3wR1L6pCGVbLZ86\/sPCEPHHJDieQ+Ps\n + RnOcUGb4CuIBnI2N+lafWNa4F4KRSVJCEeZ6u4iWVVdIEcDLKlakY45jtVvQqwnz\n + 3leFsN7PTLEkVq5u1PXSbT5DWv6p+5NoDnGAT7j7Wbr2yJw7DtpBOL6oWkAdbFAQ\n + 2097e8mIxNYE9xAzRlb5wEr6jZl\/8K60v9P83OapMeuOg4JS8FGulHXbDg0CAwEA\n + AaMyMDAwDwYDVR0TAQH\/BAUwAwEB\/zAdBgNVHQ4EFgQU21ELaPCH9Oh001OS0JMv\n + n8hU8dYwDQYJKoZIhvcNAQELBQADggEBABW66eH\/+\/v9Nq5jtJzflfrqAfBOzWLj\n + UTEv6szkYzV5Crr8vnu2P5OlyA0NdiKGiAm0AgoDkf+n9HU3Hc0zm3G\/QaAO2UmN\n + 9MwtIp29BSRf+gd1bX\/WZTtl5I5xl290BDfr5o08I6TOf0A4P8IAkGwku5j0IQjM\n + ns2HH5UVki155dtmWDEGX6q35KABbsmv3tO1+geJVYnd1QkHzR5IXA12gxlMw9GJ\n + +cOw+rwJJ2ZcXo3HFoXBcsPqPOa1SO3vTl3XWQ+jX3vyDsxh\/VGoJ4epsjwmJ+dW\n + sHJoqsa3ZPDW0LcEuYgdzYWRhumGwH9fJJUx0GS4Tdg4ud+6jpuyflU=\n + -----END CERTIFICATE-----\n" + ] + } + ] +} +''' + + +def create_discovery_info_provider(): + discovery_info_provider = DiscoveryInfoProvider() + discovery_info_provider.configureEndpoint(HOST, PORT) + discovery_info_provider.configureCredentials(CA, CERT, KEY) + discovery_info_provider.configureTimeout(TIME_OUT_SEC) + return discovery_info_provider + + +def perform_integ_test_discovery(): + discovery_info_provider = create_discovery_info_provider() + return discovery_info_provider.discover(THING_NAME) + + +def _verify_connectivity_info(actual_connectivity_info): + info_id = actual_connectivity_info.id + sequence_number_string = info_id[-1:] + assert actual_connectivity_info.host == GGC_HOST_ADDRESS_PREFIX + sequence_number_string + assert actual_connectivity_info.port == GGC_PORT_NUMBER_BASE + int(sequence_number_string) + assert actual_connectivity_info.metadata == METADATA_PREFIX + sequence_number_string + + +def _verify_connectivity_info_list(actual_connectivity_info_list): + for actual_connectivity_info in actual_connectivity_info_list: + _verify_connectivity_info(actual_connectivity_info) + + +def _verify_ggc_info(actual_ggc_info): + assert actual_ggc_info.coreThingArn == GGC_ARN + assert actual_ggc_info.groupId == GROUP_ID + _verify_connectivity_info_list(actual_ggc_info.connectivityInfoList) + + +def _verify_ca_list(ca_list): + assert len(ca_list) == 1 + try: + group_id, ca = ca_list[0] + assert group_id == GROUP_ID + assert ca == EXPECTED_CA_CONTENT + except: + assert ca_list[0] == EXPECTED_CA_CONTENT + + +def verify_all_cores(discovery_info): + print("Verifying \"getAllCores\"...") + ggc_info_list = discovery_info.getAllCores() + assert len(ggc_info_list) == 1 + _verify_ggc_info(ggc_info_list[0]) + print("Pass!") + + +def verify_all_cas(discovery_info): + print("Verifying \"getAllCas\"...") + ca_list = discovery_info.getAllCas() + _verify_ca_list(ca_list) + print("Pass!") + + +def verify_all_groups(discovery_info): + print("Verifying \"getAllGroups\"...") + group_list = discovery_info.getAllGroups() + assert len(group_list) == 1 + group_info = group_list[0] + _verify_ca_list(group_info.caList) + _verify_ggc_info(group_info.coreConnectivityInfoList[0]) + print("Pass!") + + +def verify_group_object(discovery_info): + print("Verifying \"toObjectAtGroupLevel\"...") + group_info_object = discovery_info.toObjectAtGroupLevel() + _verify_connectivity_info(group_info_object + .get(GROUP_ID) + .getCoreConnectivityInfo(GGC_ARN) + .getConnectivityInfo(ID_PREFIX + "0")) + _verify_connectivity_info(group_info_object + .get(GROUP_ID) + .getCoreConnectivityInfo(GGC_ARN) + .getConnectivityInfo(ID_PREFIX + "1")) + _verify_connectivity_info(group_info_object + .get(GROUP_ID) + .getCoreConnectivityInfo(GGC_ARN) + .getConnectivityInfo(ID_PREFIX + "2")) + print("Pass!") + + +############################################################################ +# Main # +my_check_in_manager = checkInManager(1) +my_check_in_manager.verify(sys.argv) +mode = my_check_in_manager.mode + +skip_when_match(ModeIsWebSocket(mode), "This test is not applicable for mode: %s. Skipping..." % mode) + +# GG Discovery only applies mutual auth with cert +try: + discovery_info = perform_integ_test_discovery() + + verify_all_cores(discovery_info) + verify_all_cas(discovery_info) + verify_all_groups(discovery_info) + verify_group_object(discovery_info) +except BaseException as e: + print("Failed! " + e.message) + exit(4) diff --git a/test-integration/IntegrationTests/IntegrationTestJobsClient.py b/test-integration/IntegrationTests/IntegrationTestJobsClient.py new file mode 100644 index 0000000..18d8aa5 --- /dev/null +++ b/test-integration/IntegrationTests/IntegrationTestJobsClient.py @@ -0,0 +1,185 @@ +# This integration test verifies the jobs client functionality in the +# Python SDK. +# It performs a number of basic operations without expecting an actual job or +# job execution to be present. The callbacks associated with these actions +# are written to accept and pass server responses given when no jobs or job +# executions exist. +# Finally, the tester pumps through all jobs queued for the given thing +# doing a basic echo of the job document and updating the job execution +# to SUCCEEDED + +import random +import string +import time +import sys +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary") +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary/SDKPackage") + +from TestToolLibrary import simpleThreadManager +import TestToolLibrary.checkInManager as checkInManager +import TestToolLibrary.MQTTClientManager as MQTTClientManager +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import publishError +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeTimeoutException +from TestToolLibrary.skip import skip_when_match +from TestToolLibrary.skip import ModeIsALPN +from TestToolLibrary.skip import Python2VersionLowerThan +from TestToolLibrary.skip import Python3VersionLowerThan + +from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTThingJobsClient +from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType +from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType +from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionStatus + +import threading +import datetime +import argparse +import json + +IOT_JOBS_MQTT_RESPONSE_WAIT_SECONDS = 5 +CLIENT_ID = "integrationTestMQTT_Client" + "".join(random.choice(string.ascii_lowercase) for i in range(4)) + +class JobsMessageProcessor(object): + def __init__(self, awsIoTMQTTThingJobsClient, clientToken): + #keep track of this to correlate request/responses + self.clientToken = clientToken + self.awsIoTMQTTThingJobsClient = awsIoTMQTTThingJobsClient + + def _setupCallbacks(self): + print('Creating test subscriptions...') + assert True == self.awsIoTMQTTThingJobsClient.createJobSubscription(self.getPendingJobAcceptedCallback, jobExecutionTopicType.JOB_GET_PENDING_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE) + assert True == self.awsIoTMQTTThingJobsClient.createJobSubscription(self.getPendingJobRejectedCallback, jobExecutionTopicType.JOB_GET_PENDING_TOPIC, jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE) + assert True == self.awsIoTMQTTThingJobsClient.createJobSubscription(self.describeJobExecAcceptedCallback, jobExecutionTopicType.JOB_DESCRIBE_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE, '+') + assert True == self.awsIoTMQTTThingJobsClient.createJobSubscription(self.describeJobExecRejectedCallback, jobExecutionTopicType.JOB_DESCRIBE_TOPIC, jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE, '+') + assert True == self.awsIoTMQTTThingJobsClient.createJobSubscription(self.startNextPendingJobAcceptedCallback, jobExecutionTopicType.JOB_START_NEXT_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE) + assert True == self.awsIoTMQTTThingJobsClient.createJobSubscription(self.startNextPendingJobRejectedCallback, jobExecutionTopicType.JOB_START_NEXT_TOPIC, jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE) + assert True == self.awsIoTMQTTThingJobsClient.createJobSubscription(self.updateJobAcceptedCallback, jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE, '+') + assert True == self.awsIoTMQTTThingJobsClient.createJobSubscription(self.updateJobRejectedCallback, jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE, '+') + + def getPendingJobAcceptedCallback(self, client, userdata, message): + self.testResult = (True, 'GetPending accepted callback invoked!') + self.waitEvent.set() + + def getPendingJobRejectedCallback(self, client, userdata, message): + self.testResult = (False, 'GetPending rejection callback invoked!') + self.waitEvent.set() + + def describeJobExecAcceptedCallback(self, client, userdata, message): + self.testResult = (True, 'DescribeJobExecution accepted callback invoked!') + self.waitEvent.set() + + def describeJobExecRejectedCallback(self, client, userdata, message): + self.testResult = (False, 'DescribeJobExecution rejected callback invoked!') + self.waitEvent.set() + + def startNextPendingJobAcceptedCallback(self, client, userdata, message): + self.testResult = (True, 'StartNextPendingJob accepted callback invoked!') + payload = json.loads(message.payload.decode('utf-8')) + if 'execution' not in payload: + self.done = True + else: + print('Found job! Document: ' + payload['execution']['jobDocument']) + threading.Thread(target=self.awsIoTMQTTThingJobsClient.sendJobsUpdate(payload['execution']['jobId'], jobExecutionStatus.JOB_EXECUTION_SUCCEEDED)).start() + self.waitEvent.set() + + def startNextPendingJobRejectedCallback(self, client, userdata, message): + self.testResult = (False, 'StartNextPendingJob rejected callback invoked!') + self.waitEvent.set() + + def updateJobAcceptedCallback(self, client, userdata, message): + self.testResult = (True, 'UpdateJob accepted callback invoked!') + self.waitEvent.set() + + def updateJobRejectedCallback(self, client, userdata, message): + #rejection is still a successful test because job IDs may or may not exist, and could exist in unknown state + self.testResult = (True, 'UpdateJob rejected callback invoked!') + self.waitEvent.set() + + def executeJob(self, execution): + print('Executing job ID, version, number: {}, {}, {}'.format(execution['jobId'], execution['versionNumber'], execution['executionNumber'])) + print('With jobDocument: ' + json.dumps(execution['jobDocument'])) + + def runTests(self): + print('Running jobs tests...') + ##create subscriptions + self._setupCallbacks() + + #make publish calls + self._init_test_wait() + self._test_send_response_confirm(self.awsIoTMQTTThingJobsClient.sendJobsDescribe('$next')) + + self._init_test_wait() + self._test_send_response_confirm(self.awsIoTMQTTThingJobsClient.sendJobsUpdate('junkJobId', jobExecutionStatus.JOB_EXECUTION_SUCCEEDED)) + + self._init_test_wait() + self._test_send_response_confirm(self.awsIoTMQTTThingJobsClient.sendJobsQuery(jobExecutionTopicType.JOB_GET_PENDING_TOPIC)) + + self._init_test_wait() + self._test_send_response_confirm(self.awsIoTMQTTThingJobsClient.sendJobsStartNext()) + + self.processAllJobs() + + def processAllJobs(self): + #process all enqueued jobs + print('Processing all jobs found in queue for thing...') + self.done = False + while not self.done: + self._attemptStartNextJob() + time.sleep(5) + + def _attemptStartNextJob(self): + statusDetails = {'StartedBy': 'ClientToken: {} on {}'.format(self.clientToken, datetime.datetime.now().isoformat())} + threading.Thread(target=self.awsIoTMQTTThingJobsClient.sendJobsStartNext, kwargs = {'statusDetails': statusDetails}).start() + + def _init_test_wait(self): + self.testResult = (False, 'Callback not invoked') + self.waitEvent = threading.Event() + + def _test_send_response_confirm(self, sendResult): + if not sendResult: + print('Failed to send jobs message') + exit(4) + else: + #wait 25 seconds for expected callback response to happen + if not self.waitEvent.wait(IOT_JOBS_MQTT_RESPONSE_WAIT_SECONDS): + print('Did not receive expected callback within %d second timeout' % IOT_JOBS_MQTT_RESPONSE_WAIT_SECONDS) + exit(4) + elif not self.testResult[0]: + print('Callback result has failed the test with message: ' + self.testResult[1]) + exit(4) + else: + print('Recieved expected result: ' + self.testResult[1]) + + +############################################################################ +# Main # +# Check inputs +myCheckInManager = checkInManager.checkInManager(1) +myCheckInManager.verify(sys.argv) + +host = "ajje7lpljulm4-ats.iot.us-east-1.amazonaws.com" +rootCA = "./test-integration/Credentials/rootCA.crt" +certificate = "./test-integration/Credentials/certificate.pem.crt" +privateKey = "./test-integration/Credentials/privateKey.pem.key" +mode = myCheckInManager.mode + +skip_when_match(ModeIsALPN(mode).And( + Python2VersionLowerThan((2, 7, 10)).Or(Python3VersionLowerThan((3, 5, 0))) +), "This test is not applicable for mode %s and Python verison %s. Skipping..." % (mode, sys.version_info[:3])) + +# Init Python core and connect +myMQTTClientManager = MQTTClientManager.MQTTClientManager() +client = myMQTTClientManager.create_connected_mqtt_client(mode, CLIENT_ID, host, (rootCA, certificate, privateKey)) + +clientId = 'AWSPythonkSDKTestThingClient' +thingName = 'AWSPythonkSDKTestThing' +jobsClient = AWSIoTMQTTThingJobsClient(clientId, thingName, QoS=1, awsIoTMQTTClient=client) + +print('Connecting to MQTT server and setting up callbacks...') +jobsMsgProc = JobsMessageProcessor(jobsClient, clientId) +print('Starting jobs tests...') +jobsMsgProc.runTests() +print('Done running jobs tests') + +#can call this on the jobsClient, or myAWSIoTMQTTClient directly +jobsClient.disconnect() diff --git a/test-integration/IntegrationTests/IntegrationTestMQTTConnection.py b/test-integration/IntegrationTests/IntegrationTestMQTTConnection.py new file mode 100644 index 0000000..252770f --- /dev/null +++ b/test-integration/IntegrationTests/IntegrationTestMQTTConnection.py @@ -0,0 +1,177 @@ +# This integration test verifies the functionality in the Python core of IoT Yun/Python SDK +# for basic MQTT connection. +# It starts two threads using two different connections to AWS IoT: +# Thread A: publish to "deviceSDK/PyIntegrationTest/Topic", X messages, QoS1, TPS=50 +# Thread B: subscribe to "deviceSDK/PyIntegrationTest/Topic", QoS1 +# Thread B will be started first with extra delay to ensure a valid subscription +# Then thread A will be started. +# Verify send/receive messages are equivalent + + +import random +import string +import time +import sys +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary") +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary/SDKPackage") + +from TestToolLibrary import simpleThreadManager +import TestToolLibrary.checkInManager as checkInManager +import TestToolLibrary.MQTTClientManager as MQTTClientManager +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import publishError +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeTimeoutException +from TestToolLibrary.skip import skip_when_match +from TestToolLibrary.skip import ModeIsALPN +from TestToolLibrary.skip import Python2VersionLowerThan +from TestToolLibrary.skip import Python3VersionLowerThan + + +API_TYPE_SYNC = "sync" +API_TYPE_ASYNC = "async" + +CLIENT_ID_PUB = "integrationTestMQTT_ClientPub" + "".join(random.choice(string.ascii_lowercase) for i in range(4)) +CLIENT_ID_SUB = "integrationTestMQTT_ClientSub" + "".join(random.choice(string.ascii_lowercase) for i in range(4)) + + + +# Callback unit for subscribe +class callbackUnit: + def __init__(self, srcSet, apiType): + self._internalSet = srcSet + self._apiType = apiType + + # Callback for clientSub + def messageCallback(self, client, userdata, message): + print(self._apiType + ": Received a new message: " + str(message.payload)) + self._internalSet.add(message.payload.decode('utf-8')) + + def getInternalSet(self): + return self._internalSet + + +# Run function unit +class runFunctionUnit: + def __init__(self, apiType): + self._messagesPublished = set() + self._apiType = apiType + + # Run function for publish thread (one time) + def threadPublish(self, pyCoreClient, numberOfTotalMessages, topic, TPS): + # One time thread + time.sleep(3) # Extra waiting time for valid subscription + messagesLeftToBePublished = numberOfTotalMessages + while messagesLeftToBePublished != 0: + try: + currentMessage = str(messagesLeftToBePublished) + self._performPublish(pyCoreClient, topic, 1, currentMessage) + self._messagesPublished.add(currentMessage) + except publishError: + print("Publish Error for message: " + currentMessage) + except Exception as e: + print("Unknown exception: " + str(type(e)) + " " + str(e.message)) + messagesLeftToBePublished -= 1 + time.sleep(1 / float(TPS)) + print("End of publish thread.") + + def _performPublish(self, pyCoreClient, topic, qos, payload): + if self._apiType == API_TYPE_SYNC: + pyCoreClient.publish(topic, payload, qos, False) + if self._apiType == API_TYPE_ASYNC: + pyCoreClient.publish_async(topic, payload, qos, False, None) # TODO: See if we can also check PUBACKs + + +############################################################################ +# Main # +# Check inputs +myCheckInManager = checkInManager.checkInManager(2) +myCheckInManager.verify(sys.argv) + +host = "ajje7lpljulm4-ats.iot.us-east-1.amazonaws.com" +rootCA = "./test-integration/Credentials/rootCA.crt" +certificate = "./test-integration/Credentials/certificate.pem.crt" +privateKey = "./test-integration/Credentials/privateKey.pem.key" +mode = myCheckInManager.mode + +skip_when_match(ModeIsALPN(mode).And( + Python2VersionLowerThan((2, 7, 10)).Or(Python3VersionLowerThan((3, 5, 0))) +), "This test is not applicable for mode %s and Python verison %s. Skipping..." % (mode, sys.version_info[:3])) + +# Init Python core and connect +myMQTTClientManager = MQTTClientManager.MQTTClientManager() +clientPub = myMQTTClientManager.create_connected_mqtt_core(CLIENT_ID_PUB, host, rootCA, + certificate, privateKey, mode=mode) +clientSub = myMQTTClientManager.create_connected_mqtt_core(CLIENT_ID_SUB, host, rootCA, + certificate, privateKey, mode=mode) + +if clientPub is None or clientSub is None: + exit(4) + +print("Two clients are connected!") + +# Configurations +################ +# Data/Data pool +TPS = 20 +numberOfTotalMessagesAsync = myCheckInManager.customParameter +numberOfTotalMessagesSync = numberOfTotalMessagesAsync / 10 +subSetAsync = set() +subSetSync = set() +subCallbackUnitAsync = callbackUnit(subSetAsync, API_TYPE_ASYNC) +subCallbackUnitSync = callbackUnit(subSetSync, API_TYPE_SYNC) +syncTopic = "YunSDK/PyIntegrationTest/Topic/sync" + "".join(random.choice(string.ascii_lowercase) for i in range(4)) +print(syncTopic) +asyncTopic = "YunSDK/PyIntegrationTest/Topic/async" + "".join(random.choice(string.ascii_lowercase) for j in range(4)) +# clientSub +try: + clientSub.subscribe(asyncTopic, 1, subCallbackUnitAsync.messageCallback) + clientSub.subscribe(syncTopic, 1, subCallbackUnitSync.messageCallback) + time.sleep(3) +except subscribeTimeoutException: + print("Subscribe timeout!") +except subscribeError: + print("Subscribe error!") +except Exception as e: + print("Unknown exception!") + print("Type: " + str(type(e))) + print("Message: " + str(e.message)) +# Threads +mySimpleThreadManager = simpleThreadManager.simpleThreadManager() +myRunFunctionUnitSyncPub = runFunctionUnit(API_TYPE_SYNC) +myRunFunctionUnitAsyncPub = runFunctionUnit(API_TYPE_ASYNC) +publishSyncThreadID = mySimpleThreadManager.createOneTimeThread(myRunFunctionUnitSyncPub.threadPublish, + [clientPub, numberOfTotalMessagesSync, syncTopic, TPS]) +publishAsyncThreadID = mySimpleThreadManager.createOneTimeThread(myRunFunctionUnitAsyncPub.threadPublish, + [clientPub, numberOfTotalMessagesAsync, asyncTopic, TPS]) + +# Performing +############ +mySimpleThreadManager.startThreadWithID(publishSyncThreadID) +mySimpleThreadManager.startThreadWithID(publishAsyncThreadID) +mySimpleThreadManager.joinOneTimeThreadWithID(publishSyncThreadID) +mySimpleThreadManager.joinOneTimeThreadWithID(publishAsyncThreadID) +time.sleep(numberOfTotalMessagesAsync / float(TPS) * 0.5) + +# Verifying +########### +# Length +print("Check if the length of the two sets are equal...") +print("Received from subscription (sync pub): " + str(len(subCallbackUnitSync.getInternalSet()))) +print("Received from subscription (async pub): " + str(len(subCallbackUnitAsync.getInternalSet()))) +print("Sent through sync publishes: " + str(len(myRunFunctionUnitSyncPub._messagesPublished))) +print("Sent through async publishes: " + str(len(myRunFunctionUnitAsyncPub._messagesPublished))) +if len(myRunFunctionUnitSyncPub._messagesPublished) != len(subCallbackUnitSync.getInternalSet()): + print("[Sync pub] Number of messages not equal!") + exit(4) +if len(myRunFunctionUnitAsyncPub._messagesPublished) != len(subCallbackUnitAsync.getInternalSet()): + print("[Asyn pub] Number of messages not equal!") + exit(4) +# Content +print("Check if the content if the two sets are equivalent...") +if myRunFunctionUnitSyncPub._messagesPublished != subCallbackUnitSync.getInternalSet(): + print("[Sync pub] Set content not equal!") + exit(4) +elif myRunFunctionUnitAsyncPub._messagesPublished != subCallbackUnitAsync.getInternalSet(): + print("[Async pub] Set content not equal!") +else: + print("Yes!") diff --git a/test-integration/IntegrationTests/IntegrationTestOfflineQueueingForSubscribeUnsubscribe.py b/test-integration/IntegrationTests/IntegrationTestOfflineQueueingForSubscribeUnsubscribe.py new file mode 100644 index 0000000..c06847d --- /dev/null +++ b/test-integration/IntegrationTests/IntegrationTestOfflineQueueingForSubscribeUnsubscribe.py @@ -0,0 +1,210 @@ +# This integration test verifies the functionality off queueing up subscribe/unsubscribe requests submitted by the +# client when it is offline, and drain them out when the client is reconnected. The test contains 2 clients running in +# 2 different threads: +# +# In thread A, client_sub_unsub follows the below workflow: +# 1. Client connects to AWS IoT. +# 2. Client subscribes to "topic_A". +# 3. Experience a simulated network error which brings the client offline. +# 4. While offline, client subscribes to "topic_B' and unsubscribes from "topic_A". +# 5. Client reconnects, comes back online and drains out all offline queued requests. +# 6. Client stays and receives messages published in another thread. +# +# In thread B, client_pub follows the below workflow: +# 1. Client in thread B connects to AWS IoT. +# 2. After client in thread A connects and subscribes to "topic_A", client in thread B publishes messages to "topic_A". +# 3. Client in thread B keeps sleeping until client in thread A goes back online and reaches to a stable state (draining done). +# 4. Client in thread B then publishes messages to "topic_A" and "topic_B". +# +# Since client in thread A does a unsubscribe to "topic_A", it should never receive messages published to "topic_A" after +# it reconnects and gets stable. It should have the messages from "topic_A" published at the very beginning. +# Since client in thread A does a subscribe to "topic_B", it should receive messages published to "topic_B" after it +# reconnects and gets stable. + + +import random +import string +import time +from threading import Event +from threading import Thread +import sys +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary") +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary/SDKPackage") + +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.core.protocol.internal.clients import ClientStatus +from TestToolLibrary.checkInManager import checkInManager +from TestToolLibrary.MQTTClientManager import MQTTClientManager +from TestToolLibrary.skip import skip_when_match +from TestToolLibrary.skip import ModeIsALPN +from TestToolLibrary.skip import Python2VersionLowerThan +from TestToolLibrary.skip import Python3VersionLowerThan + + +def get_random_string(length): + return "".join(random.choice(string.ascii_lowercase) for i in range(length)) + +TOPIC_A = "topic/test/offline_sub_unsub/a" + get_random_string(4) +TOPIC_B = "topic/test/offline_sub_unsub/b" + get_random_string(4) +MESSAGE_PREFIX = "MagicMessage-" +NUMBER_OF_PUBLISHES = 3 +HOST = "ajje7lpljulm4-ats.iot.us-east-1.amazonaws.com" +ROOT_CA = "./test-integration/Credentials/rootCA.crt" +CERT = "./test-integration/Credentials/certificate.pem.crt" +KEY = "./test-integration/Credentials/privateKey.pem.key" +CLIENT_PUB_ID = "PySdkIntegTest_OfflineSubUnsub_pub" + get_random_string(4) +CLIENT_SUB_UNSUB_ID = "PySdkIntegTest_OfflineSubUnsub_subunsub" + get_random_string(4) +KEEP_ALIVE_SEC = 1 +EVENT_WAIT_TIME_OUT_SEC = 5 + + +class DualClientRunner(object): + + def __init__(self, mode): + self._publish_end_flag = Event() + self._stable_flag = Event() + self._received_messages_topic_a = list() + self._received_messages_topic_b = list() + self.__mode = mode + self._client_pub = self._create_connected_client(CLIENT_PUB_ID) + print("Created connected client pub.") + self._client_sub_unsub = self._create_connected_client(CLIENT_SUB_UNSUB_ID) + print("Created connected client sub/unsub.") + self._client_sub_unsub.subscribe(TOPIC_A, 1, self._collect_sub_messages) + print("Client sub/unsub subscribed to topic: %s" % TOPIC_A) + time.sleep(2) # Make sure the subscription is valid + + def _create_connected_client(self, id_prefix): + return MQTTClientManager().create_connected_mqtt_client(self.__mode, id_prefix, HOST, (ROOT_CA, CERT, KEY)) + + def start(self): + thread_client_sub_unsub = Thread(target=self._thread_client_sub_unsub_runtime) + thread_client_pub = Thread(target=self._thread_client_pub_runtime) + thread_client_sub_unsub.start() + thread_client_pub.start() + thread_client_pub.join() + thread_client_sub_unsub.join() + + def _thread_client_sub_unsub_runtime(self): + print("Start client sub/unsub runtime thread...") + print("Client sub/unsub waits on the 1st round of publishes to end...") + if not self._publish_end_flag.wait(EVENT_WAIT_TIME_OUT_SEC): + raise RuntimeError("Timed out in waiting for the publishes to topic: %s" % TOPIC_A) + print("Client sub/unsub gets notified.") + self._publish_end_flag.clear() + + print("Client sub/unsub now goes offline...") + self._go_offline_and_send_requests() + + # Wait until the connection is stable and then notify + print("Client sub/unsub waits on a stable connection...") + self._wait_until_stable_connection() + + print("Client sub/unsub waits on the 2nd round of publishes to end...") + if not self._publish_end_flag.wait(EVENT_WAIT_TIME_OUT_SEC): + raise RuntimeError("Timed out in waiting for the publishes to topic: %s" % TOPIC_B) + print("Client sub/unsub gets notified.") + self._publish_end_flag.clear() + + print("Client sub/unsub runtime thread ends.") + + def _wait_until_stable_connection(self): + reconnect_timing = 0 + while self._client_sub_unsub._mqtt_core._client_status.get_status() != ClientStatus.STABLE: + time.sleep(0.01) + reconnect_timing += 1 + if reconnect_timing % 100 == 0: + print("Client sub/unsub: Counting reconnect time: " + str(reconnect_timing / 100) + " seconds.") + print("Client sub/unsub: Counting reconnect time result: " + str(float(reconnect_timing) / 100) + " seconds.") + self._stable_flag.set() + + def _collect_sub_messages(self, client, userdata, message): + if message.topic == TOPIC_A: + print("Client sub/unsub: Got a message from %s" % TOPIC_A) + self._received_messages_topic_a.append(message.payload) + if message.topic == TOPIC_B: + print("Client sub/unsub: Got a message from %s" % TOPIC_B) + self._received_messages_topic_b.append(message.payload) + + def _go_offline_and_send_requests(self): + do_once = True + loop_count = EVENT_WAIT_TIME_OUT_SEC * 100 + while loop_count != 0: + self._manual_network_error() + if loop_count % 100 == 0: + print("Client sub/unsub: Offline time down count: %d sec" % (loop_count / 100)) + if do_once and (loop_count / 100) <= (EVENT_WAIT_TIME_OUT_SEC / 2): + print("Client sub/unsub: Performing offline sub/unsub...") + self._client_sub_unsub.subscribe(TOPIC_B, 1, self._collect_sub_messages) + self._client_sub_unsub.unsubscribe(TOPIC_A) + print("Client sub/unsub: Done with offline sub/unsub.") + do_once = False + loop_count -= 1 + time.sleep(0.01) + + def _manual_network_error(self): + # Ensure we close the socket + if self._client_sub_unsub._mqtt_core._internal_async_client._paho_client._sock: + self._client_sub_unsub._mqtt_core._internal_async_client._paho_client._sock.close() + self._client_sub_unsub._mqtt_core._internal_async_client._paho_client._sock = None + if self._client_sub_unsub._mqtt_core._internal_async_client._paho_client._ssl: + self._client_sub_unsub._mqtt_core._internal_async_client._paho_client._ssl.close() + self._client_sub_unsub._mqtt_core._internal_async_client._paho_client._ssl = None + # Fake that we have detected the disconnection + self._client_sub_unsub._mqtt_core._internal_async_client._paho_client.on_disconnect(None, None, 0) + + def _thread_client_pub_runtime(self): + print("Start client pub runtime thread...") + print("Client pub: 1st round of publishes") + for i in range(NUMBER_OF_PUBLISHES): + self._client_pub.publish(TOPIC_A, MESSAGE_PREFIX + str(i), 1) + print("Client pub: Published a message") + time.sleep(0.5) + time.sleep(1) + print("Client pub: Publishes done. Notifying...") + self._publish_end_flag.set() + + print("Client pub waits on client sub/unsub to be stable...") + time.sleep(1) + if not self._stable_flag.wait(EVENT_WAIT_TIME_OUT_SEC * 3): # We wait longer for the reconnect/stabilization + raise RuntimeError("Timed out in waiting for client_sub_unsub to be stable") + self._stable_flag.clear() + + print("Client pub: 2nd round of publishes") + for j in range(NUMBER_OF_PUBLISHES): + self._client_pub.publish(TOPIC_B, MESSAGE_PREFIX + str(j), 1) + print("Client pub: Published a message to %s" % TOPIC_B) + self._client_pub.publish(TOPIC_A, MESSAGE_PREFIX + str(j) + "-dup", 1) + print("Client pub: Published a message to %s" % TOPIC_A) + time.sleep(0.5) + time.sleep(1) + print("Client pub: Publishes done. Notifying...") + self._publish_end_flag.set() + + print("Client pub runtime thread ends.") + + def verify(self): + print("Verifying...") + assert len(self._received_messages_topic_a) == NUMBER_OF_PUBLISHES # We should only receive the first round + assert len(self._received_messages_topic_b) == NUMBER_OF_PUBLISHES # We should only receive the second round + print("Pass!") + + +############################################################################ +# Main # +# Check inputs +my_check_in_manager = checkInManager(1) +my_check_in_manager.verify(sys.argv) +mode = my_check_in_manager.mode + +skip_when_match(ModeIsALPN(mode).And( + Python2VersionLowerThan((2, 7, 10)).Or(Python3VersionLowerThan((3, 5, 0))) +), "This test is not applicable for mode %s and Python verison %s. Skipping..." % (mode, sys.version_info[:3])) + +# Performing +############ +dual_client_runner = DualClientRunner(mode) +dual_client_runner.start() + +# Verifying +########### +dual_client_runner.verify() diff --git a/test-integration/IntegrationTests/IntegrationTestProgressiveBackoff.py b/test-integration/IntegrationTests/IntegrationTestProgressiveBackoff.py new file mode 100644 index 0000000..cd7b7ec --- /dev/null +++ b/test-integration/IntegrationTests/IntegrationTestProgressiveBackoff.py @@ -0,0 +1,289 @@ +# This integration test verifies the functionality in the Python core of Yun/Python SDK +# for progressive backoff logic in auto-reconnect. +# It starts two threads using two different connections to AWS IoT: +# Thread B subscribes to "coolTopic" and waits for incoming messages. Network +# failure will happen occasionally in thread B with a variant interval (connected +# period), simulating stable/unstable connection so as to test the reset logic of +# backoff timing. Once thread B is back online, an internal flag will be set to +# notify the other thread, Thread A, to start publishing to the same topic. +# Thread A will publish a set of messages (a fixed number of messages) to "coolTopic" +# using QoS1 and does nothing in the rest of the time. It will only start publishing +# when it gets ready notification from thread B. No network failure will happen in +# thread A. +# Because thread A is always online and only publishes when thread B is back online, +# all messages published to "coolTopic" should be received by thread B. In meantime, +# thread B should have an increasing amount of backoff waiting period until the +# connected period reaches the length of time for a stable connection. After that, +# the backoff waiting period should be reset. +# The following things will be verified to pass the test: +# 1. All messages are received. +# 2. Backoff waiting period increases as configured before the thread reaches to a +# stable connection. +# 3. Backoff waiting period does not exceed the maximum allowed time. +# 4. Backoff waiting period is reset after the thread reaches to a stable connection. + + +import string +import random +import time +import threading +import sys +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary") +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary/SDKPackage") + +from TestToolLibrary import simpleThreadManager +import TestToolLibrary.checkInManager as checkInManager +import TestToolLibrary.MQTTClientManager as MQTTClientManager +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import publishError +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeTimeoutException +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.core.protocol.internal.clients import ClientStatus +from TestToolLibrary.skip import skip_when_match +from TestToolLibrary.skip import ModeIsALPN +from TestToolLibrary.skip import Python2VersionLowerThan +from TestToolLibrary.skip import Python3VersionLowerThan + +CLIENT_ID_PUB = "integrationTestMQTT_ClientPub" + "".join(random.choice(string.ascii_lowercase) for i in range(4)) +CLIENT_ID_SUB = "integrationTestMQTT_ClientSub" + "".join(random.choice(string.ascii_lowercase) for i in range(4)) + +# Class that implements all the related threads in the test in a controllable manner +class threadPool: + def __init__(self, srcTotalNumberOfNetworkFailure, clientPub, clientSub): + self._threadBReadyFlag = 0 # 0-Not connected, 1-Connected+Subscribed, -1-ShouldExit + self._threadBReadyFlagMutex = threading.Lock() + self._targetedTopic = "coolTopic" + "".join(random.choice(string.ascii_lowercase) for i in range(4)) + self._publishMessagePool = set() + self._receiveMessagePool = set() + self._roundOfNetworkFailure = 1 + self._totalNumberOfNetworkFailure = srcTotalNumberOfNetworkFailure + self._clientPub = clientPub + self._clientSub = clientSub + self._pubCount = 0 + self._reconnectTimeRecord = list() + self._connectedTimeRecord = list() + + # Message callback for collecting incoming messages from the subscribed topic + def _messageCallback(self, client, userdata, message): + print("Thread B: Received new message: " + str(message.payload)) + self._receiveMessagePool.add(message.payload.decode('utf-8')) + + # The one that publishes + def threadARuntime(self): + exitNow = False + while not exitNow: + self._threadBReadyFlagMutex.acquire() + # Thread A is still reconnecting, WAIT! + if self._threadBReadyFlag == 0: + pass + # Thread A is connected and subscribed, PUBLISH! + elif self._threadBReadyFlag == 1: + self._publish3Messages() + self._threadBReadyFlag = 0 # Reset the readyFlag + # Thread A has finished all rounds of network failure/reconnect, EXIT! + else: + exitNow = True + self._threadBReadyFlagMutex.release() + time.sleep(0.01) # 0.01 sec scanning + + # Publish a set of messages: 3 + def _publish3Messages(self): + loopCount = 3 + while loopCount != 0: + try: + currentMessage = "Message" + str(self._pubCount) + print("Test publish to topic : " + self._targetedTopic) + self._clientPub.publish(self._targetedTopic, currentMessage, 1, False) + print("Thread A: Published new message: " + str(currentMessage)) + self._publishMessagePool.add(currentMessage) + self._pubCount += 1 + loopCount -= 1 + except publishError: + print("Publish error!") + except Exception as e: + print("Unknown exception!") + print("Type: " + str(type(e))) + print("Message: " + str(e.message)) + time.sleep(0.5) + + # The one that subscribes and has network failures + def threadBRuntime(self): + # Subscribe to the topic + try: + print("Test subscribe to topic : " + self._targetedTopic) + self._clientSub.subscribe(self._targetedTopic, 1, self._messageCallback) + except subscribeTimeoutException: + print("Subscribe timeout!") + except subscribeError: + print("Subscribe error!") + except Exception as e: + print("Unknown exception!") + print("Type: " + str(type(e))) + print("Message: " + str(e.message)) + print("Thread B: Subscribe request sent. Staring waiting for subscription processing...") + time.sleep(3) + print("Thread B: Done waiting.") + self._threadBReadyFlagMutex.acquire() + self._threadBReadyFlag = 1 + self._threadBReadyFlagMutex.release() + # Start looping with network failure + connectedPeriodSecond = 3 + while self._roundOfNetworkFailure <= self._totalNumberOfNetworkFailure: + self._connectedTimeRecord.append(connectedPeriodSecond) + # Wait for connectedPeriodSecond + print("Thread B: Connected time: " + str(connectedPeriodSecond) + " seconds.") + print("Thread B: Stable time: 60 seconds.") + time.sleep(connectedPeriodSecond) + print("Thread B: Network failure. Round: " + str(self._roundOfNetworkFailure) + ". 0.5 seconds.") + print("Thread B: Backoff time for this round should be: " + str( + self._clientSub._internal_async_client._paho_client._backoffCore._currentBackoffTimeSecond) + " second(s).") + # Set the readyFlag + self._threadBReadyFlagMutex.acquire() + self._threadBReadyFlag = 0 + self._threadBReadyFlagMutex.release() + # Now lose connection for 0.5 seconds, preventing multiple reconnect attempts + loseConnectionLoopCount = 50 + while loseConnectionLoopCount != 0: + self._manualNetworkError() + loseConnectionLoopCount -= 1 + time.sleep(0.01) + # Wait until the connection/subscription is recovered + reconnectTiming = 0 + while self._clientSub._client_status.get_status() != ClientStatus.STABLE: + time.sleep(0.01) + reconnectTiming += 1 + if reconnectTiming % 100 == 0: + print("Thread B: Counting reconnect time: " + str(reconnectTiming / 100) + " seconds.") + print("Thread B: Counting reconnect time result: " + str(float(reconnectTiming) / 100) + " seconds.") + self._reconnectTimeRecord.append(reconnectTiming / 100) + + time.sleep(3) # For valid subscription + + # Update thread B status + self._threadBReadyFlagMutex.acquire() + self._threadBReadyFlag = 1 + self._threadBReadyFlagMutex.release() + + # Update connectedPeriodSecond + connectedPeriodSecond += (2 ** (self._roundOfNetworkFailure - 1)) + # Update roundOfNetworkFailure + self._roundOfNetworkFailure += 1 + + # Notify thread A shouldExit + self._threadBReadyFlagMutex.acquire() + self._threadBReadyFlag = -1 + self._threadBReadyFlagMutex.release() + + # Simulate a network error + def _manualNetworkError(self): + # Only the subscriber needs the network error + if self._clientSub._internal_async_client._paho_client._sock: + self._clientSub._internal_async_client._paho_client._sock.close() + self._clientSub._internal_async_client._paho_client._sock = None + if self._clientSub._internal_async_client._paho_client._ssl: + self._clientSub._internal_async_client._paho_client._ssl.close() + self._clientSub._internal_async_client._paho_client._ssl = None + # Fake that we have detected the disconnection + self._clientSub._internal_async_client._paho_client.on_disconnect(None, None, 0) + + def getReconnectTimeRecord(self): + return self._reconnectTimeRecord + + def getConnectedTimeRecord(self): + return self._connectedTimeRecord + + +# Generate the correct backoff timing to compare the test result with +def generateCorrectAnswer(baseTime, maximumTime, stableTime, connectedTimeRecord): + answer = list() + currentTime = baseTime + nextTime = baseTime + for i in range(0, len(connectedTimeRecord)): + if connectedTimeRecord[i] >= stableTime or i == 0: + currentTime = baseTime + else: + currentTime = min(currentTime * 2, maximumTime) + answer.append(currentTime) + return answer + + +# Verify backoff time +# Corresponding element should have no diff or a bias greater than 1.5 +def verifyBackoffTime(answerList, resultList): + result = True + for i in range(0, len(answerList)): + if abs(answerList[i] - resultList[i]) > 1.5: + result = False + break + return result + + +############################################################################ +# Main # +# Check inputs +myCheckInManager = checkInManager.checkInManager(2) +myCheckInManager.verify(sys.argv) + +#host via describe-endpoint on this OdinMS: com.amazonaws.iot.device.sdk.credentials.testing.websocket +host = "ajje7lpljulm4-ats.iot.us-east-1.amazonaws.com" +rootCA = "./test-integration/Credentials/rootCA.crt" +certificate = "./test-integration/Credentials/certificate.pem.crt" +privateKey = "./test-integration/Credentials/privateKey.pem.key" +mode = myCheckInManager.mode + +skip_when_match(ModeIsALPN(mode).And( + Python2VersionLowerThan((2, 7, 10)).Or(Python3VersionLowerThan((3, 5, 0))) +), "This test is not applicable for mode %s and Python verison %s. Skipping..." % (mode, sys.version_info[:3])) + +# Init Python core and connect +myMQTTClientManager = MQTTClientManager.MQTTClientManager() +clientPub = myMQTTClientManager.create_connected_mqtt_core(CLIENT_ID_PUB, host, rootCA, + certificate, privateKey, mode=mode) +clientSub = myMQTTClientManager.create_connected_mqtt_core(CLIENT_ID_SUB, host, rootCA, + certificate, privateKey, mode=mode) + +if clientPub is None or clientSub is None: + exit(4) + +# Extra configuration for clients +clientPub.configure_reconnect_back_off(1, 16, 60) +clientSub.configure_reconnect_back_off(1, 16, 60) + +print("Two clients are connected!") + +# Configurations +################ +# Custom parameters +NumberOfNetworkFailure = myCheckInManager.customParameter +# ThreadPool object +threadPoolObject = threadPool(NumberOfNetworkFailure, clientPub, clientSub) +# Threads +mySimpleThreadManager = simpleThreadManager.simpleThreadManager() +threadAID = mySimpleThreadManager.createOneTimeThread(threadPoolObject.threadARuntime, []) +threadBID = mySimpleThreadManager.createOneTimeThread(threadPoolObject.threadBRuntime, []) + +# Performing +############ +mySimpleThreadManager.startThreadWithID(threadBID) +mySimpleThreadManager.startThreadWithID(threadAID) +mySimpleThreadManager.joinOneTimeThreadWithID(threadBID) +mySimpleThreadManager.joinOneTimeThreadWithID(threadAID) + +# Verifying +########### +print("Verify that all messages are received...") +if threadPoolObject._publishMessagePool == threadPoolObject._receiveMessagePool: + print("Passed. Recv/Pub: " + str(len(threadPoolObject._receiveMessagePool)) + "/" + str( + len(threadPoolObject._publishMessagePool))) +else: + print("Not all messages are received!") + exit(4) +print("Verify reconnect backoff time record...") +print("ConnectedTimeRecord: " + str(threadPoolObject.getConnectedTimeRecord())) +print("ReconnectTimeRecord: " + str(threadPoolObject.getReconnectTimeRecord())) +print("Answer: " + str(generateCorrectAnswer(1, 16, 60, threadPoolObject.getConnectedTimeRecord()))) +if verifyBackoffTime(generateCorrectAnswer(1, 16, 60, threadPoolObject.getConnectedTimeRecord()), + threadPoolObject.getReconnectTimeRecord()): + print("Passed.") +else: + print("Backoff time does not match theoretical value!") + exit(4) diff --git a/test-integration/IntegrationTests/IntegrationTestShadow.py b/test-integration/IntegrationTests/IntegrationTestShadow.py new file mode 100644 index 0000000..9e2c2a5 --- /dev/null +++ b/test-integration/IntegrationTests/IntegrationTestShadow.py @@ -0,0 +1,248 @@ +# This integration test verifies the functionality in the Python core of Yun/Python SDK +# for IoT shadow operations: shadowUpdate and delta. +# 1. The test generates a X-byte-long random sting and breaks it into a random +# number of chunks, with a fixed length variation from 1 byte to 10 bytes. +# 2. Two threads are created to do shadowUpdate and delta on the same device +# shadow. The update thread updates the desired state with an increasing sequence +# number and a chunk. It is terminated when there are no more chunks to be sent. +# 3. The delta thread listens on delta topic and receives the changes in device +# shadow JSON document. It parses out the sequence number and the chunk, then pack +# them into a dictionary with sequence number as the key and the chunk as the value. +# 4. To verify the result of the test, the random string is re-assembled for both +# the update thread and the delta thread to see if they are equal. +# 5. Since shadow operations are all QoS0 (Pub/Sub), it is still a valid case when +# the re-assembled strings are not equal. Then we need to make sure that the number +# of the missing chunks does not exceed 10% of the total number of chunk transmission +# that succeeds. + +import time +import random +import string +import json +import sys +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary") +sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary/SDKPackage") + +from TestToolLibrary import simpleThreadManager +import TestToolLibrary.checkInManager as checkInManager +import TestToolLibrary.MQTTClientManager as MQTTClientManager +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.core.shadow.deviceShadow import deviceShadow +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.core.shadow.shadowManager import shadowManager +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import publishError +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeTimeoutException +from TestToolLibrary.skip import skip_when_match +from TestToolLibrary.skip import ModeIsALPN +from TestToolLibrary.skip import Python2VersionLowerThan +from TestToolLibrary.skip import Python3VersionLowerThan + + +# Global configuration +TPS = 1 # Update speed, Spectre does not tolerate high TPS shadow operations... +CLIENT_ID_PUB = "integrationTestMQTT_ClientPub" + "".join(random.choice(string.ascii_lowercase) for i in range(4)) +CLIENT_ID_SUB = "integrationTestMQTT_ClientSub" + "".join(random.choice(string.ascii_lowercase) for i in range(4)) + + +# Class that manages the generation and chopping of the random string +class GibberishBox: + def __init__(self, length): + self._content = self._generateGibberish(length) + + def getGibberish(self): + return self._content + + # Random string generator: lower/upper case letter + digits + def _generateGibberish(self, length): + s = string.ascii_lowercase + string.digits + string.ascii_uppercase + return ''.join(random.sample(s * length, length)) + + # Spit out the gibberish chunk by chunk (1-10 bytes) + def gibberishSpitter(self): + randomLength = random.randrange(1, 11) + ret = None + if self._content is not None: + ret = self._content[0:randomLength] + self._content = self._content[randomLength:] + return ret + + +# Class that manages the callback function and record of chunks for re-assembling +class callbackContainer: + def __init__(self): + self._internalDictionary = dict() + + def getInternalDictionary(self): + return self._internalDictionary + + def testCallback(self, payload, type, token): + print("Type: " + type) + print(payload) + print("&&&&&&&&&&&&&&&&&&&&") + # This is the shadow delta callback, so the token should be None + if type == "accepted": + JsonDict = json.loads(payload) + try: + sequenceNumber = int(JsonDict['state']['desired']['sequenceNumber']) + gibberishChunk = JsonDict['state']['desired']['gibberishChunk'] + self._internalDictionary[sequenceNumber] = gibberishChunk + except KeyError as e: + print(e.message) + print("No such key!") + else: + JsonDict = json.loads(payload) + try: + sequenceNumber = int(JsonDict['state']['sequenceNumber']) + gibberishChunk = JsonDict['state']['gibberishChunk'] + self._internalDictionary[sequenceNumber] = gibberishChunk + except KeyError as e: + print(e.message) + print("No such key!") + + +# Thread runtime function +def threadShadowUpdate(deviceShadow, callback, TPS, gibberishBox, maxNumMessage): + time.sleep(2) + chunkSequence = 0 + while True: + currentChunk = gibberishBox.gibberishSpitter() + if currentChunk != "": + outboundJsonDict = dict() + outboundJsonDict["state"] = dict() + outboundJsonDict["state"]["desired"] = dict() + outboundJsonDict["state"]["desired"]["sequenceNumber"] = chunkSequence + outboundJsonDict["state"]["desired"]["gibberishChunk"] = currentChunk + outboundJSON = json.dumps(outboundJsonDict) + chunkSequence += 1 + try: + deviceShadow.shadowUpdate(outboundJSON, callback, 5) + except publishError: + print("Publish error!") + except subscribeTimeoutException: + print("Subscribe timeout!") + except subscribeError: + print("Subscribe error!") + except Exception as e: + print("Unknown exception!") + print("Type: " + str(type(e))) + print("Message: " + str(e.message)) + time.sleep(1 / TPS) + else: + break + print("Update thread completed.") + + +# Re-assemble gibberish +def reAssembleGibberish(srcDict, maxNumMessage): + ret = "" + for i in range(0, maxNumMessage): + try: + ret += srcDict[i] + except KeyError: + pass + return ret + + +# RandomShadowNameSuffix +def randomString(lengthOfString): + return "".join(random.choice(string.ascii_lowercase) for i in range(lengthOfString)) + + +############################################################################ +# Main # +# Check inputs +myCheckInManager = checkInManager.checkInManager(2) +myCheckInManager.verify(sys.argv) + +host = "ajje7lpljulm4-ats.iot.us-east-1.amazonaws.com" +rootCA = "./test-integration/Credentials/rootCA.crt" +certificate = "./test-integration/Credentials/certificate.pem.crt" +privateKey = "./test-integration/Credentials/privateKey.pem.key" +mode = myCheckInManager.mode + +skip_when_match(ModeIsALPN(mode).And( + Python2VersionLowerThan((2, 7, 10)).Or(Python3VersionLowerThan((3, 5, 0))) +), "This test is not applicable for mode %s and Python verison %s. Skipping..." % (mode, sys.version_info[:3])) + +# Init Python core and connect +myMQTTClientManager = MQTTClientManager.MQTTClientManager() +clientPub = myMQTTClientManager.create_connected_mqtt_core(CLIENT_ID_PUB, host, rootCA, + certificate, privateKey, mode=mode) +clientSub = myMQTTClientManager.create_connected_mqtt_core(CLIENT_ID_SUB, host, rootCA, + certificate, privateKey, mode=mode) + +if clientPub is None or clientSub is None: + exit(4) + +print("Two clients are connected!") + +# Configurations +################ +# Data +gibberishLength = myCheckInManager.customParameter +# Init device shadow instance +shadowManager1 = shadowManager(clientPub) +shadowManager2 = shadowManager(clientSub) +shadowName = "GibberChunk" + randomString(5) +deviceShadow1 = deviceShadow(shadowName, True, shadowManager1) +deviceShadow2 = deviceShadow(shadowName, True, shadowManager2) +print("Two device shadow instances are created!") + +# Callbacks +callbackHome_Update = callbackContainer() +callbackHome_Delta = callbackContainer() + +# Listen on delta topic +try: + deviceShadow2.shadowRegisterDeltaCallback(callbackHome_Delta.testCallback) +except subscribeError: + print("Subscribe error!") +except subscribeTimeoutException: + print("Subscribe timeout!") +except Exception as e: + print("Unknown exception!") + print("Type: " + str(type(e))) + print("Message: " + str(e.message)) + +# Init gibberishBox +cipher = GibberishBox(gibberishLength) +gibberish = cipher.getGibberish() +print("Random string: " + gibberish) + +# Threads +mySimpleThreadManager = simpleThreadManager.simpleThreadManager() +updateThreadID = mySimpleThreadManager.createOneTimeThread(threadShadowUpdate, + [deviceShadow1, callbackHome_Update.testCallback, TPS, + cipher, gibberishLength]) + +# Performing +############ +# Functionality test +mySimpleThreadManager.startThreadWithID(updateThreadID) +mySimpleThreadManager.joinOneTimeThreadWithID(updateThreadID) +time.sleep(10) # Just in case + +# Now check the gibberish +gibberishUpdateResult = reAssembleGibberish(callbackHome_Update.getInternalDictionary(), gibberishLength) +gibberishDeltaResult = reAssembleGibberish(callbackHome_Delta.getInternalDictionary(), gibberishLength) +print("Update:") +print(gibberishUpdateResult) +print("Delta:") +print(gibberishDeltaResult) +print("Origin:") +print(gibberish) + +if gibberishUpdateResult != gibberishDeltaResult: + # Since shadow operations are on QoS0 (Pub/Sub), there could be a chance + # where incoming messages are missing on the subscribed side + # A ratio of 95% must be guaranteed to pass this test + dictUpdate = callbackHome_Update.getInternalDictionary() + dictDelta = callbackHome_Delta.getInternalDictionary() + maxBaseNumber = max(len(dictUpdate), len(dictDelta)) + diff = float(abs(len(dictUpdate) - len(dictDelta))) / maxBaseNumber + print("Update/Delta string not equal, missing rate is: " + str(diff * 100) + "%.") + # Largest chunk is 10 bytes, total length is X bytes. + # Minimum number of chunks is X/10 + # Maximum missing rate = 10% + if diff > 0.1: + print("Missing rate too high!") + exit(4) diff --git a/test-integration/IntegrationTests/TestToolLibrary/MQTTClientManager.py b/test-integration/IntegrationTests/TestToolLibrary/MQTTClientManager.py new file mode 100644 index 0000000..05c671a --- /dev/null +++ b/test-integration/IntegrationTests/TestToolLibrary/MQTTClientManager.py @@ -0,0 +1,145 @@ +import random +import string +import traceback +from ssl import SSLError + +import TestToolLibrary.SDKPackage.AWSIoTPythonSDK.core.protocol.paho.client as paho +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.core.util.providers import CertificateCredentialsProvider +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.core.util.providers import CiphersProvider +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.core.util.enums import DropBehaviorTypes +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.core.util.providers import EndpointProvider +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.core.protocol.mqtt_core import MqttCore +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import connectError +from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import connectTimeoutException + + +CERT_MUTUAL_AUTH = "MutualAuth" +WEBSOCKET = 'Websocket' +CERT_ALPN = "ALPN" + + +# Class that manages the creation, configuration and connection of MQTT Client +class MQTTClientManager: + + def create_connected_mqtt_client(self, mode, client_id, host, credentials_data, callbacks=None): + client = self.create_nonconnected_mqtt_client(mode, client_id, host, credentials_data, callbacks) + return self._connect_client(client) + + def create_nonconnected_mqtt_client(self, mode, client_id, host, credentials_data, callbacks=None): + if mode == CERT_MUTUAL_AUTH: + sdk_mqtt_client = self._create_nonconnected_mqtt_client_with_cert(client_id, host, 8883, credentials_data) + elif mode == WEBSOCKET: + root_ca, certificate, private_key = credentials_data + sdk_mqtt_client = AWSIoTMQTTClient(clientID=client_id + "_" + self._random_string(3), useWebsocket=True) + sdk_mqtt_client.configureEndpoint(host, 443) + sdk_mqtt_client.configureCredentials(CAFilePath=root_ca) + elif mode == CERT_ALPN: + sdk_mqtt_client = self._create_nonconnected_mqtt_client_with_cert(client_id, host, 443, credentials_data) + else: + raise RuntimeError("Test mode: " + str(mode) + " not supported!") + + sdk_mqtt_client.configureConnectDisconnectTimeout(10) + sdk_mqtt_client.configureMQTTOperationTimeout(5) + + if callbacks is not None: + sdk_mqtt_client.onOnline = callbacks.on_online + sdk_mqtt_client.onOffline = callbacks.on_offline + sdk_mqtt_client.onMessage = callbacks.on_message + + return sdk_mqtt_client + + def _create_nonconnected_mqtt_client_with_cert(self, client_id, host, port, credentials_data): + root_ca, certificate, private_key = credentials_data + sdk_mqtt_client = AWSIoTMQTTClient(clientID=client_id + "_" + self._random_string(3)) + sdk_mqtt_client.configureEndpoint(host, port) + sdk_mqtt_client.configureCredentials(CAFilePath=root_ca, KeyPath=private_key, CertificatePath=certificate) + + return sdk_mqtt_client + + def create_connected_mqtt_core(self, client_id, host, root_ca, certificate, private_key, mode): + client = self.create_nonconnected_mqtt_core(client_id, host, root_ca, certificate, private_key, mode) + return self._connect_client(client) + + def create_nonconnected_mqtt_core(self, client_id, host, root_ca, certificate, private_key, mode): + client = None + protocol = None + port = None + is_websocket = False + is_alpn = False + + if mode == CERT_MUTUAL_AUTH: + protocol = paho.MQTTv311 + port = 8883 + elif mode == WEBSOCKET: + protocol = paho.MQTTv31 + port = 443 + is_websocket = True + elif mode == CERT_ALPN: + protocol = paho.MQTTv311 + port = 443 + is_alpn = True + else: + print("Error in creating the client") + + if protocol is None or port is None: + print("Not enough input parameters") + return client # client is None is the necessary params are not there + + try: + client = MqttCore(client_id + "_" + self._random_string(3), True, protocol, is_websocket) + + endpoint_provider = EndpointProvider() + endpoint_provider.set_host(host) + endpoint_provider.set_port(port) + + # Once is_websocket is True, certificate_credentials_provider will NOT be used + # by the client even if it is configured + certificate_credentials_provider = CertificateCredentialsProvider() + certificate_credentials_provider.set_ca_path(root_ca) + certificate_credentials_provider.set_cert_path(certificate) + certificate_credentials_provider.set_key_path(private_key) + + cipher_provider = CiphersProvider() + cipher_provider.set_ciphers(None) + + client.configure_endpoint(endpoint_provider) + client.configure_cert_credentials(certificate_credentials_provider, cipher_provider) + client.configure_connect_disconnect_timeout_sec(10) + client.configure_operation_timeout_sec(5) + client.configure_offline_requests_queue(10, DropBehaviorTypes.DROP_NEWEST) + + if is_alpn: + client.configure_alpn_protocols() + except Exception as e: + print("Unknown exception in creating the client: " + str(e)) + finally: + return client + + def _random_string(self, length): + return "".join(random.choice(string.ascii_lowercase) for i in range(length)) + + def _connect_client(self, client): + if client is None: + return client + + try: + client.connect(1) + except connectTimeoutException as e: + print("Connect timeout: " + str(e)) + return None + except connectError as e: + print("Connect error:" + str(e)) + return None + except SSLError as e: + print("Connect SSL error: " + str(e)) + return None + except IOError as e: + print("Credentials not found: " + str(e)) + return None + except Exception as e: + print("Unknown exception in connect: ") + traceback.print_exc() + return None + + return client diff --git a/test-integration/IntegrationTests/TestToolLibrary/SDKPackage/.gitignore b/test-integration/IntegrationTests/TestToolLibrary/SDKPackage/.gitignore new file mode 100644 index 0000000..151aa74 --- /dev/null +++ b/test-integration/IntegrationTests/TestToolLibrary/SDKPackage/.gitignore @@ -0,0 +1,3 @@ +*.* +!.gitignore +!__init__.py \ No newline at end of file diff --git a/test-integration/IntegrationTests/TestToolLibrary/SDKPackage/__init__.py b/test-integration/IntegrationTests/TestToolLibrary/SDKPackage/__init__.py new file mode 100644 index 0000000..1ad354e --- /dev/null +++ b/test-integration/IntegrationTests/TestToolLibrary/SDKPackage/__init__.py @@ -0,0 +1 @@ +__version__ = "1.4.9" diff --git a/test-integration/IntegrationTests/TestToolLibrary/__init__.py b/test-integration/IntegrationTests/TestToolLibrary/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test-integration/IntegrationTests/TestToolLibrary/checkInManager.py b/test-integration/IntegrationTests/TestToolLibrary/checkInManager.py new file mode 100644 index 0000000..2faaa02 --- /dev/null +++ b/test-integration/IntegrationTests/TestToolLibrary/checkInManager.py @@ -0,0 +1,18 @@ +# Class that is responsible for input/dependency verification +import sys + + +class checkInManager: + + def __init__(self, numberOfInputParameters): + self._numberOfInputParameters = numberOfInputParameters + self.mode = None + self.customParameter = None + + def verify(self, args): + # Check if we got the correct command line params + if len(args) != self._numberOfInputParameters + 1: + exit(4) + self.mode = str(args[1]) + if self._numberOfInputParameters + 1 > 2: + self.customParameter = int(args[2]) diff --git a/test-integration/IntegrationTests/TestToolLibrary/simpleThreadManager.py b/test-integration/IntegrationTests/TestToolLibrary/simpleThreadManager.py new file mode 100644 index 0000000..023f8e7 --- /dev/null +++ b/test-integration/IntegrationTests/TestToolLibrary/simpleThreadManager.py @@ -0,0 +1,110 @@ +# Library for controllable threads. Should be able to: +# 1. Create different threads +# 2. Terminate certain thread +# 3. Join certain thread + + +import time +import threading + + +# Describe the type, property and control flag for a certain thread +class _threadControlUnit: + # Constants for different thread type + _THREAD_TYPE_ONETIME = 0 + _THREAD_TYPE_LOOP = 1 + + def __init__(self, threadID, threadType, runFunction, runParameters, scanningSpeedSecond=0.01): + if threadID is None or threadType is None or runFunction is None or runParameters is None: + raise ValueError("None input detected.") + if threadType != self._THREAD_TYPE_ONETIME and threadType != self._THREAD_TYPE_LOOP: + raise ValueError("Thread type not supported.") + self.threadID = threadID + self.threadType = threadType + self.runFunction = runFunction + self.runParameters = runParameters + self.threadObject = None # Holds the real thread object + # Now configure control flag, only meaning for loop thread type + if self.threadType == self._THREAD_TYPE_LOOP: + self.stopSign = False # Enabled infinite loop by default + self.scanningSpeedSecond = scanningSpeedSecond + else: + self.stopSign = None # No control flag for one time thread + self.scanningSpeedSecond = -1 + + def _oneTimeRunFunction(self): + self.runFunction(*self.runParameters) + + def _loopRunFunction(self): + while not self.stopSign: + self.runFunction(*self.runParameters) # There should be no manual delay in this function + time.sleep(self.scanningSpeedSecond) + + def _stopMe(self): + self.stopSign = True + + def _setThreadObject(self, threadObject): + self.threadObject = threadObject + + def _getThreadObject(self): + return self.threadObject + + +# Class that manages all threadControlUnit +# Used in a single thread +class simpleThreadManager: + def __init__(self): + self._internalCount = 0 + self._controlCenter = dict() + + def createOneTimeThread(self, runFunction, runParameters): + returnID = self._internalCount + self._controlCenter[self._internalCount] = _threadControlUnit(self._internalCount, + _threadControlUnit._THREAD_TYPE_ONETIME, + runFunction, runParameters) + self._internalCount += 1 + return returnID + + def createLoopThread(self, runFunction, runParameters, scanningSpeedSecond): + returnID = self._internalCount + self._controlCenter[self._internalCount] = _threadControlUnit(self._internalCount, + _threadControlUnit._THREAD_TYPE_LOOP, runFunction, + runParameters, scanningSpeedSecond) + self._internalCount += 1 + return returnID + + def stopLoopThreadWithID(self, threadID): + threadToStop = self._controlCenter.get(threadID) + if threadToStop is None: + raise ValueError("No such threadID.") + else: + if threadToStop.threadType == _threadControlUnit._THREAD_TYPE_LOOP: + threadToStop._stopMe() + time.sleep(3 * threadToStop.scanningSpeedSecond) + else: + raise TypeError("Error! Try to stop a one time thread.") + + def startThreadWithID(self, threadID): + threadToStart = self._controlCenter.get(threadID) + if threadToStart is None: + raise ValueError("No such threadID.") + else: + currentThreadType = threadToStart.threadType + newThreadObject = None + if currentThreadType == _threadControlUnit._THREAD_TYPE_LOOP: + newThreadObject = threading.Thread(target=threadToStart._loopRunFunction) + else: # One time thread + newThreadObject = threading.Thread(target=threadToStart._oneTimeRunFunction) + newThreadObject.start() + threadToStart._setThreadObject(newThreadObject) + + def joinOneTimeThreadWithID(self, threadID): + threadToJoin = self._controlCenter.get(threadID) + if threadToJoin is None: + raise ValueError("No such threadID.") + else: + if threadToJoin.threadType == _threadControlUnit._THREAD_TYPE_ONETIME: + currentThreadObject = threadToJoin._getThreadObject() + currentThreadObject.join() + else: + raise TypeError("Error! Try to join a loop thread.") diff --git a/test-integration/IntegrationTests/TestToolLibrary/skip.py b/test-integration/IntegrationTests/TestToolLibrary/skip.py new file mode 100644 index 0000000..4d6e5ca --- /dev/null +++ b/test-integration/IntegrationTests/TestToolLibrary/skip.py @@ -0,0 +1,110 @@ +import sys +from TestToolLibrary.MQTTClientManager import CERT_ALPN +from TestToolLibrary.MQTTClientManager import WEBSOCKET + +# This module manages the skip policy validation for each test + + +def skip_when_match(policy, message): + if policy.validate(): + print(message) + exit(0) # Exit the Python interpreter + + +class Policy(object): + + AND = "and" + OR = "or" + + def __init__(self): + self._relations = [] + + # Use caps to avoid collision with Python built-in and/or keywords + def And(self, policy): + self._relations.append((self.AND, policy)) + return self + + def Or(self, policy): + self._relations.append((self.OR, policy)) + return self + + def validate(self): + result = self.validate_impl() + + for element in self._relations: + operand, policy = element + if operand == self.AND: + result = result and policy.validate() + elif operand == self.OR: + result = result or policy.validate() + else: + raise RuntimeError("Unrecognized operand: " + str(operand)) + + return result + + def validate_impl(self): + raise RuntimeError("Not implemented") + + +class PythonVersion(Policy): + + HIGHER = "higher" + LOWER = "lower" + EQUALS = "equals" + + def __init__(self, actual_version, expected_version, operand): + Policy.__init__(self) + self._actual_version = actual_version + self._expected_version = expected_version + self._operand = operand + + def validate_impl(self): + if self._operand == self.LOWER: + return self._actual_version < self._expected_version + elif self._operand == self.HIGHER: + return self._actual_version > self._expected_version + elif self._operand == self.EQUALS: + return self._actual_version == self._expected_version + else: + raise RuntimeError("Unsupported operand: " + self._operand) + + +class Python2VersionLowerThan(PythonVersion): + + def __init__(self, version): + PythonVersion.__init__(self, sys.version_info[:3], version, PythonVersion.LOWER) + + def validate_impl(self): + return sys.version_info[0] == 2 and PythonVersion.validate_impl(self) + + +class Python3VersionLowerThan(PythonVersion): + + def __init__(self, version): + PythonVersion.__init__(self, sys.version_info[:3], version, PythonVersion.LOWER) + + def validate_impl(self): + return sys.version_info[0] == 3 and PythonVersion.validate_impl(self) + + +class ModeIs(Policy): + + def __init__(self, actual_mode, expected_mode): + Policy.__init__(self) + self._actual_mode = actual_mode + self._expected_mode = expected_mode + + def validate_impl(self): + return self._actual_mode == self._expected_mode + + +class ModeIsALPN(ModeIs): + + def __init__(self, actual_mode): + ModeIs.__init__(self, actual_mode=actual_mode, expected_mode=CERT_ALPN) + + +class ModeIsWebSocket(ModeIs): + + def __init__(self, actual_mode): + ModeIs.__init__(self, actual_mode=actual_mode, expected_mode=WEBSOCKET) diff --git a/test-integration/Tools/retrieve-key.py b/test-integration/Tools/retrieve-key.py new file mode 100644 index 0000000..3884d7f --- /dev/null +++ b/test-integration/Tools/retrieve-key.py @@ -0,0 +1,59 @@ + +import boto3 +import base64 +import sys +from botocore.exceptions import ClientError + +def main(): + secret_name = sys.argv[1] + region_name = "us-east-1" + + # Create a Secrets Manager client + session = boto3.session.Session() + client = session.client( + service_name='secretsmanager', + region_name=region_name + ) + # In this sample we only handle the specific exceptions for the 'GetSecretValue' API. + # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html + # We rethrow the exception by default. + + try: + get_secret_value_response = client.get_secret_value( + SecretId=secret_name + ) + + except ClientError as e: + if e.response['Error']['Code'] == 'DecryptionFailureException': + # Secrets Manager can't decrypt the protected secret text using the provided KMS key. + # Deal with the exception here, and/or rethrow at your discretion. + raise e + elif e.response['Error']['Code'] == 'InternalServiceErrorException': + # An error occurred on the server side. + # Deal with the exception here, and/or rethrow at your discretion. + raise e + elif e.response['Error']['Code'] == 'InvalidParameterException': + # You provided an invalid value for a parameter. + # Deal with the exception here, and/or rethrow at your discretion. + raise e + elif e.response['Error']['Code'] == 'InvalidRequestException': + # You provided a parameter value that is not valid for the current state of the resource. + # Deal with the exception here, and/or rethrow at your discretion. + raise e + elif e.response['Error']['Code'] == 'ResourceNotFoundException': + # We can't find the resource that you asked for. + # Deal with the exception here, and/or rethrow at your discretion. + raise e + print(e) + else: + # Decrypts secret using the associated KMS key. + # Depending on whether the secret is a string or binary, one of these fields will be populated. + if 'SecretString' in get_secret_value_response: + secret = get_secret_value_response['SecretString'] + else: + secret = base64.b64decode(get_secret_value_response['SecretBinary']) + print(secret) + + +if __name__ == '__main__': + sys.exit(main()) # next section explains the use of sys.exit diff --git a/test-integration/run/run.sh b/test-integration/run/run.sh new file mode 100755 index 0000000..f420f73 --- /dev/null +++ b/test-integration/run/run.sh @@ -0,0 +1,173 @@ +#!/bin/bash +# +# This script manages the start of integration +# tests for Python core in AWS IoT Arduino Yun +# SDK. The tests should be able to run both in +# Brazil and ToD Worker environment. +# The script will perform the following tasks: +# 1. Retrieve credentials as needed from AWS +# 2. Obtain ZIP package and unzip it locally +# 3. Start the integration tests and check results +# 4. Report any status returned. +# To start the tests as TodWorker: +# > run.sh MutualAuth 1000 100 7 +# or +# > run.sh Websocket 1000 100 7 +# or +# > run.sh ALPN 1000 100 7 +# +# To start the tests from desktop: +# > run.sh MutualAuthT 1000 100 7 +# or +# > run.sh WebsocketT 1000 100 7 +# or +# > run.sh ALPNT 1000 100 7 +# +# 1000 MQTT messages, 100 bytes of random string +# in length and 7 rounds of network failure for +# progressive backoff. +# Test mode (MutualAuth/Websocket) must be +# specified. +# Scale number must also be specified (see usage) + +# Define const +USAGE="usage: run.sh " + +AWSMutualAuth_TodWorker_private_key="arn:aws:secretsmanager:us-east-1:123124136734:secret:V1IotSdkIntegrationTestPrivateKey-vNUQU8" +AWSMutualAuth_TodWorker_certificate="arn:aws:secretsmanager:us-east-1:123124136734:secret:V1IotSdkIntegrationTestCertificate-vTRwjE" +AWSMutualAuth_Desktop_private_key="arn:aws:secretsmanager:us-east-1:123124136734:secret:V1IotSdkIntegrationTestDesktopPrivateKey-DdC7nv" +AWSMutualAuth_Desktop_certificate="arn:aws:secretsmanager:us-east-1:123124136734:secret:V1IotSdkIntegrationTestDesktopCertificate-IA4xbj" + +AWSGGDiscovery_TodWorker_private_key="arn:aws:secretsmanager:us-east-1:123124136734:secret:V1IotSdkIntegrationTestGGDiscoveryPrivateKey-YHQI1F" +AWSGGDiscovery_TodWorker_certificate="arn:aws:secretsmanager:us-east-1:123124136734:secret:V1IotSdkIntegrationTestGGDiscoveryCertificate-TwlAcS" + +AWSSecretForWebsocket_TodWorker_KeyId="arn:aws:secretsmanager:us-east-1:123124136734:secret:V1IotSdkIntegrationTestWebsocketAccessKeyId-1YdB9z" +AWSSecretForWebsocket_TodWorker_SecretKey="arn:aws:secretsmanager:us-east-1:123124136734:secret:V1IotSdkIntegrationTestWebsocketSecretAccessKey-MKTSaV" +AWSSecretForWebsocket_Desktop_KeyId="arn:aws:secretsmanager:us-east-1:123124136734:secret:V1IotSdkIntegrationTestWebsocketAccessKeyId-1YdB9z" +AWSSecretForWebsocket_Desktop_SecretKey="arn:aws:secretsmanager:us-east-1:123124136734:secret:V1IotSdkIntegrationTestWebsocketSecretAccessKey-MKTSaV" + +SDKLocation="./AWSIoTPythonSDK" +RetrieveAWSKeys="./test-integration/Tools/retrieve-key.py" +CREDENTIAL_DIR="./test-integration/Credentials/" +TEST_DIR="./test-integration/IntegrationTests/" +CA_CERT_URL="https://www.amazontrust.com/repository/AmazonRootCA1.pem" +CA_CERT_PATH=${CREDENTIAL_DIR}rootCA.crt + + + + +# If input args not correct, echo usage +if [ $# -ne 4 ]; then + echo ${USAGE} +else +# Description + echo "[STEP] Start run.sh" + echo "***************************************************" + echo "About to start integration tests for IoTPySDK..." + echo "Test Mode: $1" +# Determine the Python versions need to test for this SDK + pythonExecutableArray=() + pythonExecutableArray[0]="3" +# Retrieve credentials as needed from AWS + TestMode="" + echo "[STEP] Retrieve credentials from AWS" + echo "***************************************************" + if [ "$1"x == "MutualAuth"x -o "$1"x == "MutualAuthT"x ]; then + AWSSetName_privatekey=${AWSMutualAuth_TodWorker_private_key} + AWSSetName_certificate=${AWSMutualAuth_TodWorker_certificate} + AWSDRSName_privatekey=${AWSGGDiscovery_TodWorker_private_key} + AWSDRSName_certificate=${AWSGGDiscovery_TodWorker_certificate} + TestMode="MutualAuth" + if [ "$1"x == "MutualAuthT"x ]; then + AWSSetName_privatekey=${AWSMutualAuth_Desktop_private_key} + AWSSetName_certificate=${AWSMutualAuth_Desktop_certificate} + fi + python ${RetrieveAWSKeys} ${AWSSetName_certificate} > ${CREDENTIAL_DIR}certificate.pem.crt + python ${RetrieveAWSKeys} ${AWSSetName_privatekey} > ${CREDENTIAL_DIR}privateKey.pem.key + curl -s "${CA_CERT_URL}" > ${CA_CERT_PATH} + echo -e "URL retrieved certificate data:\n$(cat ${CA_CERT_PATH})\n" + python ${RetrieveAWSKeys} ${AWSDRSName_certificate} > ${CREDENTIAL_DIR}certificate_drs.pem.crt + python ${RetrieveAWSKeys} ${AWSDRSName_privatekey} > ${CREDENTIAL_DIR}privateKey_drs.pem.key + elif [ "$1"x == "Websocket"x -o "$1"x == "WebsocketT"x ]; then + ACCESS_KEY_ID_ARN=$(python ${RetrieveAWSKeys} ${AWSSecretForWebsocket_TodWorker_KeyId}) + ACCESS_SECRET_KEY_ARN=$(python ${RetrieveAWSKeys} ${AWSSecretForWebsocket_TodWorker_SecretKey}) + TestMode="Websocket" + if [ "$1"x == "WebsocketT"x ]; then + ACCESS_KEY_ID_ARN=$(python ${RetrieveAWSKeys} ${AWSSecretForWebsocket_Desktop_KeyId}) + ACCESS_SECRET_KEY_ARN=$(python ${RetrieveAWSKeys} ${AWSSecretForWebsocket_Desktop_SecretKey}) + fi + echo ${ACCESS_KEY_ID_ARN} + echo ${ACCESS_SECRET_KEY_ARN} + export AWS_ACCESS_KEY_ID=${ACCESS_KEY_ID_ARN} + export AWS_SECRET_ACCESS_KEY=${ACCESS_SECRET_KEY_ARN} + curl -s "${CA_CERT_URL}" > ${CA_CERT_PATH} + echo -e "URL retrieved certificate data:\n$(cat ${CA_CERT_PATH})\n" + elif [ "$1"x == "ALPN"x -o "$1"x == "ALPNT"x ]; then + AWSSetName_privatekey=${AWSMutualAuth_TodWorker_private_key} + AWSSetName_certificate=${AWSMutualAuth_TodWorker_certificate} + AWSDRSName_privatekey=${AWSGGDiscovery_TodWorker_private_key} + AWSDRSName_certificate=${AWSGGDiscovery_TodWorker_certificate} + TestMode="ALPN" + if [ "$1"x == "ALPNT"x ]; then + AWSSetName_privatekey=${AWSMutualAuth_Desktop_private_key} + AWSSetName_certificate=${AWSMutualAuth_Desktop_certificate} + fi + python ${RetrieveAWSKeys} ${AWSSetName_certificate} > ${CREDENTIAL_DIR}certificate.pem.crt + python ${RetrieveAWSKeys} ${AWSSetName_privatekey} > ${CREDENTIAL_DIR}privateKey.pem.key + curl -s "${CA_CERT_URL}" > ${CA_CERT_PATH} + echo -e "URL retrieved certificate data:\n$(cat ${CA_CERT_PATH})\n" + python ${RetrieveAWSKeys} ${AWSDRSName_certificate} > ${CREDENTIAL_DIR}certificate_drs.pem.crt + python ${RetrieveAWSKeys} ${AWSDRSName_privatekey} > ${CREDENTIAL_DIR}privateKey_drs.pem.key + else + echo "Mode not supported" + exit 1 + fi +# Obtain ZIP package and unzip it locally + echo ${TestMode} + echo "[STEP] Obtain ZIP package" + echo "***************************************************" + cp -R ${SDKLocation} ./test-integration/IntegrationTests/TestToolLibrary/SDKPackage/ +# Obtain Python executable + + echo "***************************************************" + for file in `ls ${TEST_DIR}` + do + # if [ ${file}x == "IntegrationTestMQTTConnection.py"x ]; then + if [ ${file##*.}x == "py"x ]; then + echo "[SUB] Running test: ${file}..." + + Scale=10 + case "$file" in + "IntegrationTestMQTTConnection.py") Scale=$2 + ;; + "IntegrationTestShadow.py") Scale=$3 + ;; + "IntegrationTestAutoReconnectResubscribe.py") Scale="" + ;; + "IntegrationTestProgressiveBackoff.py") Scale=$4 + ;; + "IntegrationTestConfigurablePublishMessageQueueing.py") Scale="" + ;; + "IntegrationTestDiscovery.py") Scale="" + ;; + "IntegrationTestAsyncAPIGeneralNotificationCallbacks.py") Scale="" + ;; + "IntegrationTestOfflineQueueingForSubscribeUnsubscribe.py") Scale="" + ;; + "IntegrationTestClientReusability.py") Scale="" + ;; + "IntegrationTestJobsClient.py") Scale="" + esac + + python ${TEST_DIR}${file} ${TestMode} ${Scale} + currentTestStatus=$? + echo "[SUB] Test: ${file} completed. Exiting with status: ${currentTestStatus}" + if [ ${currentTestStatus} -ne 0 ]; then + echo "!!!!!!!!!!!!!Test: ${file} failed.!!!!!!!!!!!!!" + exit ${currentTestStatus} + fi + echo "" + fi + done + echo "All integration tests passed" +fi