Skip to content

Commit

Permalink
Merge pull request #310 from aws/integration_test
Browse files Browse the repository at this point in the history
Added integration tests in GitHub CI
  • Loading branch information
xiazhvera authored Apr 11, 2022
2 parents 99fe2e9 + b970c2d commit b5ba6fe
Show file tree
Hide file tree
Showing 21 changed files with 2,759 additions and 0 deletions.
19 changes: 19 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,22 @@ jobs:
pip install mock
python3 -m pytest test
integration-tests:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
test-type: [ MutualAuth, MutualAuthT , Websocket, ALPN, ALPNT]
python-version: [ '2.x', '3.x' ]
#[MutualAuth, Websocket, ALPN]
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Integration tests
run: |
pip install pytest
pip install mock
pip install boto3
./test-integration/run/run.sh ${{ matrix.test-type }} 1000 100 7
3 changes: 3 additions & 0 deletions test-integration/Credentials/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
*/
!.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# This integration test verifies the functionality of asynchronous API for plain MQTT operations, as well as general
# notification callbacks. There are 2 phases for this test:
# a) Testing async APIs + onMessage general notification callback
# b) Testing onOnline, onOffline notification callbacks
# To achieve test goal a) and b), the client will follow the routine described below:
# 1. Client does async connect to AWS IoT and captures the CONNACK event and onOnline callback event in the record
# 2. Client does async subscribe to a topic and captures the SUBACK event in the record
# 3. Client does several async publish (QoS1) to the same topic and captures the PUBACK event in the record
# 4. Since client subscribes and publishes to the same topic, onMessage callback should be triggered. We capture these
# events as well in the record.
# 5. Client does async disconnect. This would trigger the offline callback and disconnect event callback. We capture
# them in the record.
# We should be able to receive all ACKs for all operations and corresponding general notification callback triggering
# events.


import random
import string
import time
import sys
sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary")
sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary/SDKPackage")

from TestToolLibrary.checkInManager import checkInManager
from TestToolLibrary.MQTTClientManager import MQTTClientManager
from TestToolLibrary.skip import skip_when_match
from TestToolLibrary.skip import ModeIsALPN
from TestToolLibrary.skip import Python2VersionLowerThan
from TestToolLibrary.skip import Python3VersionLowerThan


TOPIC = "topic/test/async_cb/"
MESSAGE_PREFIX = "MagicMessage-"
NUMBER_OF_PUBLISHES = 3
HOST = "ajje7lpljulm4-ats.iot.us-east-1.amazonaws.com"
ROOT_CA = "./test-integration/Credentials/rootCA.crt"
CERT = "./test-integration/Credentials/certificate.pem.crt"
KEY = "./test-integration/Credentials/privateKey.pem.key"
CLIENT_ID = "PySdkIntegTest_AsyncAPI_Callbacks"

KEY_ON_ONLINE = "OnOnline"
KEY_ON_OFFLINE = "OnOffline"
KEY_ON_MESSAGE = "OnMessage"
KEY_CONNACK = "Connack"
KEY_DISCONNECT = "Disconnect"
KEY_PUBACK = "Puback"
KEY_SUBACK = "Suback"
KEY_UNSUBACK = "Unsuback"


class CallbackManager(object):

def __init__(self):
self.callback_invocation_record = {
KEY_ON_ONLINE : 0,
KEY_ON_OFFLINE : 0,
KEY_ON_MESSAGE : 0,
KEY_CONNACK : 0,
KEY_DISCONNECT : 0,
KEY_PUBACK : 0,
KEY_SUBACK : 0,
KEY_UNSUBACK : 0
}

def on_online(self):
print("OMG, I am online!")
self.callback_invocation_record[KEY_ON_ONLINE] += 1

def on_offline(self):
print("OMG, I am offline!")
self.callback_invocation_record[KEY_ON_OFFLINE] += 1

def on_message(self, message):
print("OMG, I got a message!")
self.callback_invocation_record[KEY_ON_MESSAGE] += 1

def connack(self, mid, data):
print("OMG, I got a connack!")
self.callback_invocation_record[KEY_CONNACK] += 1

def disconnect(self, mid, data):
print("OMG, I got a disconnect!")
self.callback_invocation_record[KEY_DISCONNECT] += 1

def puback(self, mid):
print("OMG, I got a puback!")
self.callback_invocation_record[KEY_PUBACK] += 1

def suback(self, mid, data):
print("OMG, I got a suback!")
self.callback_invocation_record[KEY_SUBACK] += 1

def unsuback(self, mid):
print("OMG, I got an unsuback!")
self.callback_invocation_record[KEY_UNSUBACK] += 1


def get_random_string(length):
return "".join(random.choice(string.ascii_lowercase) for i in range(length))


############################################################################
# Main #
# Check inputs
my_check_in_manager = checkInManager(1)
my_check_in_manager.verify(sys.argv)
mode = my_check_in_manager.mode

skip_when_match(ModeIsALPN(mode).And(
Python2VersionLowerThan((2, 7, 10)).Or(Python3VersionLowerThan((3, 5, 0)))
), "This test is not applicable for mode %s and Python verison %s. Skipping..." % (mode, sys.version_info[:3]))

# Performing
############
print("Connecting...")
callback_manager = CallbackManager()
sdk_mqtt_client = MQTTClientManager()\
.create_nonconnected_mqtt_client(mode, CLIENT_ID, HOST, (ROOT_CA, CERT, KEY), callback_manager)
sdk_mqtt_client.connectAsync(keepAliveIntervalSecond=1, ackCallback=callback_manager.connack) # Add callback
print("Wait some time to make sure we are connected...")
time.sleep(10) # 10 sec

topic = TOPIC + get_random_string(4)
print("Subscribing to topic: " + topic)
sdk_mqtt_client.subscribeAsync(topic, 1, ackCallback=callback_manager.suback, messageCallback=None)
print("Wait some time to make sure we are subscribed...")
time.sleep(3) # 3 sec

print("Publishing...")
for i in range(NUMBER_OF_PUBLISHES):
sdk_mqtt_client.publishAsync(topic, MESSAGE_PREFIX + str(i), 1, ackCallback=callback_manager.puback)
time.sleep(1)
print("Wait sometime to make sure we finished with publishing...")
time.sleep(2)

print("Unsubscribing...")
sdk_mqtt_client.unsubscribeAsync(topic, ackCallback=callback_manager.unsuback)
print("Wait sometime to make sure we finished with unsubscribing...")
time.sleep(2)

print("Disconnecting...")
sdk_mqtt_client.disconnectAsync(ackCallback=callback_manager.disconnect)

print("Wait sometime to let the test result sync...")
time.sleep(3)

print("Verifying...")
try:
assert callback_manager.callback_invocation_record[KEY_ON_ONLINE] == 1
assert callback_manager.callback_invocation_record[KEY_CONNACK] == 1
assert callback_manager.callback_invocation_record[KEY_SUBACK] == 1
assert callback_manager.callback_invocation_record[KEY_PUBACK] == NUMBER_OF_PUBLISHES
assert callback_manager.callback_invocation_record[KEY_ON_MESSAGE] == NUMBER_OF_PUBLISHES
assert callback_manager.callback_invocation_record[KEY_UNSUBACK] == 1
assert callback_manager.callback_invocation_record[KEY_DISCONNECT] == 1
assert callback_manager.callback_invocation_record[KEY_ON_OFFLINE] == 1
except BaseException as e:
print("Failed! %s" % e.message)
print("Pass!")
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
# This integration test verifies the functionality in the Python core of Yun/Python SDK
# for auto-reconnect and auto-resubscribe.
# It starts two threads using two different connections to AWS IoT:
# Thread A publishes 10 messages to topicB first, then quiet for a while, and finally
# publishes another 10 messages to topicB.
# Thread B subscribes to topicB and waits to receive messages. Once it receives the first
# 10 messages. It simulates a network error, disconnecting from the broker. In a short time,
# it should automatically reconnect and resubscribe to the previous topic and be able to
# receive the next 10 messages from thread A.
# Because of auto-reconnect/resubscribe, thread B should be able to receive all of the
# messages from topicB published by thread A without calling subscribe again in user code
# explicitly.


import random
import string
import sys
import time
sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary")
sys.path.insert(0, "./test-integration/IntegrationTests/TestToolLibrary/SDKPackage")

import TestToolLibrary.checkInManager as checkInManager
import TestToolLibrary.MQTTClientManager as MQTTClientManager
from TestToolLibrary import simpleThreadManager
from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import publishError
from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError
from TestToolLibrary.SDKPackage.AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeTimeoutException
from TestToolLibrary.skip import skip_when_match
from TestToolLibrary.skip import ModeIsALPN
from TestToolLibrary.skip import Python2VersionLowerThan
from TestToolLibrary.skip import Python3VersionLowerThan

CLIENT_ID_PUB = "integrationTestMQTT_ClientPub" + "".join(random.choice(string.ascii_lowercase) for i in range(4))
CLIENT_ID_SUB = "integrationTestMQTT_ClientSub" + "".join(random.choice(string.ascii_lowercase) for i in range(4))

# Callback unit
class callbackUnit:
def __init__(self):
self._internalSet = set()

# Callback fro clientSub
def messageCallback(self, client, userdata, message):
print("Received a new message: " + str(message.payload))
self._internalSet.add(message.payload.decode('utf-8'))

def getInternalSet(self):
return self._internalSet


# Simulate a network error
def manualNetworkError(srcPyMQTTCore):
# Ensure we close the socket
if srcPyMQTTCore._internal_async_client._paho_client._sock:
srcPyMQTTCore._internal_async_client._paho_client._sock.close()
srcPyMQTTCore._internal_async_client._paho_client._sock = None
if srcPyMQTTCore._internal_async_client._paho_client._ssl:
srcPyMQTTCore._internal_async_client._paho_client._ssl.close()
srcPyMQTTCore._internal_async_client._paho_client._ssl = None
# Fake that we have detected the disconnection
srcPyMQTTCore._internal_async_client._paho_client.on_disconnect(None, None, 0)


# runFunctionUnit
class runFunctionUnit():
def __init__(self):
self._messagesPublished = set()
self._topicB = "topicB/" + "".join(random.choice(string.ascii_lowercase) for i in range(4))

# ThreadA runtime function:
# 1. Publish 10 messages to topicB.
# 2. Take a nap: 20 sec
# 3. Publish another 10 messages to topicB.
def threadARuntime(self, pyCoreClient):
time.sleep(3) # Ensure a valid subscription
messageCount = 0
# First 10 messages
while messageCount < 10:
try:
pyCoreClient.publish(self._topicB, str(messageCount), 1, False)
self._messagesPublished.add(str(messageCount))
except publishError:
print("Publish error!")
except Exception as e:
print("Unknown exception!")
print("Type: " + str(type(e)))
print("Message: " + str(e.message))
messageCount += 1
time.sleep(0.5) # TPS = 2
# Take a nap
time.sleep(20)
# Second 10 messages
while messageCount < 20:
try:
pyCoreClient.publish(self._topicB, str(messageCount), 1, False)
self._messagesPublished.add(str(messageCount))
except publishError:
print("Publish Error!")
except Exception as e:
print("Unknown exception!")
print("Type: " + str(type(e)))
print("Message: " + str(e.message))
messageCount += 1
time.sleep(0.5)
print("Publish thread terminated.")

# ThreadB runtime function:
# 1. Subscribe to topicB
# 2. Wait for a while
# 3. Create network blocking, triggering auto-reconnect and auto-resubscribe
# 4. On connect, wait for another while
def threadBRuntime(self, pyCoreClient, callback):
try:
# Subscribe to topicB
pyCoreClient.subscribe(self._topicB, 1, callback)
except subscribeTimeoutException:
print("Subscribe timeout!")
except subscribeError:
print("Subscribe error!")
except Exception as e:
print("Unknown exception!")
print("Type: " + str(type(e)))
print("Message: " + str(e.message))
# Wait to get the first 10 messages from thread A
time.sleep(10)
# Block the network for 3 sec
print("Block the network for 3 sec...")
blockingTimeTenMs = 300
while blockingTimeTenMs != 0:
manualNetworkError(pyCoreClient)
blockingTimeTenMs -= 1
time.sleep(0.01)
print("Leave it to the main thread to keep waiting...")


############################################################################
# Main #
# Check inputs
myCheckInManager = checkInManager.checkInManager(1)
myCheckInManager.verify(sys.argv)

host = "ajje7lpljulm4-ats.iot.us-east-1.amazonaws.com"
rootCA = "./test-integration/Credentials/rootCA.crt"
certificate = "./test-integration/Credentials/certificate.pem.crt"
privateKey = "./test-integration/Credentials/privateKey.pem.key"
mode = myCheckInManager.mode

skip_when_match(ModeIsALPN(mode).And(
Python2VersionLowerThan((2, 7, 10)).Or(Python3VersionLowerThan((3, 5, 0)))
), "This test is not applicable for mode %s and Python verison %s. Skipping..." % (mode, sys.version_info[:3]))

# Init Python core and connect
myMQTTClientManager = MQTTClientManager.MQTTClientManager()
clientPub = myMQTTClientManager.create_connected_mqtt_core(CLIENT_ID_PUB, host, rootCA,
certificate, privateKey, mode=mode)
clientSub = myMQTTClientManager.create_connected_mqtt_core(CLIENT_ID_SUB, host, rootCA,
certificate, privateKey, mode=mode)

if clientPub is None or clientSub is None:
print("Clients not init!")
exit(4)

print("Two clients are connected!")

# Configurations
################
# Callback unit
subCallbackUnit = callbackUnit()
# Threads
mySimpleThreadManager = simpleThreadManager.simpleThreadManager()
myRunFunctionUnit = runFunctionUnit()
publishThreadID = mySimpleThreadManager.createOneTimeThread(myRunFunctionUnit.threadARuntime, [clientPub])
subscribeThreadID = mySimpleThreadManager.createOneTimeThread(myRunFunctionUnit.threadBRuntime,
[clientSub, subCallbackUnit.messageCallback])

# Performing
############
mySimpleThreadManager.startThreadWithID(subscribeThreadID)
mySimpleThreadManager.startThreadWithID(publishThreadID)
mySimpleThreadManager.joinOneTimeThreadWithID(subscribeThreadID)
mySimpleThreadManager.joinOneTimeThreadWithID(publishThreadID)
time.sleep(3) # Just in case messages arrive slowly

# Verifying
###########
# Length
print("Check if the length of the two sets are equal...")
print("Received from subscription: " + str(len(subCallbackUnit.getInternalSet())))
print("Sent through publishes: " + str(len(myRunFunctionUnit._messagesPublished)))
if len(myRunFunctionUnit._messagesPublished) != len(subCallbackUnit.getInternalSet()):
print("Number of messages not equal!")
exit(4)
# Content
print("Check if the content if the two sets are equivalent...")
if myRunFunctionUnit._messagesPublished != subCallbackUnit.getInternalSet():
print("Sent through publishes:")
print(myRunFunctionUnit._messagesPublished)
print("Received from subscription:")
print(subCallbackUnit.getInternalSet())
print("Set content not equal!")
exit(4)
else:
print("Yes!")
Loading

0 comments on commit b5ba6fe

Please sign in to comment.