diff --git a/AWSIoTPythonSDK/MQTTLib.py b/AWSIoTPythonSDK/MQTTLib.py index c4e9269..cb01bc0 100755 --- a/AWSIoTPythonSDK/MQTTLib.py +++ b/AWSIoTPythonSDK/MQTTLib.py @@ -17,10 +17,12 @@ from AWSIoTPythonSDK.core.util.providers import CertificateCredentialsProvider from AWSIoTPythonSDK.core.util.providers import IAMCredentialsProvider from AWSIoTPythonSDK.core.util.providers import EndpointProvider +from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType +from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType from AWSIoTPythonSDK.core.protocol.mqtt_core import MqttCore import AWSIoTPythonSDK.core.shadow.shadowManager as shadowManager import AWSIoTPythonSDK.core.shadow.deviceShadow as deviceShadow - +import AWSIoTPythonSDK.core.jobs.thingJobManager as thingJobManager # Constants # - Protocol types: @@ -30,7 +32,6 @@ DROP_OLDEST = 0 DROP_NEWEST = 1 - class AWSIoTMQTTClient: def __init__(self, clientID, protocolType=MQTTv3_1_1, useWebsocket=False, cleanSession=True): @@ -154,7 +155,8 @@ def configureEndpoint(self, hostName, portNumber): *hostName* - String that denotes the host name of the user-specific AWS IoT endpoint. *portNumber* - Integer that denotes the port number to connect to. Could be :code:`8883` for - TLSv1.2 Mutual Authentication or :code:`443` for Websocket SigV4. + TLSv1.2 Mutual Authentication or :code:`443` for Websocket SigV4 and TLSv1.2 Mutual Authentication + with ALPN extension. **Returns** @@ -165,6 +167,8 @@ def configureEndpoint(self, hostName, portNumber): endpoint_provider.set_host(hostName) endpoint_provider.set_port(portNumber) self._mqtt_core.configure_endpoint(endpoint_provider) + if portNumber == 443 and not self._mqtt_core.use_wss(): + self._mqtt_core.configure_alpn_protocols() def configureIAMCredentials(self, AWSAccessKeyID, AWSSecretAccessKey, AWSSessionToken=""): """ @@ -650,7 +654,7 @@ def subscribe(self, topic, QoS, callback): *QoS* - Quality of Service. Could be 0 or 1. - *callback* - Function to be called when a new message for the subscribed topic + *callback* - Function to be called when a new message for the subscribed topic comes in. Should be in form :code:`customCallback(client, userdata, message)`, where :code:`message` contains :code:`topic` and :code:`payload`. Note that :code:`client` and :code:`userdata` are here just to be aligned with the underneath Paho callback function signature. These fields are pending to be @@ -828,40 +832,19 @@ def onMessage(self, message): """ pass +class _AWSIoTMQTTDelegatingClient(object): -class AWSIoTMQTTShadowClient: - - def __init__(self, clientID, protocolType=MQTTv3_1_1, useWebsocket=False, cleanSession=True): + def __init__(self, clientID, protocolType=MQTTv3_1_1, useWebsocket=False, cleanSession=True, awsIoTMQTTClient=None): """ - The client class that manages device shadow and accesses its functionality in AWS IoT over MQTT v3.1/3.1.1. - - It is built on top of the AWS IoT MQTT Client and exposes devive shadow related operations. - It shares the same connection types, synchronous MQTT operations and partial on-top features - with the AWS IoT MQTT Client: - - - Auto reconnect/resubscribe - - Same as AWS IoT MQTT Client. - - - Progressive reconnect backoff - - Same as AWS IoT MQTT Client. + This class is used internally by the SDK and should not be instantiated directly. - - Offline publish requests queueing with draining - - Disabled by default. Queueing is not allowed for time-sensitive shadow requests/messages. + It delegates to a provided AWS IoT MQTT Client or creates a new one given the configuration + parameters and exposes core operations for subclasses provide convenience methods **Syntax** - .. code:: python - - import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT - - # Create an AWS IoT MQTT Shadow Client using TLSv1.2 Mutual Authentication - myAWSIoTMQTTShadowClient = AWSIoTPyMQTT.AWSIoTMQTTShadowClient("testIoTPySDK") - # Create an AWS IoT MQTT Shadow Client using Websocket SigV4 - myAWSIoTMQTTShadowClient = AWSIoTPyMQTT.AWSIoTMQTTShadowClient("testIoTPySDK", useWebsocket=True) + None **Parameters** @@ -875,29 +858,26 @@ def __init__(self, clientID, protocolType=MQTTv3_1_1, useWebsocket=False, cleanS **Returns** - AWSIoTPythonSDK.MQTTLib.AWSIoTMQTTShadowClient object + AWSIoTPythonSDK.MQTTLib._AWSIoTMQTTDelegatingClient object """ # AWSIOTMQTTClient instance - self._AWSIoTMQTTClient = AWSIoTMQTTClient(clientID, protocolType, useWebsocket, cleanSession) - # Configure it to disable offline Publish Queueing - self._AWSIoTMQTTClient.configureOfflinePublishQueueing(0) # Disable queueing, no queueing for time-sensitive shadow messages - self._AWSIoTMQTTClient.configureDrainingFrequency(10) - # Now retrieve the configured mqttCore and init a shadowManager instance - self._shadowManager = shadowManager.shadowManager(self._AWSIoTMQTTClient._mqtt_core) + self._AWSIoTMQTTClient = awsIoTMQTTClient if awsIoTMQTTClient is not None else AWSIoTMQTTClient(clientID, protocolType, useWebsocket, cleanSession) # Configuration APIs def configureLastWill(self, topic, payload, QoS): """ **Description** - Used to configure the last will topic, payload and QoS of the client. Should be called before connect. + Used to configure the last will topic, payload and QoS of the client. Should be called before connect. This is a public + facing API inherited by application level public clients. **Syntax** .. code:: python - myAWSIoTMQTTClient.configureLastWill("last/Will/Topic", "lastWillPayload", 0) + myShadowClient.configureLastWill("last/Will/Topic", "lastWillPayload", 0) + myJobsClient.configureLastWill("last/Will/Topic", "lastWillPayload", 0) **Parameters** @@ -919,13 +899,15 @@ def clearLastWill(self): """ **Description** - Used to clear the last will configuration that is previously set through configureLastWill. + Used to clear the last will configuration that is previously set through configureLastWill. This is a public + facing API inherited by application level public clients. **Syntax** .. code:: python - myAWSIoTShadowMQTTClient.clearLastWill() + myShadowClient.clearLastWill() + myJobsClient.clearLastWill() **Parameter** @@ -944,20 +926,22 @@ def configureEndpoint(self, hostName, portNumber): **Description** Used to configure the host name and port number the underneath AWS IoT MQTT Client tries to connect to. Should be called - before connect. + before connect. This is a public facing API inherited by application level public clients. **Syntax** .. code:: python - myAWSIoTMQTTShadowClient.configureEndpoint("random.iot.region.amazonaws.com", 8883) + myShadowClient.clearLastWill("random.iot.region.amazonaws.com", 8883) + myJobsClient.clearLastWill("random.iot.region.amazonaws.com", 8883) **Parameters** *hostName* - String that denotes the host name of the user-specific AWS IoT endpoint. *portNumber* - Integer that denotes the port number to connect to. Could be :code:`8883` for - TLSv1.2 Mutual Authentication or :code:`443` for Websocket SigV4. + TLSv1.2 Mutual Authentication or :code:`443` for Websocket SigV4 and TLSv1.2 Mutual Authentication + with ALPN extension. **Returns** @@ -972,13 +956,15 @@ def configureIAMCredentials(self, AWSAccessKeyID, AWSSecretAccessKey, AWSSTSToke **Description** Used to configure/update the custom IAM credentials for the underneath AWS IoT MQTT Client - for Websocket SigV4 connection to AWS IoT. Should be called before connect. + for Websocket SigV4 connection to AWS IoT. Should be called before connect. This is a public + facing API inherited by application level public clients. **Syntax** .. code:: python - myAWSIoTMQTTShadowClient.configureIAMCredentials(obtainedAccessKeyID, obtainedSecretAccessKey, obtainedSessionToken) + myShadowClient.clearLastWill(obtainedAccessKeyID, obtainedSecretAccessKey, obtainedSessionToken) + myJobsClient.clearLastWill(obtainedAccessKeyID, obtainedSecretAccessKey, obtainedSessionToken) .. note:: @@ -1005,13 +991,15 @@ def configureCredentials(self, CAFilePath, KeyPath="", CertificatePath=""): # S """ **Description** - Used to configure the rootCA, private key and certificate files. Should be called before connect. + Used to configure the rootCA, private key and certificate files. Should be called before connect. This is a public + facing API inherited by application level public clients. **Syntax** .. code:: python - myAWSIoTMQTTClient.configureCredentials("PATH/TO/ROOT_CA", "PATH/TO/PRIVATE_KEY", "PATH/TO/CERTIFICATE") + myShadowClient.clearLastWill("PATH/TO/ROOT_CA", "PATH/TO/PRIVATE_KEY", "PATH/TO/CERTIFICATE") + myJobsClient.clearLastWill("PATH/TO/ROOT_CA", "PATH/TO/PRIVATE_KEY", "PATH/TO/CERTIFICATE") **Parameters** @@ -1033,7 +1021,8 @@ def configureAutoReconnectBackoffTime(self, baseReconnectQuietTimeSecond, maxRec """ **Description** - Used to configure the auto-reconnect backoff timing. Should be called before connect. + Used to configure the auto-reconnect backoff timing. Should be called before connect. This is a public + facing API inherited by application level public clients. **Syntax** @@ -1041,7 +1030,8 @@ def configureAutoReconnectBackoffTime(self, baseReconnectQuietTimeSecond, maxRec # Configure the auto-reconnect backoff to start with 1 second and use 128 seconds as a maximum back off time. # Connection over 20 seconds is considered stable and will reset the back off time back to its base. - myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 128, 20) + myShadowClient.clearLastWill(1, 128, 20) + myJobsClient.clearLastWill(1, 128, 20) **Parameters** @@ -1066,14 +1056,15 @@ def configureConnectDisconnectTimeout(self, timeoutSecond): **Description** Used to configure the time in seconds to wait for a CONNACK or a disconnect to complete. - Should be called before connect. + Should be called before connect. This is a public facing API inherited by application level public clients. **Syntax** .. code:: python # Configure connect/disconnect timeout to be 10 seconds - myAWSIoTMQTTShadowClient.configureConnectDisconnectTimeout(10) + myShadowClient.configureConnectDisconnectTimeout(10) + myJobsClient.configureConnectDisconnectTimeout(10) **Parameters** @@ -1092,14 +1083,15 @@ def configureMQTTOperationTimeout(self, timeoutSecond): **Description** Used to configure the timeout in seconds for MQTT QoS 1 publish, subscribe and unsubscribe. - Should be called before connect. + Should be called before connect. This is a public facing API inherited by application level public clients. **Syntax** .. code:: python # Configure MQTT operation timeout to be 5 seconds - myAWSIoTMQTTShadowClient.configureMQTTOperationTimeout(5) + myShadowClient.configureMQTTOperationTimeout(5) + myJobsClient.configureMQTTOperationTimeout(5) **Parameters** @@ -1117,14 +1109,16 @@ def configureUsernamePassword(self, username, password=None): """ **Description** - Used to configure the username and password used in CONNECT packet. + Used to configure the username and password used in CONNECT packet. This is a public facing API + inherited by application level public clients. **Syntax** .. code:: python # Configure user name and password - myAWSIoTMQTTShadowClient.configureUsernamePassword("myUsername", "myPassword") + myShadowClient.configureUsernamePassword("myUsername", "myPassword") + myJobsClient.configureUsernamePassword("myUsername", "myPassword") **Parameters** @@ -1145,12 +1139,14 @@ def enableMetricsCollection(self): Used to enable SDK metrics collection. Username field in CONNECT packet will be used to append the SDK name and SDK version in use and communicate to AWS IoT cloud. This metrics collection is enabled by default. + This is a public facing API inherited by application level public clients. **Syntax** .. code:: python - myAWSIoTMQTTClient.enableMetricsCollection() + myShadowClient.enableMetricsCollection() + myJobsClient.enableMetricsCollection() **Parameters** @@ -1167,13 +1163,14 @@ def disableMetricsCollection(self): """ **Description** - Used to disable SDK metrics collection. + Used to disable SDK metrics collection. This is a public facing API inherited by application level public clients. **Syntax** .. code:: python - myAWSIoTMQTTClient.disableMetricsCollection() + myShadowClient.disableMetricsCollection() + myJobsClient.disableMetricsCollection() **Parameters** @@ -1191,16 +1188,19 @@ def connect(self, keepAliveIntervalSecond=600): """ **Description** - Connect to AWS IoT, with user-specific keepalive interval configuration. + Connect to AWS IoT, with user-specific keepalive interval configuration. This is a public facing API inherited + by application level public clients. **Syntax** .. code:: python # Connect to AWS IoT with default keepalive set to 600 seconds - myAWSIoTMQTTShadowClient.connect() + myShadowClient.connect() + myJobsClient.connect() # Connect to AWS IoT with keepalive interval set to 1200 seconds - myAWSIoTMQTTShadowClient.connect(1200) + myShadowClient.connect(1200) + myJobsClient.connect(1200) **Parameters** @@ -1224,13 +1224,14 @@ def disconnect(self): """ **Description** - Disconnect from AWS IoT. + Disconnect from AWS IoT. This is a public facing API inherited by application level public clients. **Syntax** .. code:: python - myAWSIoTMQTTShadowClient.disconnect() + myShadowClient.disconnect() + myJobsClient.disconnect() **Parameters** @@ -1243,6 +1244,149 @@ def disconnect(self): """ return self._AWSIoTMQTTClient.disconnect() + # MQTT connection management API + def getMQTTConnection(self): + """ + **Description** + + Retrieve the AWS IoT MQTT Client used underneath, making it possible to perform + plain MQTT operations along with specialized operations using the same single connection. + This is a public facing API inherited by application level public clients. + + **Syntax** + + .. code:: python + + # Retrieve the AWS IoT MQTT Client used in the AWS IoT MQTT Delegating Client + thisAWSIoTMQTTClient = myShadowClient.getMQTTConnection() + thisAWSIoTMQTTClient = myJobsClient.getMQTTConnection() + # Perform plain MQTT operations using the same connection + thisAWSIoTMQTTClient.publish("Topic", "Payload", 1) + ... + + **Parameters** + + None + + **Returns** + + AWSIoTPythonSDK.MQTTLib.AWSIoTMQTTClient object + + """ + # Return the internal AWSIoTMQTTClient instance + return self._AWSIoTMQTTClient + + def onOnline(self): + """ + **Description** + + Callback that gets called when the client is online. The callback registration should happen before calling + connect. This is a public facing API inherited by application level public clients. + + **Syntax** + + .. code:: python + + # Register an onOnline callback + myShadowClient.onOnline = myOnOnlineCallback + myJobsClient.onOnline = myOnOnlineCallback + + **Parameters** + + None + + **Returns** + + None + + """ + pass + + def onOffline(self): + """ + **Description** + + Callback that gets called when the client is offline. The callback registration should happen before calling + connect. This is a public facing API inherited by application level public clients. + + **Syntax** + + .. code:: python + + # Register an onOffline callback + myShadowClient.onOffline = myOnOfflineCallback + myJobsClient.onOffline = myOnOfflineCallback + + **Parameters** + + None + + **Returns** + + None + + """ + pass + + +class AWSIoTMQTTShadowClient(_AWSIoTMQTTDelegatingClient): + + def __init__(self, clientID, protocolType=MQTTv3_1_1, useWebsocket=False, cleanSession=True, awsIoTMQTTClient=None): + """ + + The client class that manages device shadow and accesses its functionality in AWS IoT over MQTT v3.1/3.1.1. + + It delegates to the AWS IoT MQTT Client and exposes devive shadow related operations. + It shares the same connection types, synchronous MQTT operations and partial on-top features + with the AWS IoT MQTT Client: + + - Auto reconnect/resubscribe + + Same as AWS IoT MQTT Client. + + - Progressive reconnect backoff + + Same as AWS IoT MQTT Client. + + - Offline publish requests queueing with draining + + Disabled by default. Queueing is not allowed for time-sensitive shadow requests/messages. + + **Syntax** + + .. code:: python + + import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT + + # Create an AWS IoT MQTT Shadow Client using TLSv1.2 Mutual Authentication + myAWSIoTMQTTShadowClient = AWSIoTPyMQTT.AWSIoTMQTTShadowClient("testIoTPySDK") + # Create an AWS IoT MQTT Shadow Client using Websocket SigV4 + myAWSIoTMQTTShadowClient = AWSIoTPyMQTT.AWSIoTMQTTShadowClient("testIoTPySDK", useWebsocket=True) + + **Parameters** + + *clientID* - String that denotes the client identifier used to connect to AWS IoT. + If empty string were provided, client id for this connection will be randomly generated + n server side. + + *protocolType* - MQTT version in use for this connection. Could be :code:`AWSIoTPythonSDK.MQTTLib.MQTTv3_1` or :code:`AWSIoTPythonSDK.MQTTLib.MQTTv3_1_1` + + *useWebsocket* - Boolean that denotes enabling MQTT over Websocket SigV4 or not. + + **Returns** + + AWSIoTPythonSDK.MQTTLib.AWSIoTMQTTShadowClient object + + """ + super(AWSIoTMQTTShadowClient, self).__init__(clientID, protocolType, useWebsocket, cleanSession, awsIoTMQTTClient) + #leave passed in clients alone + if awsIoTMQTTClient is None: + # Configure it to disable offline Publish Queueing + self._AWSIoTMQTTClient.configureOfflinePublishQueueing(0) # Disable queueing, no queueing for time-sensitive shadow messages + self._AWSIoTMQTTClient.configureDrainingFrequency(10) + # Now retrieve the configured mqttCore and init a shadowManager instance + self._shadowManager = shadowManager.shadowManager(self._AWSIoTMQTTClient._mqtt_core) + # Shadow management API def createShadowHandlerWithName(self, shadowName, isPersistentSubscribe): """ @@ -1263,15 +1407,15 @@ def createShadowHandlerWithName(self, shadowName, isPersistentSubscribe): *shadowName* - Name of the device shadow. - *isPersistentSubscribe* - Whether to unsubscribe from shadow response (accepted/rejected) topics - when there is a response. Will subscribe at the first time the shadow request is made and will + *isPersistentSubscribe* - Whether to unsubscribe from shadow response (accepted/rejected) topics + when there is a response. Will subscribe at the first time the shadow request is made and will not unsubscribe if isPersistentSubscribe is set. **Returns** AWSIoTPythonSDK.core.shadow.deviceShadow.deviceShadow object, which exposes the device shadow interface. - """ + """ # Create and return a deviceShadow instance return deviceShadow.deviceShadow(shadowName, isPersistentSubscribe, self._shadowManager) # Shadow APIs are accessible in deviceShadow instance": @@ -1282,82 +1426,289 @@ def createShadowHandlerWithName(self, shadowName, isPersistentSubscribe): # deviceShadow.shadowRegisterDelta # deviceShadow.shadowUnregisterDelta - # MQTT connection management API - def getMQTTConnection(self): +class AWSIoTMQTTThingJobsClient(_AWSIoTMQTTDelegatingClient): + + def __init__(self, clientID, thingName, QoS=0, protocolType=MQTTv3_1_1, useWebsocket=False, cleanSession=True, awsIoTMQTTClient=None): + """ + + The client class that specializes in handling jobs messages and accesses its functionality in AWS IoT over MQTT v3.1/3.1.1. + + It delegates to the AWS IoT MQTT Client and exposes jobs related operations. + It shares the same connection types, synchronous MQTT operations and partial on-top features + with the AWS IoT MQTT Client: + + - Auto reconnect/resubscribe + + Same as AWS IoT MQTT Client. + + - Progressive reconnect backoff + + Same as AWS IoT MQTT Client. + + - Offline publish requests queueing with draining + + Same as AWS IoT MQTT Client + + **Syntax** + + .. code:: python + + import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT + + # Create an AWS IoT MQTT Jobs Client using TLSv1.2 Mutual Authentication + myAWSIoTMQTTJobsClient = AWSIoTPyMQTT.AWSIoTMQTTThingJobsClient("testIoTPySDK") + # Create an AWS IoT MQTT Jobs Client using Websocket SigV4 + myAWSIoTMQTTJobsClient = AWSIoTPyMQTT.AWSIoTMQTTThingJobsClient("testIoTPySDK", useWebsocket=True) + + **Parameters** + + *clientID* - String that denotes the client identifier and client token for jobs requests + If empty string is provided, client id for this connection will be randomly generated + on server side. If an awsIotMQTTClient is specified, this will not override the client ID + for the existing MQTT connection and only impact the client token for jobs request payloads + + *thingName* - String that represents the thingName used to send requests to proper topics and subscribe + to proper topics. + + *QoS* - QoS used for all requests sent through this client + + *awsIoTMQTTClient* - An instance of AWSIoTMQTTClient to use if not None. If not None, clientID, protocolType, useWebSocket, + and cleanSession parameters are not used. Caller is expected to invoke connect() prior to calling the pub/sub methods on this client. + + *protocolType* - MQTT version in use for this connection. Could be :code:`AWSIoTPythonSDK.MQTTLib.MQTTv3_1` or :code:`AWSIoTPythonSDK.MQTTLib.MQTTv3_1_1` + + *useWebsocket* - Boolean that denotes enabling MQTT over Websocket SigV4 or not. + + **Returns** + + AWSIoTPythonSDK.MQTTLib.AWSIoTMQTTJobsClient object + + """ + # AWSIOTMQTTClient instance + super(AWSIoTMQTTThingJobsClient, self).__init__(clientID, protocolType, useWebsocket, cleanSession, awsIoTMQTTClient) + self._thingJobManager = thingJobManager.thingJobManager(thingName, clientID) + self._QoS = QoS + + def createJobSubscription(self, callback, jobExecutionType=jobExecutionTopicType.JOB_WILDCARD_TOPIC, jobReplyType=jobExecutionTopicReplyType.JOB_REQUEST_TYPE, jobId=None): """ **Description** - Retrieve the AWS IoT MQTT Client used underneath for shadow operations, making it possible to perform - plain MQTT operations along with shadow operations using the same single connection. + Synchronously creates an MQTT subscription to a jobs related topic based on the provided arguments **Syntax** .. code:: python + #Subscribe to notify-next topic to monitor change in job referred to by $next + myAWSIoTMQTTJobsClient.createJobSubscription(callback, jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC) + #Subscribe to notify topic to monitor changes to jobs in pending list + myAWSIoTMQTTJobsClient.createJobSubscription(callback, jobExecutionTopicType.JOB_NOTIFY_TOPIC) + #Subscribe to recieve messages for job execution updates + myAWSIoTMQTTJobsClient.createJobSubscription(callback, jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE) + #Subscribe to recieve messages for describing a job execution + myAWSIoTMQTTJobsClient.createJobSubscription(callback, jobExecutionTopicType.JOB_DESCRIBE_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE, jobId) - # Retrieve the AWS IoT MQTT Client used in the AWS IoT MQTT Shadow Client - thisAWSIoTMQTTClient = myAWSIoTMQTTShadowClient.getMQTTConnection() - # Perform plain MQTT operations using the same connection - thisAWSIoTMQTTClient.publish("Topic", "Payload", 1) - ... + **Parameters** + + *callback* - Function to be called when a new message for the subscribed job topic + comes in. Should be in form :code:`customCallback(client, userdata, message)`, where + :code:`message` contains :code:`topic` and :code:`payload`. Note that :code:`client` and :code:`userdata` are + here just to be aligned with the underneath Paho callback function signature. These fields are pending to be + deprecated and should not be depended on. + + *jobExecutionType* - Member of the jobExecutionTopicType class specifying the jobs topic to subscribe to + Defaults to jobExecutionTopicType.JOB_WILDCARD_TOPIC + + *jobReplyType* - Member of the jobExecutionTopicReplyType class specifying the (optional) reply sub-topic to subscribe to + Defaults to jobExecutionTopicReplyType.JOB_REQUEST_TYPE which indicates the subscription isn't intended for a jobs reply topic + + *jobId* - JobId string if the topic type requires one. + Defaults to None + + **Returns** + + True if the subscribe attempt succeeded. False if failed. + + """ + topic = self._thingJobManager.getJobTopic(jobExecutionType, jobReplyType, jobId) + return self._AWSIoTMQTTClient.subscribe(topic, self._QoS, callback) + + def createJobSubscriptionAsync(self, ackCallback, callback, jobExecutionType=jobExecutionTopicType.JOB_WILDCARD_TOPIC, jobReplyType=jobExecutionTopicReplyType.JOB_REQUEST_TYPE, jobId=None): + """ + **Description** + + Asynchronously creates an MQTT subscription to a jobs related topic based on the provided arguments + + **Syntax** + + .. code:: python + #Subscribe to notify-next topic to monitor change in job referred to by $next + myAWSIoTMQTTJobsClient.createJobSubscriptionAsync(callback, jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC) + #Subscribe to notify topic to monitor changes to jobs in pending list + myAWSIoTMQTTJobsClient.createJobSubscriptionAsync(callback, jobExecutionTopicType.JOB_NOTIFY_TOPIC) + #Subscribe to recieve messages for job execution updates + myAWSIoTMQTTJobsClient.createJobSubscriptionAsync(callback, jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE) + #Subscribe to recieve messages for describing a job execution + myAWSIoTMQTTJobsClient.createJobSubscriptionAsync(callback, jobExecutionTopicType.JOB_DESCRIBE_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE, jobId) **Parameters** - None + *ackCallback* - Callback to be invoked when the client receives a SUBACK. Should be in form + :code:`customCallback(mid, data)`, where :code:`mid` is the packet id for the disconnect request and + :code:`data` is the granted QoS for this subscription. + + *callback* - Function to be called when a new message for the subscribed job topic + comes in. Should be in form :code:`customCallback(client, userdata, message)`, where + :code:`message` contains :code:`topic` and :code:`payload`. Note that :code:`client` and :code:`userdata` are + here just to be aligned with the underneath Paho callback function signature. These fields are pending to be + deprecated and should not be depended on. + + *jobExecutionType* - Member of the jobExecutionTopicType class specifying the jobs topic to subscribe to + Defaults to jobExecutionTopicType.JOB_WILDCARD_TOPIC + + *jobReplyType* - Member of the jobExecutionTopicReplyType class specifying the (optional) reply sub-topic to subscribe to + Defaults to jobExecutionTopicReplyType.JOB_REQUEST_TYPE which indicates the subscription isn't intended for a jobs reply topic + + *jobId* - JobId of the topic if the topic type requires one. + Defaults to None **Returns** - AWSIoTPythonSDK.MQTTLib.AWSIoTMQTTClient object + Subscribe request packet id, for tracking purpose in the corresponding callback. - """ - # Return the internal AWSIoTMQTTClient instance - return self._AWSIoTMQTTClient + """ + topic = self._thingJobManager.getJobTopic(jobExecutionType, jobReplyType, jobId) + return self._AWSIoTMQTTClient.subscribeAsync(topic, self._QoS, ackCallback, callback) - def onOnline(self): + def sendJobsQuery(self, jobExecTopicType, jobId=None): """ **Description** - Callback that gets called when the client is online. The callback registration should happen before calling - connect. + Publishes an MQTT jobs related request for a potentially specific jobId (or wildcard) **Syntax** .. code:: python + #send a request to describe the next job + myAWSIoTMQTTJobsClient.sendJobsQuery(jobExecutionTopicType.JOB_DESCRIBE_TOPIC, '$next') + #send a request to get list of pending jobs + myAWSIoTMQTTJobsClient.sendJobsQuery(jobExecutionTopicType.JOB_GET_PENDING_TOPIC) - # Register an onOnline callback - myAWSIoTMQTTShadowClient.onOnline = myOnOnlineCallback + **Parameters** + + *jobExecutionType* - Member of the jobExecutionTopicType class that correlates the jobs topic to publish to + + *jobId* - JobId string if the topic type requires one. + Defaults to None + + **Returns** + + True if the publish request has been sent to paho. False if the request did not reach paho. + + """ + topic = self._thingJobManager.getJobTopic(jobExecTopicType, jobExecutionTopicReplyType.JOB_REQUEST_TYPE, jobId) + payload = self._thingJobManager.serializeClientTokenPayload() + return self._AWSIoTMQTTClient.publish(topic, payload, self._QoS) + + def sendJobsStartNext(self, statusDetails=None): + """ + **Description** + + Publishes an MQTT message to the StartNextJobExecution topic. This will attempt to get the next pending + job execution and change its status to IN_PROGRESS. + + **Syntax** + + .. code:: python + #Start next job (set status to IN_PROGRESS) and update with optional statusDetails + myAWSIoTMQTTJobsClient.sendJobsStartNext({'StartedBy': 'myClientId'}) **Parameters** - None + *statusDetails* - Dictionary containing the key value pairs to use for the status details of the job execution **Returns** - None + True if the publish request has been sent to paho. False if the request did not reach paho. """ - pass + topic = self._thingJobManager.getJobTopic(jobExecutionTopicType.JOB_START_NEXT_TOPIC, jobExecutionTopicReplyType.JOB_REQUEST_TYPE) + payload = self._thingJobManager.serializeStartNextPendingJobExecutionPayload(statusDetails) + return self._AWSIoTMQTTClient.publish(topic, payload, self._QoS) - def onOffline(self): + def sendJobsUpdate(self, jobId, status, statusDetails=None, expectedVersion=0, executionNumber=0, includeJobExecutionState=False, includeJobDocument=False): """ **Description** - Callback that gets called when the client is offline. The callback registration should happen before calling - connect. + Publishes an MQTT message to a corresponding job execution specific topic to update its status according to the parameters. + Can be used to change a job from QUEUED to IN_PROGRESS to SUCEEDED or FAILED. **Syntax** .. code:: python + #Update job with id 'jobId123' to succeeded state, specifying new status details, with expectedVersion=1, executionNumber=2. + #For the response, include job execution state and not the job document + myAWSIoTMQTTJobsClient.sendJobsUpdate('jobId123', jobExecutionStatus.JOB_EXECUTION_SUCCEEDED, statusDetailsMap, 1, 2, True, False) - # Register an onOffline callback - myAWSIoTMQTTShadowClient.onOffline = myOnOfflineCallback + + #Update job with id 'jobId456' to failed state + myAWSIoTMQTTJobsClient.sendJobsUpdate('jobId456', jobExecutionStatus.JOB_EXECUTION_FAILED) **Parameters** - None + *jobId* - JobID String of the execution to update the status of + + *status* - job execution status to change the job execution to. Member of jobExecutionStatus + + *statusDetails* - new status details to set on the job execution + + *expectedVersion* - The expected current version of the job execution. IoT jobs increments expectedVersion each time you update the job execution. + If the version of the job execution stored in Jobs does not match, the update is rejected with a VersionMismatch error, and an ErrorResponse + that contains the current job execution status data is returned. (This makes it unnecessary to perform a separate DescribeJobExecution request + n order to obtain the job execution status data.) + + *executionNumber* - A number that identifies a particular job execution on a particular device. If not specified, the latest job execution is used. + + *includeJobExecutionState* - When included and set to True, the response contains the JobExecutionState field. The default is False. + + *includeJobDocument - When included and set to True, the response contains the JobDocument. The default is False. **Returns** - None + True if the publish request has been sent to paho. False if the request did not reach paho. """ - pass + topic = self._thingJobManager.getJobTopic(jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_REQUEST_TYPE, jobId) + payload = self._thingJobManager.serializeJobExecutionUpdatePayload(status, statusDetails, expectedVersion, executionNumber, includeJobExecutionState, includeJobDocument) + return self._AWSIoTMQTTClient.publish(topic, payload, self._QoS) + + def sendJobsDescribe(self, jobId, executionNumber=0, includeJobDocument=True): + """ + **Description** + + Publishes a method to the describe topic for a particular job. + + **Syntax** + + .. code:: python + #Describe job with id 'jobId1' of any executionNumber, job document will be included in response + myAWSIoTMQTTJobsClient.sendJobsDescribe('jobId1') + + #Describe job with id 'jobId2', with execution number of 2, and includeJobDocument in the response + myAWSIoTMQTTJobsClient.sendJobsDescribe('jobId2', 2, True) + + **Parameters** + + *jobId* - jobID to describe. This is allowed to be a wildcard such as '$next' + + *executionNumber* - A number that identifies a particular job execution on a particular device. If not specified, the latest job execution is used. + + *includeJobDocument* - When included and set to True, the response contains the JobDocument. + + **Returns** + + True if the publish request has been sent to paho. False if the request did not reach paho. + + """ + topic = self._thingJobManager.getJobTopic(jobExecutionTopicType.JOB_DESCRIBE_TOPIC, jobExecutionTopicReplyType.JOB_REQUEST_TYPE, jobId) + payload = self._thingJobManager.serializeDescribeJobExecutionPayload(executionNumber, includeJobDocument) + return self._AWSIoTMQTTClient.publish(topic, payload, self._QoS) diff --git a/AWSIoTPythonSDK/__init__.py b/AWSIoTPythonSDK/__init__.py index 377fda1..6b8b97e 100755 --- a/AWSIoTPythonSDK/__init__.py +++ b/AWSIoTPythonSDK/__init__.py @@ -1,3 +1,3 @@ -__version__ = "1.3.1" +__version__ = "1.4.0" diff --git a/AWSIoTPythonSDK/core/greengrass/discovery/providers.py b/AWSIoTPythonSDK/core/greengrass/discovery/providers.py index eb72b90..0842ba1 100644 --- a/AWSIoTPythonSDK/core/greengrass/discovery/providers.py +++ b/AWSIoTPythonSDK/core/greengrass/discovery/providers.py @@ -39,7 +39,9 @@ class DiscoveryInfoProvider(object): REQUEST_TYPE_PREFIX = "GET " PAYLOAD_PREFIX = "/greengrass/discover/thing/" - PAYLOAD_SUFFIX = " HTTP/1.1\r\n\r\n" # Space in the front + PAYLOAD_SUFFIX = " HTTP/1.1\r\n" # Space in the front + HOST_PREFIX = "Host: " + HOST_SUFFIX = "\r\n\r\n" HTTP_PROTOCOL = r"HTTP/1.1 " CONTENT_LENGTH = r"content-length: " CONTENT_LENGTH_PATTERN = CONTENT_LENGTH + r"([0-9]+)\r\n" @@ -311,7 +313,10 @@ def _send_discovery_request(self, ssl_sock, thing_name): request = self.REQUEST_TYPE_PREFIX + \ self.PAYLOAD_PREFIX + \ thing_name + \ - self.PAYLOAD_SUFFIX + self.PAYLOAD_SUFFIX + \ + self.HOST_PREFIX + \ + self._host + ":" + str(self._port) + \ + self.HOST_SUFFIX self._logger.debug("Sending discover request: " + request) start_time = time.time() diff --git a/AWSIoTPythonSDK/core/jobs/__init__.py b/AWSIoTPythonSDK/core/jobs/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/AWSIoTPythonSDK/core/jobs/thingJobManager.py b/AWSIoTPythonSDK/core/jobs/thingJobManager.py new file mode 100755 index 0000000..0dd7290 --- /dev/null +++ b/AWSIoTPythonSDK/core/jobs/thingJobManager.py @@ -0,0 +1,151 @@ +# /* +# * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# * +# * Licensed under the Apache License, Version 2.0 (the "License"). +# * You may not use this file except in compliance with the License. +# * A copy of the License is located at +# * +# * http://aws.amazon.com/apache2.0 +# * +# * or in the "license" file accompanying this file. This file is distributed +# * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# * express or implied. See the License for the specific language governing +# * permissions and limitations under the License. +# */ + +import json + +_BASE_THINGS_TOPIC = "$aws/things/" +_NOTIFY_OPERATION = "notify" +_NOTIFY_NEXT_OPERATION = "notify-next" +_GET_OPERATION = "get" +_START_NEXT_OPERATION = "start-next" +_WILDCARD_OPERATION = "+" +_UPDATE_OPERATION = "update" +_ACCEPTED_REPLY = "accepted" +_REJECTED_REPLY = "rejected" +_WILDCARD_REPLY = "#" + +#Members of this enum are tuples +_JOB_ID_REQUIRED_INDEX = 1 +_JOB_OPERATION_INDEX = 2 + +_STATUS_KEY = 'status' +_STATUS_DETAILS_KEY = 'statusDetails' +_EXPECTED_VERSION_KEY = 'expectedVersion' +_EXEXCUTION_NUMBER_KEY = 'executionNumber' +_INCLUDE_JOB_EXECUTION_STATE_KEY = 'includeJobExecutionState' +_INCLUDE_JOB_DOCUMENT_KEY = 'includeJobDocument' +_CLIENT_TOKEN_KEY = 'clientToken' + +#The type of job topic. +class jobExecutionTopicType(object): + JOB_UNRECOGNIZED_TOPIC = (0, False, '') + JOB_GET_PENDING_TOPIC = (1, False, _GET_OPERATION) + JOB_START_NEXT_TOPIC = (2, False, _START_NEXT_OPERATION) + JOB_DESCRIBE_TOPIC = (3, True, _GET_OPERATION) + JOB_UPDATE_TOPIC = (4, True, _UPDATE_OPERATION) + JOB_NOTIFY_TOPIC = (5, False, _NOTIFY_OPERATION) + JOB_NOTIFY_NEXT_TOPIC = (6, False, _NOTIFY_NEXT_OPERATION) + JOB_WILDCARD_TOPIC = (7, False, _WILDCARD_OPERATION) + +#Members of this enum are tuples +_JOB_SUFFIX_INDEX = 1 +#The type of reply topic, or #JOB_REQUEST_TYPE for topics that are not replies. +class jobExecutionTopicReplyType(object): + JOB_UNRECOGNIZED_TOPIC_TYPE = (0, '') + JOB_REQUEST_TYPE = (1, '') + JOB_ACCEPTED_REPLY_TYPE = (2, '/' + _ACCEPTED_REPLY) + JOB_REJECTED_REPLY_TYPE = (3, '/' + _REJECTED_REPLY) + JOB_WILDCARD_REPLY_TYPE = (4, '/' + _WILDCARD_REPLY) + +_JOB_STATUS_INDEX = 1 +class jobExecutionStatus(object): + JOB_EXECUTION_STATUS_NOT_SET = (0, None) + JOB_EXECUTION_QUEUED = (1, 'QUEUED') + JOB_EXECUTION_IN_PROGRESS = (2, 'IN_PROGRESS') + JOB_EXECUTION_FAILED = (3, 'FAILED') + JOB_EXECUTION_SUCCEEDED = (4, 'SUCCEEDED') + JOB_EXECUTION_CANCELED = (5, 'CANCELED') + JOB_EXECUTION_REJECTED = (6, 'REJECTED') + JOB_EXECUTION_UNKNOWN_STATUS = (99, None) + +def _getExecutionStatus(jobStatus): + try: + return jobStatus[_JOB_STATUS_INDEX] + except KeyError: + return None + +def _isWithoutJobIdTopicType(srcJobExecTopicType): + return (srcJobExecTopicType == jobExecutionTopicType.JOB_GET_PENDING_TOPIC or srcJobExecTopicType == jobExecutionTopicType.JOB_START_NEXT_TOPIC + or srcJobExecTopicType == jobExecutionTopicType.JOB_NOTIFY_TOPIC or srcJobExecTopicType == jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC) + +class thingJobManager: + def __init__(self, thingName, clientToken = None): + self._thingName = thingName + self._clientToken = clientToken + + def getJobTopic(self, srcJobExecTopicType, srcJobExecTopicReplyType=jobExecutionTopicReplyType.JOB_REQUEST_TYPE, jobId=None): + if self._thingName is None: + return None + + #Verify topics that only support request type, actually have request type specified for reply + if (srcJobExecTopicType == jobExecutionTopicType.JOB_NOTIFY_TOPIC or srcJobExecTopicType == jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC) and srcJobExecTopicReplyType != jobExecutionTopicReplyType.JOB_REQUEST_TYPE: + return None + + #Verify topics that explicitly do not want a job ID do not have one specified + if (jobId is not None and _isWithoutJobIdTopicType(srcJobExecTopicType)): + return None + + #Verify job ID is present if the topic requires one + if jobId is None and srcJobExecTopicType[_JOB_ID_REQUIRED_INDEX]: + return None + + #Ensure the job operation is a non-empty string + if srcJobExecTopicType[_JOB_OPERATION_INDEX] == '': + return None + + if srcJobExecTopicType[_JOB_ID_REQUIRED_INDEX]: + return '{0}{1}/jobs/{2}/{3}{4}'.format(_BASE_THINGS_TOPIC, self._thingName, str(jobId), srcJobExecTopicType[_JOB_OPERATION_INDEX], srcJobExecTopicReplyType[_JOB_SUFFIX_INDEX]) + elif srcJobExecTopicType == jobExecutionTopicType.JOB_WILDCARD_TOPIC: + return '{0}{1}/jobs/#'.format(_BASE_THINGS_TOPIC, self._thingName) + else: + return '{0}{1}/jobs/{2}{3}'.format(_BASE_THINGS_TOPIC, self._thingName, srcJobExecTopicType[_JOB_OPERATION_INDEX], srcJobExecTopicReplyType[_JOB_SUFFIX_INDEX]) + + def serializeJobExecutionUpdatePayload(self, status, statusDetails=None, expectedVersion=0, executionNumber=0, includeJobExecutionState=False, includeJobDocument=False): + executionStatus = _getExecutionStatus(status) + if executionStatus is None: + return None + payload = {_STATUS_KEY: executionStatus} + if statusDetails: + payload[_STATUS_DETAILS_KEY] = statusDetails + if expectedVersion > 0: + payload[_EXPECTED_VERSION_KEY] = str(expectedVersion) + if executionNumber > 0: + payload[_EXEXCUTION_NUMBER_KEY] = str(executionNumber) + if includeJobExecutionState: + payload[_INCLUDE_JOB_EXECUTION_STATE_KEY] = True + if includeJobDocument: + payload[_INCLUDE_JOB_DOCUMENT_KEY] = True + if self._clientToken is not None: + payload[_CLIENT_TOKEN_KEY] = self._clientToken + return json.dumps(payload) + + def serializeDescribeJobExecutionPayload(self, executionNumber=0, includeJobDocument=True): + payload = {_INCLUDE_JOB_DOCUMENT_KEY: includeJobDocument} + if executionNumber > 0: + payload[_EXEXCUTION_NUMBER_KEY] = executionNumber + if self._clientToken is not None: + payload[_CLIENT_TOKEN_KEY] = self._clientToken + return json.dumps(payload) + + def serializeStartNextPendingJobExecutionPayload(self, statusDetails=None): + payload = {} + if self._clientToken is not None: + payload[_CLIENT_TOKEN_KEY] = self._clientToken + if statusDetails is not None: + payload[_STATUS_DETAILS_KEY] = statusDetails + return json.dumps(payload) + + def serializeClientTokenPayload(self): + return json.dumps({_CLIENT_TOKEN_KEY: self._clientToken}) if self._clientToken is not None else '{}' diff --git a/AWSIoTPythonSDK/core/protocol/connection/alpn.py b/AWSIoTPythonSDK/core/protocol/connection/alpn.py new file mode 100644 index 0000000..b7d5137 --- /dev/null +++ b/AWSIoTPythonSDK/core/protocol/connection/alpn.py @@ -0,0 +1,67 @@ +# /* +# * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# * +# * Licensed under the Apache License, Version 2.0 (the "License"). +# * You may not use this file except in compliance with the License. +# * A copy of the License is located at +# * +# * http://aws.amazon.com/apache2.0 +# * +# * or in the "license" file accompanying this file. This file is distributed +# * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# * express or implied. See the License for the specific language governing +# * permissions and limitations under the License. +# */ + + +try: + import ssl +except: + ssl = None + + +class SSLContextBuilder(object): + + def __init__(self): + self.check_supportability() + self._ssl_context = ssl.create_default_context() + + def check_supportability(self): + if ssl is None: + raise RuntimeError("This platform has no SSL/TLS.") + if not hasattr(ssl, "SSLContext"): + raise NotImplementedError("This platform does not support SSLContext. Python 2.7.10+/3.5+ is required.") + if not hasattr(ssl.SSLContext, "set_alpn_protocols"): + raise NotImplementedError("This platform does not support ALPN as TLS extensions. Python 2.7.10+/3.5+ is required.") + + def with_protocol(self, protocol): + self._ssl_context.protocol = protocol + return self + + def with_ca_certs(self, ca_certs): + self._ssl_context.load_verify_locations(ca_certs) + return self + + def with_cert_key_pair(self, cert_file, key_file): + self._ssl_context.load_cert_chain(cert_file, key_file) + return self + + def with_cert_reqs(self, cert_reqs): + self._ssl_context.verify_mode = cert_reqs + return self + + def with_check_hostname(self, check_hostname): + self._ssl_context.check_hostname = check_hostname + return self + + def with_ciphers(self, ciphers): + if ciphers is not None: + self._ssl_context.set_ciphers(ciphers) # set_ciphers() does not allow None input. Use default (do nothing) if None + return self + + def with_alpn_protocols(self, alpn_protocols): + self._ssl_context.set_alpn_protocols(alpn_protocols) + return self + + def build(self): + return self._ssl_context diff --git a/AWSIoTPythonSDK/core/protocol/connection/cores.py b/AWSIoTPythonSDK/core/protocol/connection/cores.py index 71edaf3..a431d24 100644 --- a/AWSIoTPythonSDK/core/protocol/connection/cores.py +++ b/AWSIoTPythonSDK/core/protocol/connection/cores.py @@ -18,8 +18,10 @@ # when to increase it and when to reset it. +import re import sys import ssl +import errno import struct import socket import base64 @@ -30,6 +32,7 @@ from datetime import datetime import hashlib import hmac +from AWSIoTPythonSDK.exception.AWSIoTExceptions import ClientError from AWSIoTPythonSDK.exception.AWSIoTExceptions import wssNoKeyInEnvironmentError from AWSIoTPythonSDK.exception.AWSIoTExceptions import wssHandShakeError from AWSIoTPythonSDK.core.protocol.internal.defaults import DEFAULT_CONNECT_DISCONNECT_TIMEOUT_SEC @@ -240,12 +243,13 @@ def createWebsocketEndpoint(self, host, port, region, method, awsServiceName, pa amazonDateSimple = amazonDate[0] # Unicode in 3.x amazonDateComplex = amazonDate[1] # Unicode in 3.x allKeys = self._checkIAMCredentials() # Unicode in 3.x - hasCredentialsNecessaryForWebsocket = "aws_access_key_id" in allKeys.keys() and "aws_secret_access_key" in allKeys.keys() - if not hasCredentialsNecessaryForWebsocket: - return "" + if not self._hasCredentialsNecessaryForWebsocket(allKeys): + raise wssNoKeyInEnvironmentError() else: + # Because of self._hasCredentialsNecessaryForWebsocket(...), keyID and secretKey should not be None from here keyID = allKeys["aws_access_key_id"] secretKey = allKeys["aws_secret_access_key"] + # amazonDateSimple and amazonDateComplex are guaranteed not to be None queryParameters = "X-Amz-Algorithm=AWS4-HMAC-SHA256" + \ "&X-Amz-Credential=" + keyID + "%2F" + amazonDateSimple + "%2F" + region + "%2F" + awsServiceName + "%2Faws4_request" + \ "&X-Amz-Date=" + amazonDateComplex + \ @@ -264,12 +268,23 @@ def createWebsocketEndpoint(self, host, port, region, method, awsServiceName, pa # generate url url = "wss://" + host + ":" + str(port) + path + '?' + queryParameters + "&X-Amz-Signature=" + signature # See if we have STS token, if we do, add it - if "aws_session_token" in allKeys.keys(): + awsSessionTokenCandidate = allKeys.get("aws_session_token") + if awsSessionTokenCandidate is not None and len(awsSessionTokenCandidate) != 0: aws_session_token = allKeys["aws_session_token"] url += "&X-Amz-Security-Token=" + quote(aws_session_token.encode("utf-8")) # Unicode in 3.x self._logger.debug("createWebsocketEndpoint: Websocket URL: " + url) return url + def _hasCredentialsNecessaryForWebsocket(self, allKeys): + awsAccessKeyIdCandidate = allKeys.get("aws_access_key_id") + awsSecretAccessKeyCandidate = allKeys.get("aws_secret_access_key") + # None value is NOT considered as valid entries + validEntries = awsAccessKeyIdCandidate is not None and awsAccessKeyIdCandidate is not None + if validEntries: + # Empty value is NOT considered as valid entries + validEntries &= (len(awsAccessKeyIdCandidate) != 0 and len(awsSecretAccessKeyCandidate) != 0) + return validEntries + # This is an internal class that buffers the incoming bytes into an # internal buffer until it gets the full desired length of bytes. @@ -305,6 +320,10 @@ def read(self, numberOfBytesToBeBuffered): while self._remainedLength > 0: # Read in a loop, always try to read in the remained length # If the data is temporarily not available, socket.error will be raised and catched by paho dataChunk = self._sslSocket.read(self._remainedLength) + # There is a chance where the server terminates the connection without closing the socket. + # If that happens, let's raise an exception and enter the reconnect flow. + if not dataChunk: + raise socket.error(errno.ECONNABORTED, 0) self._internalBuffer.extend(dataChunk) # Buffer the data self._remainedLength -= len(dataChunk) # Update the remained length @@ -411,6 +430,8 @@ def __init__(self, socket, hostAddress, portNumber, AWSAccessKeyID="", AWSSecret raise ValueError("No Access Key/KeyID Error") except wssHandShakeError: raise ValueError("Websocket Handshake Error") + except ClientError as e: + raise ValueError(e.message) # Now we have a socket with secured websocket... self._bufferedReader = _BufferedReader(self._sslSocket) self._bufferedWriter = _BufferedWriter(self._sslSocket) @@ -461,11 +482,12 @@ def _verifyWSSAcceptKey(self, srcAcceptKey, clientKey): def _handShake(self, hostAddress, portNumber): CRLF = "\r\n" - hostAddressChunks = hostAddress.split('.') # .iot..amazonaws.com - region = hostAddressChunks[2] # XXXX..beta + IOT_ENDPOINT_PATTERN = r"^[0-9a-zA-Z]+\.iot\.(.*)\.amazonaws\..*" + matched = re.compile(IOT_ENDPOINT_PATTERN).match(hostAddress) + if not matched: + raise ClientError("Invalid endpoint pattern for wss: %s" % hostAddress) + region = matched.group(1) signedURL = self._sigV4Handler.createWebsocketEndpoint(hostAddress, portNumber, region, "GET", "iotdata", "/mqtt") - if signedURL == "": - raise wssNoKeyInEnvironmentError() # Now we got a signedURL path = signedURL[signedURL.index("/mqtt"):] # Assemble HTTP request headers @@ -667,6 +689,9 @@ def close(self): self._sslSocket.close() self._sslSocket = None + def getpeercert(self): + return self._sslSocket.getpeercert() + def getSSLSocket(self): if self._connectStatus != self._WebsocketDisconnected: return self._sslSocket diff --git a/AWSIoTPythonSDK/core/protocol/internal/clients.py b/AWSIoTPythonSDK/core/protocol/internal/clients.py index ae6d764..7a6552b 100644 --- a/AWSIoTPythonSDK/core/protocol/internal/clients.py +++ b/AWSIoTPythonSDK/core/protocol/internal/clients.py @@ -94,6 +94,9 @@ def set_endpoint_provider(self, endpoint_provider): def configure_last_will(self, topic, payload, qos, retain=False): self._paho_client.will_set(topic, payload, qos, retain) + def configure_alpn_protocols(self, alpn_protocols): + self._paho_client.config_alpn_protocols(alpn_protocols) + def clear_last_will(self): self._paho_client.will_clear() diff --git a/AWSIoTPythonSDK/core/protocol/internal/defaults.py b/AWSIoTPythonSDK/core/protocol/internal/defaults.py index 2ec1884..66817d3 100644 --- a/AWSIoTPythonSDK/core/protocol/internal/defaults.py +++ b/AWSIoTPythonSDK/core/protocol/internal/defaults.py @@ -16,4 +16,5 @@ DEFAULT_CONNECT_DISCONNECT_TIMEOUT_SEC = 30 DEFAULT_OPERATION_TIMEOUT_SEC = 5 DEFAULT_DRAINING_INTERNAL_SEC = 0.5 -METRICS_PREFIX = "?SDK=Python&Version=" \ No newline at end of file +METRICS_PREFIX = "?SDK=Python&Version=" +ALPN_PROTCOLS = "x-amzn-mqtt-ca" \ No newline at end of file diff --git a/AWSIoTPythonSDK/core/protocol/mqtt_core.py b/AWSIoTPythonSDK/core/protocol/mqtt_core.py index 05f1e19..215cfc7 100644 --- a/AWSIoTPythonSDK/core/protocol/mqtt_core.py +++ b/AWSIoTPythonSDK/core/protocol/mqtt_core.py @@ -26,6 +26,7 @@ from AWSIoTPythonSDK.core.protocol.internal.defaults import DEFAULT_CONNECT_DISCONNECT_TIMEOUT_SEC from AWSIoTPythonSDK.core.protocol.internal.defaults import DEFAULT_OPERATION_TIMEOUT_SEC from AWSIoTPythonSDK.core.protocol.internal.defaults import METRICS_PREFIX +from AWSIoTPythonSDK.core.protocol.internal.defaults import ALPN_PROTCOLS from AWSIoTPythonSDK.core.protocol.internal.events import FixedEventMids from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS from AWSIoTPythonSDK.exception.AWSIoTExceptions import connectError @@ -62,6 +63,7 @@ class MqttCore(object): _logger = logging.getLogger(__name__) def __init__(self, client_id, clean_session, protocol, use_wss): + self._use_wss = use_wss self._username = "" self._password = None self._enable_metrics_collection = True @@ -110,6 +112,9 @@ def _init_workers(self): def _start_workers(self): self._event_consumer.start() + def use_wss(self): + return self._use_wss + # Used for general message event reception def on_message(self, message): pass @@ -149,6 +154,10 @@ def configure_reconnect_back_off(self, base_reconnect_quiet_sec, max_reconnect_q self._logger.info("Stable connection time: %f sec" % stable_connection_sec) self._internal_async_client.configure_reconnect_back_off(base_reconnect_quiet_sec, max_reconnect_quiet_sec, stable_connection_sec) + def configure_alpn_protocols(self): + self._logger.info("Configuring alpn protocols...") + self._internal_async_client.configure_alpn_protocols([ALPN_PROTCOLS]) + def configure_last_will(self, topic, payload, qos, retain=False): self._logger.info("Configuring last will...") self._internal_async_client.configure_last_will(topic, payload, qos, retain) @@ -192,11 +201,23 @@ def connect_async(self, keep_alive_sec, ack_callback=None): self._start_workers() self._load_callbacks() self._load_username_password() - self._client_status.set_status(ClientStatus.CONNECT) - rc = self._internal_async_client.connect(keep_alive_sec, ack_callback) - if MQTT_ERR_SUCCESS != rc: - self._logger.error("Connect error: %d", rc) - raise connectError(rc) + + try: + self._client_status.set_status(ClientStatus.CONNECT) + rc = self._internal_async_client.connect(keep_alive_sec, ack_callback) + if MQTT_ERR_SUCCESS != rc: + self._logger.error("Connect error: %d", rc) + raise connectError(rc) + except Exception as e: + # Provided any error in connect, we should clean up the threads that have been created + self._event_consumer.stop() + if not self._event_consumer.wait_until_it_stops(self._connect_disconnect_timeout_sec): + self._logger.error("Time out in waiting for event consumer to stop") + else: + self._logger.debug("Event consumer stopped") + self._client_status.set_status(ClientStatus.IDLE) + raise e + return FixedEventMids.CONNACK_MID def _load_callbacks(self): diff --git a/AWSIoTPythonSDK/core/protocol/paho/client.py b/AWSIoTPythonSDK/core/protocol/paho/client.py index 9787984..614e4bf 100755 --- a/AWSIoTPythonSDK/core/protocol/paho/client.py +++ b/AWSIoTPythonSDK/core/protocol/paho/client.py @@ -47,6 +47,7 @@ from AWSIoTPythonSDK.core.protocol.connection.cores import ProgressiveBackOffCore from AWSIoTPythonSDK.core.protocol.connection.cores import SecuredWebSocketCore +from AWSIoTPythonSDK.core.protocol.connection.alpn import SSLContextBuilder VERSION_MAJOR=1 VERSION_MINOR=0 @@ -506,6 +507,7 @@ def __init__(self, client_id="", clean_session=True, userdata=None, protocol=MQT self._AWSAccessKeyIDCustomConfig = "" self._AWSSecretAccessKeyCustomConfig = "" self._AWSSessionTokenCustomConfig = "" + self._alpn_protocols = None def __del__(self): pass @@ -532,6 +534,14 @@ def configIAMCredentials(self, srcAWSAccessKeyID, srcAWSSecretAccessKey, srcAWSS self._AWSSecretAccessKeyCustomConfig = srcAWSSecretAccessKey self._AWSSessionTokenCustomConfig = srcAWSSessionToken + def config_alpn_protocols(self, alpn_protocols): + """ + Make custom settings for ALPN protocols + :param alpn_protocols: Array of strings that specifies the alpn protocols to be used + :return: None + """ + self._alpn_protocols = alpn_protocols + def reinitialise(self, client_id="", clean_session=True, userdata=None): if self._ssl: self._ssl.close() @@ -778,14 +788,31 @@ def reconnect(self): if err.errno != errno.EINPROGRESS and err.errno != errno.EWOULDBLOCK and err.errno != EAGAIN: raise + verify_hostname = self._tls_insecure is False # Decide whether we need to verify hostname + if self._tls_ca_certs is not None: if self._useSecuredWebsocket: # Never assign to ._ssl before wss handshake is finished # Non-None value for ._ssl will allow ops before wss-MQTT connection is established rawSSL = ssl.wrap_socket(sock, ca_certs=self._tls_ca_certs, cert_reqs=ssl.CERT_REQUIRED) # Add server certificate verification rawSSL.setblocking(0) # Non-blocking socket - self._ssl = SecuredWebSocketCore(rawSSL, self._host, self._port, self._AWSAccessKeyIDCustomConfig, self._AWSSecretAccessKeyCustomConfig, self._AWSSessionTokenCustomConfig) # Overeride the _ssl socket + self._ssl = SecuredWebSocketCore(rawSSL, self._host, self._port, self._AWSAccessKeyIDCustomConfig, self._AWSSecretAccessKeyCustomConfig, self._AWSSessionTokenCustomConfig) # Override the _ssl socket # self._ssl.enableDebug() + elif self._alpn_protocols is not None: + # SSLContext is required to enable ALPN support + # Assuming Python 2.7.10+/3.5+ till the end of this elif branch + ssl_context = SSLContextBuilder()\ + .with_protocol(self._tls_version)\ + .with_ca_certs(self._tls_ca_certs)\ + .with_cert_key_pair(self._tls_certfile, self._tls_keyfile)\ + .with_cert_reqs(self._tls_cert_reqs)\ + .with_check_hostname(True)\ + .with_ciphers(self._tls_ciphers)\ + .with_alpn_protocols(self._alpn_protocols)\ + .build() + self._ssl = ssl_context.wrap_socket(sock, server_hostname=self._host, do_handshake_on_connect=False) + verify_hostname = False # Since check_hostname in SSLContext is already set to True, no need to verify it again + self._ssl.do_handshake() else: self._ssl = ssl.wrap_socket( sock, @@ -796,11 +823,11 @@ def reconnect(self): ssl_version=self._tls_version, ciphers=self._tls_ciphers) - if self._tls_insecure is False: - if sys.version_info[0] < 3 or (sys.version_info[0] == 3 and sys.version_info[1] < 5): # No IP host match before 3.5.x - self._tls_match_hostname() - else: - ssl.match_hostname(self._ssl.getpeercert(), self._host) + if verify_hostname: + if sys.version_info[0] < 3 or (sys.version_info[0] == 3 and sys.version_info[1] < 5): # No IP host match before 3.5.x + self._tls_match_hostname() + else: + ssl.match_hostname(self._ssl.getpeercert(), self._host) self._sock = sock diff --git a/AWSIoTPythonSDK/exception/AWSIoTExceptions.py b/AWSIoTPythonSDK/exception/AWSIoTExceptions.py index 3bfc6e2..0de5401 100755 --- a/AWSIoTPythonSDK/exception/AWSIoTExceptions.py +++ b/AWSIoTPythonSDK/exception/AWSIoTExceptions.py @@ -145,3 +145,9 @@ def __init__(self): class DiscoveryFailure(operationError.operationError): def __init__(self, message): self.message = message + + +# Client Error +class ClientError(Exception): + def __init__(self, message): + self.message = message diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a1ddd5a..7b316a5 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,6 +2,17 @@ CHANGELOG ========= +1.4.0 +===== +* bugfix:Issue `#136 ` +* bugfix:Issue:`#124 ` +* improvement:Expose the missing getpeercert() from SecuredWebsocket class +* improvement:Enforce sending host header in the outbound discovery request +* improvement:Ensure credentials non error are properly handled and communicated to application level when creating wss endpoint +* feature:Add support for ALPN, along with API docs, sample and updated README +* feature:Add support for IoT Jobs, along with API docs, sample and updated README +* feature:Add command line option to allow port number override + 1.3.1 ===== * bugfix:Issue:`#67 `__ diff --git a/README.rst b/README.rst index 0f6d258..6b65ef3 100755 --- a/README.rst +++ b/README.rst @@ -40,8 +40,9 @@ IoT: - MQTT (over TLS 1.2) with X.509 certificate-based mutual authentication. - MQTT over the WebSocket protocol with AWS Signature Version 4 authentication. +- MQTT (over TLS 1.2) with X.509 certificate-based mutual authentication with TLS ALPN extension. -For MQTT over TLS (port 8883), a valid certificate and a private key are +For MQTT over TLS (port 8883 and port 443), a valid certificate and a private key are required for authentication. For MQTT over the WebSocket protocol (port 443), a valid AWS Identity and Access Management (IAM) access key ID and secret access key pair are required for authentication. @@ -66,7 +67,9 @@ Installation Minimum Requirements ____________________ -- Python 2.7+ or Python 3.3+ +- Python 2.7+ or Python 3.3+ for X.509 certificate-based mutual authentication via port 8883 + and MQTT over WebSocket protocol with AWS Signature Version 4 authentication +- Python 2.7.10+ or Python 3.5+ for X.509 certificate-based mutual authentication via port 443 - OpenSSL version 1.0.1+ (TLS version 1.2) compiled with the Python executable for X.509 certificate-based mutual authentication @@ -239,6 +242,8 @@ You can initialize and configure the client like this: myMQTTClient.configureEndpoint("YOUR.ENDPOINT", 8883) # For Websocket # myMQTTClient.configureEndpoint("YOUR.ENDPOINT", 443) + # For TLS mutual authentication with TLS ALPN extension + # myMQTTClient.configureEndpoint("YOUR.ENDPOINT", 443) myMQTTClient.configureCredentials("YOUR/ROOT/CA/PATH", "PRIVATE/KEY/PATH", "CERTIFICATE/PATH") # For Websocket, we only need to configure the root CA # myMQTTClient.configureCredentials("YOUR/ROOT/CA/PATH") @@ -279,6 +284,8 @@ You can initialize and configure the client like this: myShadowClient.configureEndpoint("YOUR.ENDPOINT", 8883) # For Websocket # myShadowClient.configureEndpoint("YOUR.ENDPOINT", 443) + # For TLS mutual authentication with TLS ALPN extension + # myShadowClient.configureEndpoint("YOUR.ENDPOINT", 443) myShadowClient.configureCredentials("YOUR/ROOT/CA/PATH", "PRIVATE/KEY/PATH", "CERTIFICATE/PATH") # For Websocket, we only need to configure the root CA # myShadowClient.configureCredentials("YOUR/ROOT/CA/PATH") @@ -310,6 +317,57 @@ MQTT operations along with shadow operations: myMQTTClient = myShadowClient.getMQTTConnection() myMQTTClient.publish("plainMQTTTopic", "Payload", 1) +AWSIoTMQTTThingJobsClient +__________________ + +This is the client class used for jobs operations with AWS IoT. See docs here: +https://docs.aws.amazon.com/iot/latest/developerguide/iot-jobs.html +You can initialize and configure the client like this: + +.. code-block:: python + + from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTThingJobsClient + + # For certificate based connection + myJobsClient = AWSIoTMQTTThingJobsClient("myClientID", "myThingName") + # For Websocket connection + # myJobsClient = AWSIoTMQTTThingJobsClient("myClientID", "myThingName", useWebsocket=True) + # Configurations + # For TLS mutual authentication + myJobsClient.configureEndpoint("YOUR.ENDPOINT", 8883) + # For Websocket + # myJobsClient.configureEndpoint("YOUR.ENDPOINT", 443) + myJobsClient.configureCredentials("YOUR/ROOT/CA/PATH", "PRIVATE/KEY/PATH", "CERTIFICATE/PATH") + # For Websocket, we only need to configure the root CA + # myJobsClient.configureCredentials("YOUR/ROOT/CA/PATH") + myJobsClient.configureConnectDisconnectTimeout(10) # 10 sec + myJobsClient.configureMQTTOperationTimeout(5) # 5 sec + ... + +For job operations, your script will look like this: + +.. code-block:: python + + ... + myJobsClient.connect() + # Create a subsciption for $notify-next topic + myJobsClient.createJobSubscription(notifyNextCallback, jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC) + # Create a subscription for update-job-execution accepted response topic + myJobsClient.createJobSubscription(updateSuccessfulCallback, jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE, '+') + # Send a message to start the next pending job (if any) + myJobsClient.sendJobsStartNext(statusDetailsDict) + # Send a message to update a successfully completed job + myJobsClient.sendJobsUpdate(jobId, jobExecutionStatus.JOB_EXECUTION_SUCCEEDED, statusDetailsDict) + ... + +You can also retrieve the MQTTClient(MQTT connection) to perform plain +MQTT operations along with shadow operations: + +.. code-block:: python + + myMQTTClient = myJobsClient.getMQTTConnection() + myMQTTClient.publish("plainMQTTTopic", "Payload", 1) + DiscoveryInfoProvider _____________________ @@ -600,6 +658,8 @@ Run the example like this: python basicPubSub.py -e -r -c -k -id -t # Customize the message python basicPubSub.py -e -r -c -k -id -t -M + # Customize the port number + python basicPubSub.py -e -r -c -k -p # change the run mode to subscribe or publish only (see python basicPubSub.py -h for the available options) python basicPubSub.py -e -r -c -k -m @@ -666,6 +726,8 @@ Run the example like this: python basicPubSubAsync.py -e -r -w # Customize client id and topic python basicPubSubAsync.py -e -r -c -k -id -t + # Customize the port number + python basicPubSubAsync.py -e -r -c -k -p Source ****** @@ -697,6 +759,8 @@ Run the example like this: python basicPubSub_APICallInCallback.py -e -r -w # Customize client id and topic python basicPubSub_APICallInCallback.py -e -r -c -k -id -t + # Customize the port number + python basicPubSub_APICallInCallback.py -e -r -c -k -p Source ****** @@ -735,6 +799,8 @@ First, start the basicShadowDeltaListener: python basicShadowDeltaListener.py -e -r -c -k # MQTT over WebSocket python basicShadowDeltaListener.py -e -r -w + # Customize the port number + python basicShadowDeltaListener.py -e -r -c -k -p Then, start the basicShadowUpdater: @@ -745,6 +811,8 @@ Then, start the basicShadowUpdater: python basicShadowUpdater.py -e -r -c -k # MQTT over WebSocket python basicShadowUpdater.py -e -r -w + # Customize the port number + python basicShadowUpdater.py -e -r -c -k -p After the basicShadowUpdater starts sending shadow update requests, you @@ -780,6 +848,8 @@ Run the example like this: python ThingShadowEcho.py -e -r -w # Customize client Id and thing name python ThingShadowEcho.py -e -r -c -k -id -n + # Customize the port number + python ThingShadowEcho.py -e -r -c -k -p Now use the `AWS IoT console `__ or other MQTT client to update the shadow desired state only. You should be able to see the reported state is updated to match @@ -790,6 +860,42 @@ Source The example is available in ``samples/ThingShadowEcho/``. +JobsSample +__________ + +This example demonstrates how a device communicates with AWS IoT while +also taking advantage of AWS IoT Jobs functionality. It shows how to +subscribe to Jobs topics in order to recieve Job documents on your +device. It also shows how to process those Jobs so that you can see in +the `AWS IoT console `__ which of your devices have received and processed +which Jobs. See the AWS IoT Device Management documentation `here `__ +for more information on creating and deploying Jobs to your fleet of +devices to facilitate management tasks such deploying software updates +and running diagnostics. + +Instructions +************ + +First use the `AWS IoT console `__ to create and deploy Jobs to your fleet of devices. + +Then run the example like this: + +.. code-block:: python + + # Certificate based mutual authentication + python jobsSample.py -e -r -c -k -n + # MQTT over WebSocket + python jobsSample.py -e -r -w -n + # Customize client Id and thing name + python jobsSample.py -e -r -c -k -id -n + # Customize the port number + python jobsSample.py -e -r -c -k -n -p + +Source +****** + +The example is available in ``samples/jobs/``. + BasicDiscovery ______________ diff --git a/samples/ThingShadowEcho/ThingShadowEcho.py b/samples/ThingShadowEcho/ThingShadowEcho.py index ba229b3..cd1f17e 100755 --- a/samples/ThingShadowEcho/ThingShadowEcho.py +++ b/samples/ThingShadowEcho/ThingShadowEcho.py @@ -46,6 +46,7 @@ def customShadowCallback_Delta(self, payload, responseStatus, token): parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path") parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path") parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path") +parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override") parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False, help="Use MQTT over WebSocket") parser.add_argument("-n", "--thingName", action="store", dest="thingName", default="Bot", help="Targeted thing name") @@ -57,6 +58,7 @@ def customShadowCallback_Delta(self, payload, responseStatus, token): rootCAPath = args.rootCAPath certificatePath = args.certificatePath privateKeyPath = args.privateKeyPath +port = args.port useWebsocket = args.useWebsocket thingName = args.thingName clientId = args.clientId @@ -69,6 +71,12 @@ def customShadowCallback_Delta(self, payload, responseStatus, token): parser.error("Missing credentials for authentication.") exit(2) +# Port defaults +if args.useWebsocket and not args.port: # When no port override for WebSocket, default to 443 + port = 443 +if not args.useWebsocket and not args.port: # When no port override for non-WebSocket, default to 8883 + port = 8883 + # Configure logging logger = logging.getLogger("AWSIoTPythonSDK.core") logger.setLevel(logging.DEBUG) @@ -81,11 +89,11 @@ def customShadowCallback_Delta(self, payload, responseStatus, token): myAWSIoTMQTTShadowClient = None if useWebsocket: myAWSIoTMQTTShadowClient = AWSIoTMQTTShadowClient(clientId, useWebsocket=True) - myAWSIoTMQTTShadowClient.configureEndpoint(host, 443) + myAWSIoTMQTTShadowClient.configureEndpoint(host, port) myAWSIoTMQTTShadowClient.configureCredentials(rootCAPath) else: myAWSIoTMQTTShadowClient = AWSIoTMQTTShadowClient(clientId) - myAWSIoTMQTTShadowClient.configureEndpoint(host, 8883) + myAWSIoTMQTTShadowClient.configureEndpoint(host, port) myAWSIoTMQTTShadowClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath) # AWSIoTMQTTShadowClient configuration diff --git a/samples/basicPubSub/basicPubSub.py b/samples/basicPubSub/basicPubSub.py index 14d9315..dc823fc 100755 --- a/samples/basicPubSub/basicPubSub.py +++ b/samples/basicPubSub/basicPubSub.py @@ -38,6 +38,7 @@ def customCallback(client, userdata, message): parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path") parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path") parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path") +parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override") parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False, help="Use MQTT over WebSocket") parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="basicPubSub", @@ -53,6 +54,7 @@ def customCallback(client, userdata, message): rootCAPath = args.rootCAPath certificatePath = args.certificatePath privateKeyPath = args.privateKeyPath +port = args.port useWebsocket = args.useWebsocket clientId = args.clientId topic = args.topic @@ -69,6 +71,12 @@ def customCallback(client, userdata, message): parser.error("Missing credentials for authentication.") exit(2) +# Port defaults +if args.useWebsocket and not args.port: # When no port override for WebSocket, default to 443 + port = 443 +if not args.useWebsocket and not args.port: # When no port override for non-WebSocket, default to 8883 + port = 8883 + # Configure logging logger = logging.getLogger("AWSIoTPythonSDK.core") logger.setLevel(logging.DEBUG) @@ -81,11 +89,11 @@ def customCallback(client, userdata, message): myAWSIoTMQTTClient = None if useWebsocket: myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True) - myAWSIoTMQTTClient.configureEndpoint(host, 443) + myAWSIoTMQTTClient.configureEndpoint(host, port) myAWSIoTMQTTClient.configureCredentials(rootCAPath) else: myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId) - myAWSIoTMQTTClient.configureEndpoint(host, 8883) + myAWSIoTMQTTClient.configureEndpoint(host, port) myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath) # AWSIoTMQTTClient connection configuration diff --git a/samples/basicPubSub/basicPubSubAsync.py b/samples/basicPubSub/basicPubSubAsync.py index d8f6ded..25050f4 100644 --- a/samples/basicPubSub/basicPubSubAsync.py +++ b/samples/basicPubSub/basicPubSubAsync.py @@ -52,6 +52,7 @@ def customPubackCallback(mid): parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path") parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path") parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path") +parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override") parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False, help="Use MQTT over WebSocket") parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="basicPubSub", @@ -63,6 +64,7 @@ def customPubackCallback(mid): rootCAPath = args.rootCAPath certificatePath = args.certificatePath privateKeyPath = args.privateKeyPath +port = args.port useWebsocket = args.useWebsocket clientId = args.clientId topic = args.topic @@ -75,6 +77,12 @@ def customPubackCallback(mid): parser.error("Missing credentials for authentication.") exit(2) +# Port defaults +if args.useWebsocket and not args.port: # When no port override for WebSocket, default to 443 + port = 443 +if not args.useWebsocket and not args.port: # When no port override for non-WebSocket, default to 8883 + port = 8883 + # Configure logging logger = logging.getLogger("AWSIoTPythonSDK.core") logger.setLevel(logging.DEBUG) @@ -87,11 +95,11 @@ def customPubackCallback(mid): myAWSIoTMQTTClient = None if useWebsocket: myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True) - myAWSIoTMQTTClient.configureEndpoint(host, 443) + myAWSIoTMQTTClient.configureEndpoint(host, port) myAWSIoTMQTTClient.configureCredentials(rootCAPath) else: myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId) - myAWSIoTMQTTClient.configureEndpoint(host, 8883) + myAWSIoTMQTTClient.configureEndpoint(host, port) myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath) # AWSIoTMQTTClient connection configuration diff --git a/samples/basicPubSub/basicPubSub_APICallInCallback.py b/samples/basicPubSub/basicPubSub_APICallInCallback.py index 75581e1..710457e 100644 --- a/samples/basicPubSub/basicPubSub_APICallInCallback.py +++ b/samples/basicPubSub/basicPubSub_APICallInCallback.py @@ -58,6 +58,7 @@ def subackCallback(self, mid, data): parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path") parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path") parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path") +parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override") parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False, help="Use MQTT over WebSocket") parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="basicPubSub", @@ -69,6 +70,7 @@ def subackCallback(self, mid, data): rootCAPath = args.rootCAPath certificatePath = args.certificatePath privateKeyPath = args.privateKeyPath +port = args.port useWebsocket = args.useWebsocket clientId = args.clientId topic = args.topic @@ -81,6 +83,12 @@ def subackCallback(self, mid, data): parser.error("Missing credentials for authentication.") exit(2) +# Port defaults +if args.useWebsocket and not args.port: # When no port override for WebSocket, default to 443 + port = 443 +if not args.useWebsocket and not args.port: # When no port override for non-WebSocket, default to 8883 + port = 8883 + # Configure logging logger = logging.getLogger("AWSIoTPythonSDK.core") logger.setLevel(logging.DEBUG) @@ -93,11 +101,11 @@ def subackCallback(self, mid, data): myAWSIoTMQTTClient = None if useWebsocket: myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True) - myAWSIoTMQTTClient.configureEndpoint(host, 443) + myAWSIoTMQTTClient.configureEndpoint(host, port) myAWSIoTMQTTClient.configureCredentials(rootCAPath) else: myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId) - myAWSIoTMQTTClient.configureEndpoint(host, 8883) + myAWSIoTMQTTClient.configureEndpoint(host, port) myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath) # AWSIoTMQTTClient connection configuration diff --git a/samples/basicShadow/basicShadowDeltaListener.py b/samples/basicShadow/basicShadowDeltaListener.py index b1b7a44..73a6b7b 100755 --- a/samples/basicShadow/basicShadowDeltaListener.py +++ b/samples/basicShadow/basicShadowDeltaListener.py @@ -51,6 +51,7 @@ def customShadowCallback_Delta(payload, responseStatus, token): parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path") parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path") parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path") +parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override") parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False, help="Use MQTT over WebSocket") parser.add_argument("-n", "--thingName", action="store", dest="thingName", default="Bot", help="Targeted thing name") @@ -62,6 +63,7 @@ def customShadowCallback_Delta(payload, responseStatus, token): rootCAPath = args.rootCAPath certificatePath = args.certificatePath privateKeyPath = args.privateKeyPath +port = args.port useWebsocket = args.useWebsocket thingName = args.thingName clientId = args.clientId @@ -74,6 +76,12 @@ def customShadowCallback_Delta(payload, responseStatus, token): parser.error("Missing credentials for authentication.") exit(2) +# Port defaults +if args.useWebsocket and not args.port: # When no port override for WebSocket, default to 443 + port = 443 +if not args.useWebsocket and not args.port: # When no port override for non-WebSocket, default to 8883 + port = 8883 + # Configure logging logger = logging.getLogger("AWSIoTPythonSDK.core") logger.setLevel(logging.DEBUG) @@ -86,11 +94,11 @@ def customShadowCallback_Delta(payload, responseStatus, token): myAWSIoTMQTTShadowClient = None if useWebsocket: myAWSIoTMQTTShadowClient = AWSIoTMQTTShadowClient(clientId, useWebsocket=True) - myAWSIoTMQTTShadowClient.configureEndpoint(host, 443) + myAWSIoTMQTTShadowClient.configureEndpoint(host, port) myAWSIoTMQTTShadowClient.configureCredentials(rootCAPath) else: myAWSIoTMQTTShadowClient = AWSIoTMQTTShadowClient(clientId) - myAWSIoTMQTTShadowClient.configureEndpoint(host, 8883) + myAWSIoTMQTTShadowClient.configureEndpoint(host, port) myAWSIoTMQTTShadowClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath) # AWSIoTMQTTShadowClient configuration diff --git a/samples/basicShadow/basicShadowUpdater.py b/samples/basicShadow/basicShadowUpdater.py index c2e202c..2f9b9e2 100755 --- a/samples/basicShadow/basicShadowUpdater.py +++ b/samples/basicShadow/basicShadowUpdater.py @@ -64,6 +64,7 @@ def customShadowCallback_Delete(payload, responseStatus, token): parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path") parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path") parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path") +parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override") parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False, help="Use MQTT over WebSocket") parser.add_argument("-n", "--thingName", action="store", dest="thingName", default="Bot", help="Targeted thing name") @@ -74,6 +75,7 @@ def customShadowCallback_Delete(payload, responseStatus, token): rootCAPath = args.rootCAPath certificatePath = args.certificatePath privateKeyPath = args.privateKeyPath +port = args.port useWebsocket = args.useWebsocket thingName = args.thingName clientId = args.clientId @@ -86,6 +88,12 @@ def customShadowCallback_Delete(payload, responseStatus, token): parser.error("Missing credentials for authentication.") exit(2) +# Port defaults +if args.useWebsocket and not args.port: # When no port override for WebSocket, default to 443 + port = 443 +if not args.useWebsocket and not args.port: # When no port override for non-WebSocket, default to 8883 + port = 8883 + # Configure logging logger = logging.getLogger("AWSIoTPythonSDK.core") logger.setLevel(logging.DEBUG) @@ -98,11 +106,11 @@ def customShadowCallback_Delete(payload, responseStatus, token): myAWSIoTMQTTShadowClient = None if useWebsocket: myAWSIoTMQTTShadowClient = AWSIoTMQTTShadowClient(clientId, useWebsocket=True) - myAWSIoTMQTTShadowClient.configureEndpoint(host, 443) + myAWSIoTMQTTShadowClient.configureEndpoint(host, port) myAWSIoTMQTTShadowClient.configureCredentials(rootCAPath) else: myAWSIoTMQTTShadowClient = AWSIoTMQTTShadowClient(clientId) - myAWSIoTMQTTShadowClient.configureEndpoint(host, 8883) + myAWSIoTMQTTShadowClient.configureEndpoint(host, port) myAWSIoTMQTTShadowClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath) # AWSIoTMQTTShadowClient configuration diff --git a/samples/jobs/jobsSample.py b/samples/jobs/jobsSample.py new file mode 100644 index 0000000..7cbd27e --- /dev/null +++ b/samples/jobs/jobsSample.py @@ -0,0 +1,178 @@ +''' +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + ''' + +from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient +from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTThingJobsClient +from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType +from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType +from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionStatus + +import threading +import logging +import time +import datetime +import argparse +import json + +class JobsMessageProcessor(object): + def __init__(self, awsIoTMQTTThingJobsClient, clientToken): + #keep track of this to correlate request/responses + self.clientToken = clientToken + self.awsIoTMQTTThingJobsClient = awsIoTMQTTThingJobsClient + self.done = False + self.jobsStarted = 0 + self.jobsSucceeded = 0 + self.jobsRejected = 0 + self._setupCallbacks(self.awsIoTMQTTThingJobsClient) + + def _setupCallbacks(self, awsIoTMQTTThingJobsClient): + self.awsIoTMQTTThingJobsClient.createJobSubscription(self.newJobReceived, jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC) + self.awsIoTMQTTThingJobsClient.createJobSubscription(self.startNextJobSuccessfullyInProgress, jobExecutionTopicType.JOB_START_NEXT_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE) + self.awsIoTMQTTThingJobsClient.createJobSubscription(self.startNextRejected, jobExecutionTopicType.JOB_START_NEXT_TOPIC, jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE) + + # '+' indicates a wildcard for jobId in the following subscriptions + self.awsIoTMQTTThingJobsClient.createJobSubscription(self.updateJobSuccessful, jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE, '+') + self.awsIoTMQTTThingJobsClient.createJobSubscription(self.updateJobRejected, jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE, '+') + + #call back on successful job updates + def startNextJobSuccessfullyInProgress(self, client, userdata, message): + payload = json.loads(message.payload.decode('utf-8')) + if 'execution' in payload: + self.jobsStarted += 1 + execution = payload['execution'] + self.executeJob(execution) + statusDetails = {'HandledBy': 'ClientToken: {}'.format(self.clientToken)} + threading.Thread(target = self.awsIoTMQTTThingJobsClient.sendJobsUpdate, kwargs = {'jobId': execution['jobId'], 'status': jobExecutionStatus.JOB_EXECUTION_SUCCEEDED, 'statusDetails': statusDetails, 'expectedVersion': execution['versionNumber'], 'executionNumber': execution['executionNumber']}).start() + else: + print('Start next saw no execution: ' + message.payload.decode('utf-8')) + self.done = True + + def executeJob(self, execution): + print('Executing job ID, version, number: {}, {}, {}'.format(execution['jobId'], execution['versionNumber'], execution['executionNumber'])) + print('With jobDocument: ' + json.dumps(execution['jobDocument'])) + + def newJobReceived(self, client, userdata, message): + payload = json.loads(message.payload.decode('utf-8')) + if 'execution' in payload: + self._attemptStartNextJob() + else: + print('Notify next saw no execution') + self.done = True + + def processJobs(self): + self.done = False + self._attemptStartNextJob() + + def startNextRejected(self, client, userdata, message): + printf('Start next rejected:' + message.payload.decode('utf-8')) + self.jobsRejected += 1 + + def updateJobSuccessful(self, client, userdata, message): + self.jobsSucceeded += 1 + + def updateJobRejected(self, client, userdata, message): + self.jobsRejected += 1 + + def _attemptStartNextJob(self): + statusDetails = {'StartedBy': 'ClientToken: {} on {}'.format(self.clientToken, datetime.datetime.now().isoformat())} + threading.Thread(target=self.awsIoTMQTTThingJobsClient.sendJobsStartNext, kwargs = {'statusDetails': statusDetails}).start() + + def isDone(self): + return self.done + + def getStats(self): + stats = {} + stats['jobsStarted'] = self.jobsStarted + stats['jobsSucceeded'] = self.jobsSucceeded + stats['jobsRejected'] = self.jobsRejected + return stats + +# Read in command-line parameters +parser = argparse.ArgumentParser() +parser.add_argument("-n", "--thingName", action="store", dest="thingName", help="Your AWS IoT ThingName to process jobs for") +parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint") +parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path") +parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path") +parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path") +parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override") +parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False, + help="Use MQTT over WebSocket") +parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="basicJobsSampleClient", + help="Targeted client id") + +args = parser.parse_args() +host = args.host +rootCAPath = args.rootCAPath +certificatePath = args.certificatePath +privateKeyPath = args.privateKeyPath +port = args.port +useWebsocket = args.useWebsocket +clientId = args.clientId +thingName = args.thingName + +if args.useWebsocket and args.certificatePath and args.privateKeyPath: + parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.") + exit(2) + +if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath): + parser.error("Missing credentials for authentication.") + exit(2) + +# Port defaults +if args.useWebsocket and not args.port: # When no port override for WebSocket, default to 443 + port = 443 +if not args.useWebsocket and not args.port: # When no port override for non-WebSocket, default to 8883 + port = 8883 + +# Configure logging +logger = logging.getLogger("AWSIoTPythonSDK.core") +logger.setLevel(logging.DEBUG) +streamHandler = logging.StreamHandler() +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +streamHandler.setFormatter(formatter) +logger.addHandler(streamHandler) + +# Init AWSIoTMQTTClient +myAWSIoTMQTTClient = None +if useWebsocket: + myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True) + myAWSIoTMQTTClient.configureEndpoint(host, port) + myAWSIoTMQTTClient.configureCredentials(rootCAPath) +else: + myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId) + myAWSIoTMQTTClient.configureEndpoint(host, port) + myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath) + +# AWSIoTMQTTClient connection configuration +myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20) +myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec +myAWSIoTMQTTClient.configureMQTTOperationTimeout(10) # 5 sec + +jobsClient = AWSIoTMQTTThingJobsClient(clientId, thingName, QoS=1, awsIoTMQTTClient=myAWSIoTMQTTClient) + +print('Connecting to MQTT server and setting up callbacks...') +jobsClient.connect() +jobsMsgProc = JobsMessageProcessor(jobsClient, clientId) +print('Starting to process jobs...') +jobsMsgProc.processJobs() +while not jobsMsgProc.isDone(): + time.sleep(2) + +print('Done processing jobs') +print('Stats: ' + json.dumps(jobsMsgProc.getStats())) + +jobsClient.disconnect() diff --git a/setup.py b/setup.py index e149243..3846bae 100644 --- a/setup.py +++ b/setup.py @@ -8,6 +8,7 @@ name = 'AWSIoTPythonSDK', packages=['AWSIoTPythonSDK', 'AWSIoTPythonSDK.core', 'AWSIoTPythonSDK.core.util', 'AWSIoTPythonSDK.core.shadow', 'AWSIoTPythonSDK.core.protocol', + 'AWSIoTPythonSDK.core.jobs', 'AWSIoTPythonSDK.core.protocol.paho', 'AWSIoTPythonSDK.core.protocol.internal', 'AWSIoTPythonSDK.core.protocol.connection', 'AWSIoTPythonSDK.core.greengrass', 'AWSIoTPythonSDK.core.greengrass.discovery', 'AWSIoTPythonSDK.exception'],