From f05037ea779a17f8779b09724a5597676879a6b0 Mon Sep 17 00:00:00 2001 From: Oliva Kar Date: Wed, 10 Jan 2024 11:50:36 -0800 Subject: [PATCH 1/6] one pass --- .../azure/iot/device/custom_typing.py | 41 +++++++ .../iot/device/iothub/abstract_clients.py | 110 +++++++++--------- .../iot/device/iothub/aio/async_clients.py | 51 ++++---- .../azure/iot/device/iothub/models/message.py | 4 +- .../azure/iot/device/iothub/models/methods.py | 15 ++- .../azure/iot/device/iothub/sync_clients.py | 50 ++++---- .../abstract_provisioning_device_client.py | 32 ++--- .../aio/async_provisioning_device_client.py | 8 +- .../models/registration_result.py | 94 +++++++-------- .../provisioning_device_client.py | 8 +- 10 files changed, 240 insertions(+), 173 deletions(-) create mode 100644 azure-iot-device/azure/iot/device/custom_typing.py diff --git a/azure-iot-device/azure/iot/device/custom_typing.py b/azure-iot-device/azure/iot/device/custom_typing.py new file mode 100644 index 000000000..2c1508fd4 --- /dev/null +++ b/azure-iot-device/azure/iot/device/custom_typing.py @@ -0,0 +1,41 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +from typing import Any, Union, Dict, List, Tuple, Callable, Awaitable, TypeVar +from typing_extensions import TypedDict, ParamSpec + + +_P = ParamSpec("_P") +_R = TypeVar("_R") +FunctionOrCoroutine = Union[Callable[_P, _R], Callable[_P, Awaitable[_R]]] + + +# typing does not support recursion, so we must use forward references here (PEP484) +JSONSerializable = Union[ + Dict[str, "JSONSerializable"], + List["JSONSerializable"], + Tuple["JSONSerializable", ...], + str, + int, + float, + bool, + None, +] +# TODO: verify that the JSON specification requires str as keys in dict. Not sure why that's defined here. + + +Twin = Dict[str, Dict[str, JSONSerializable]] +TwinPatch = Dict[str, JSONSerializable] + + +class StorageInfo(TypedDict): + correlationId: str + hostName: str + containerName: str + blobName: str + sasToken: str + + +ProvisioningPayload = Union[Dict[str, Any], str, int] diff --git a/azure-iot-device/azure/iot/device/iothub/abstract_clients.py b/azure-iot-device/azure/iot/device/iothub/abstract_clients.py index ed712e298..51c4abad5 100644 --- a/azure-iot-device/azure/iot/device/iothub/abstract_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/abstract_clients.py @@ -17,14 +17,20 @@ from azure.iot.device.common.auth import connection_string as cs from azure.iot.device.common.auth import sastoken as st from azure.iot.device.iothub import client_event +from azure.iot.device.iothub.models import Message, MethodRequest +from azure.iot.device.common.models import X509 from azure.iot.device import exceptions from azure.iot.device.common import auth, handle_exceptions from . import edge_hsm +from .pipeline import MQTTPipeline, HTTPPipeline +from typing_extensions import Self +from azure.iot.device.custom_typing import JSONSerializable, Twin, TwinPatch +from typing import Any, Callable, Dict, List, Optional, Union logger = logging.getLogger(__name__) -def _validate_kwargs(exclude=[], **kwargs): +def _validate_kwargs(exclude: Optional[List[str]] = [], **kwargs) -> None: """Helper function to validate user provided kwargs. Raises TypeError if an invalid option has been provided""" valid_kwargs = [ @@ -47,7 +53,7 @@ def _validate_kwargs(exclude=[], **kwargs): raise TypeError("Unsupported keyword argument: '{}'".format(kwarg)) -def _get_config_kwargs(**kwargs): +def _get_config_kwargs(**kwargs) -> Dict[str, Any]: """Get the subset of kwargs which pertain the config object""" valid_config_kwargs = [ "server_verification_cert", @@ -70,7 +76,7 @@ def _get_config_kwargs(**kwargs): return config_kwargs -def _form_sas_uri(hostname, device_id, module_id=None): +def _form_sas_uri(hostname: str, device_id: str, module_id: Optional[str] = None) -> str: if module_id: return "{hostname}/devices/{device_id}/modules/{module_id}".format( hostname=hostname, device_id=device_id, module_id=module_id @@ -79,7 +85,7 @@ def _form_sas_uri(hostname, device_id, module_id=None): return "{hostname}/devices/{device_id}".format(hostname=hostname, device_id=device_id) -def _extract_sas_uri_values(uri): +def _extract_sas_uri_values(uri: str) -> Dict[str, Any]: d = {} items = uri.split("/") if len(items) != 3 and len(items) != 5: @@ -108,7 +114,7 @@ class AbstractIoTHubClient(abc.ABC): This class needs to be extended for specific clients. """ - def __init__(self, mqtt_pipeline, http_pipeline): + def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline) -> None: """Initializer for a generic client. :param mqtt_pipeline: The pipeline used to connect to the IoTHub endpoint. @@ -122,7 +128,7 @@ def __init__(self, mqtt_pipeline, http_pipeline): self._receive_type = RECEIVE_TYPE_NONE_SET self._client_lock = threading.Lock() - def _on_connected(self): + def _on_connected(self) -> None: """Helper handler that is called upon an iothub pipeline connect""" logger.info("Connection State - Connected") client_event_inbox = self._inbox_manager.get_client_event_inbox() @@ -133,7 +139,7 @@ def _on_connected(self): # Ensure that all handlers are running now that connection is re-established. self._handler_manager.ensure_running() - def _on_disconnected(self): + def _on_disconnected(self) -> None: """Helper handler that is called upon an iothub pipeline disconnect""" logger.info("Connection State - Disconnected") client_event_inbox = self._inbox_manager.get_client_event_inbox() @@ -146,7 +152,7 @@ def _on_disconnected(self): self._inbox_manager.clear_all_method_requests() logger.info("Cleared all pending method requests due to disconnect") - def _on_new_sastoken_required(self): + def _on_new_sastoken_required(self) -> None: """Helper handler that is called upon the iothub pipeline needing new SAS token""" logger.info("New SasToken required from user") client_event_inbox = self._inbox_manager.get_client_event_inbox() @@ -155,7 +161,7 @@ def _on_new_sastoken_required(self): event = client_event.ClientEvent(client_event.NEW_SASTOKEN_REQUIRED) client_event_inbox.put(event) - def _on_background_exception(self, e): + def _on_background_exception(self, e: Exception) -> None: """Helper handler that is called upon an iothub pipeline background exception""" handle_exceptions.handle_background_exception(e) client_event_inbox = self._inbox_manager.get_client_event_inbox() @@ -164,7 +170,7 @@ def _on_background_exception(self, e): event = client_event.ClientEvent(client_event.BACKGROUND_EXCEPTION, e) client_event_inbox.put(event) - def _check_receive_mode_is_api(self): + def _check_receive_mode_is_api(self) -> None: """Call this function first in EVERY receive API""" with self._client_lock: if self._receive_type is RECEIVE_TYPE_NONE_SET: @@ -177,7 +183,7 @@ def _check_receive_mode_is_api(self): else: pass - def _check_receive_mode_is_handler(self): + def _check_receive_mode_is_handler(self) -> None: """Call this function first in EVERY handler setter""" with self._client_lock: if self._receive_type is RECEIVE_TYPE_NONE_SET: @@ -192,7 +198,7 @@ def _check_receive_mode_is_handler(self): else: pass - def _replace_user_supplied_sastoken(self, sastoken_str): + def _replace_user_supplied_sastoken(self, sastoken_str: str) -> None: """ Replaces the pipeline's NonRenewableSasToken with a new one based on a provided sastoken string. Also does validation. @@ -232,12 +238,12 @@ def _replace_user_supplied_sastoken(self, sastoken_str): self._mqtt_pipeline.pipeline_configuration.sastoken = new_token_o @abc.abstractmethod - def _generic_receive_handler_setter(self, handler_name, feature_name, new_handler): + def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, new_handler: Optional[Callable[[], Any]]) -> None: # Will be implemented differently in child classes, but define here for static analysis pass @classmethod - def create_from_connection_string(cls, connection_string, **kwargs): + def create_from_connection_string(cls, connection_string: str, **kwargs: Dict[str, Any]) -> Self: """ Instantiate the client from a IoTHub device or module connection string. @@ -321,7 +327,7 @@ def create_from_connection_string(cls, connection_string, **kwargs): return cls(mqtt_pipeline, http_pipeline) @classmethod - def create_from_sastoken(cls, sastoken, **kwargs): + def create_from_sastoken(cls, sastoken: str, **kwargs: Dict[str, Any]) -> Self: """Instantiate the client from a pre-created SAS Token string :param str sastoken: The SAS Token string @@ -393,54 +399,54 @@ def create_from_sastoken(cls, sastoken, **kwargs): return cls(mqtt_pipeline, http_pipeline) @abc.abstractmethod - def shutdown(self): + def shutdown(self) -> None: pass @abc.abstractmethod - def connect(self): + def connect(self) -> None: pass @abc.abstractmethod - def disconnect(self): + def disconnect(self) -> None: pass @abc.abstractmethod - def update_sastoken(self, sastoken): + def update_sastoken(self, sastoken: str) -> None: pass @abc.abstractmethod - def send_message(self, message): + def send_message(self, message: Union[Message, str]) -> None: pass @abc.abstractmethod - def receive_method_request(self, method_name=None): + def receive_method_request(self, method_name: Optional[str] = None) -> None: pass @abc.abstractmethod - def send_method_response(self, method_request, payload, status): + def send_method_response(self, method_request: MethodRequest, payload: Dict[str, JSONSerializable], status: int) -> None: pass @abc.abstractmethod - def get_twin(self): + def get_twin(self) -> Twin: pass @abc.abstractmethod - def patch_twin_reported_properties(self, reported_properties_patch): + def patch_twin_reported_properties(self, reported_properties_patch: TwinPatch) -> None: pass @abc.abstractmethod - def receive_twin_desired_properties_patch(self): + def receive_twin_desired_properties_patch(self) -> TwinPatch: pass @property - def connected(self): + def connected(self) -> bool: """ Read-only property to indicate if the transport is connected or not. """ return self._mqtt_pipeline.connected @property - def on_connection_state_change(self): + def on_connection_state_change(self) -> Callable[[Any], Any]: """The handler function or coroutine that will be called when the connection state changes. The function or coroutine definition should take no positional arguments. @@ -448,11 +454,11 @@ def on_connection_state_change(self): return self._handler_manager.on_connection_state_change @on_connection_state_change.setter - def on_connection_state_change(self, value): + def on_connection_state_change(self, value: Callable[[Any], Any]) -> None: self._handler_manager.on_connection_state_change = value @property - def on_new_sastoken_required(self): + def on_new_sastoken_required(self) -> Callable[[Any], Any]: """The handler function or coroutine that will be called when the client requires a new SAS token. This will happen approximately 2 minutes before the SAS Token expires. On Windows platforms, if the lifespan exceeds approximately 49 days, a new token will @@ -469,11 +475,11 @@ def on_new_sastoken_required(self): return self._handler_manager.on_new_sastoken_required @on_new_sastoken_required.setter - def on_new_sastoken_required(self, value): + def on_new_sastoken_required(self, value: Callable[[Any], Any]) -> None: self._handler_manager.on_new_sastoken_required = value @property - def on_background_exception(self): + def on_background_exception(self) -> Callable[[Any], Any]: """The handler function or coroutine will be called when a background exception occurs. The function or coroutine definition should take one positional argument (the exception @@ -481,16 +487,16 @@ def on_background_exception(self): return self._handler_manager.on_background_exception @on_background_exception.setter - def on_background_exception(self, value): + def on_background_exception(self, value: Callable[[Any], Any]) -> None: self._handler_manager.on_background_exception = value @abc.abstractproperty - def on_message_received(self): + def on_message_received(self) -> Callable[[Any], Any]: # Defined below on AbstractIoTHubDeviceClient / AbstractIoTHubModuleClient pass @property - def on_method_request_received(self): + def on_method_request_received(self) -> Callable[[Any], Any]: """The handler function or coroutine that will be called when a method request is received. Remember to acknowledge the method request in your function or coroutine via use of the @@ -501,13 +507,13 @@ def on_method_request_received(self): return self._handler_manager.on_method_request_received @on_method_request_received.setter - def on_method_request_received(self, value): + def on_method_request_received(self, value: Callable[[Any], Any]) -> None: self._generic_receive_handler_setter( "on_method_request_received", pipeline_constant.METHODS, value ) @property - def on_twin_desired_properties_patch_received(self): + def on_twin_desired_properties_patch_received(self) -> Callable[[Any], Any]: """The handler function or coroutine that will be called when a twin desired properties patch is received. @@ -516,7 +522,7 @@ def on_twin_desired_properties_patch_received(self): return self._handler_manager.on_twin_desired_properties_patch_received @on_twin_desired_properties_patch_received.setter - def on_twin_desired_properties_patch_received(self, value): + def on_twin_desired_properties_patch_received(self, value: Callable[[Any], Any]): self._generic_receive_handler_setter( "on_twin_desired_properties_patch_received", pipeline_constant.TWIN_PATCHES, value ) @@ -524,7 +530,7 @@ def on_twin_desired_properties_patch_received(self, value): class AbstractIoTHubDeviceClient(AbstractIoTHubClient): @classmethod - def create_from_x509_certificate(cls, x509, hostname, device_id, **kwargs): + def create_from_x509_certificate(cls, x509: X509, hostname: str, device_id: str, **kwargs) -> Self: """ Instantiate a client using X509 certificate authentication. @@ -586,7 +592,7 @@ def create_from_x509_certificate(cls, x509, hostname, device_id, **kwargs): return cls(mqtt_pipeline, http_pipeline) @classmethod - def create_from_symmetric_key(cls, symmetric_key, hostname, device_id, **kwargs): + def create_from_symmetric_key(cls, symmetric_key: str, hostname: str, device_id: str, **kwargs) -> Self: """ Instantiate a client using symmetric key authentication. @@ -657,21 +663,21 @@ def create_from_symmetric_key(cls, symmetric_key, hostname, device_id, **kwargs) return cls(mqtt_pipeline, http_pipeline) @abc.abstractmethod - def receive_message(self): + def receive_message(self) -> Message: pass @abc.abstractmethod - def get_storage_info_for_blob(self, blob_name): + def get_storage_info_for_blob(self, blob_name: str) -> Dict[str, Any]: pass @abc.abstractmethod def notify_blob_upload_status( - self, correlation_id, is_success, status_code, status_description - ): + self, correlation_id: str, is_success: bool, status_code: int, status_description: str + ) -> None: pass @property - def on_message_received(self): + def on_message_received(self) -> Callable[[Any], Any]: """The handler function or coroutine that will be called when a message is received. The function or coroutine definition should take one positional argument (the @@ -679,7 +685,7 @@ def on_message_received(self): return self._handler_manager.on_message_received @on_message_received.setter - def on_message_received(self, value): + def on_message_received(self, value: Callable[[Any], Any]): self._generic_receive_handler_setter( "on_message_received", pipeline_constant.C2D_MSG, value ) @@ -687,7 +693,7 @@ def on_message_received(self, value): class AbstractIoTHubModuleClient(AbstractIoTHubClient): @classmethod - def create_from_edge_environment(cls, **kwargs): + def create_from_edge_environment(cls, **kwargs) -> Self: """ Instantiate the client from the IoT Edge environment. @@ -824,7 +830,7 @@ def create_from_edge_environment(cls, **kwargs): return cls(mqtt_pipeline, http_pipeline) @classmethod - def create_from_x509_certificate(cls, x509, hostname, device_id, module_id, **kwargs): + def create_from_x509_certificate(cls, x509: X509, hostname: str, device_id: str, module_id: str, **kwargs) -> Self: """ Instantiate a client using X509 certificate authentication. @@ -885,19 +891,19 @@ def create_from_x509_certificate(cls, x509, hostname, device_id, module_id, **kw return cls(mqtt_pipeline, http_pipeline) @abc.abstractmethod - def send_message_to_output(self, message, output_name): + def send_message_to_output(self, message: Union[Message, str], output_name: str) -> None: pass @abc.abstractmethod - def receive_message_on_input(self, input_name): + def receive_message_on_input(self, input_name: str) -> Message: pass @abc.abstractmethod - def invoke_method(self, method_params, device_id, module_id=None): + def invoke_method(self, method_params: dict, device_id: str, module_id: Optional[str] = None) -> None: pass @property - def on_message_received(self): + def on_message_received(self) -> Callable[[Any], Any]: """The handler function or coroutine that will be called when an input message is received. The function definition or coroutine should take one positional argument (the @@ -905,7 +911,7 @@ def on_message_received(self): return self._handler_manager.on_message_received @on_message_received.setter - def on_message_received(self, value): + def on_message_received(self, value: Callable[[Any], Any]) -> None: self._generic_receive_handler_setter( "on_message_received", pipeline_constant.INPUT_MSG, value ) diff --git a/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py b/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py index c57f3ec14..f66c8ff91 100644 --- a/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py @@ -16,7 +16,7 @@ AbstractIoTHubDeviceClient, AbstractIoTHubModuleClient, ) -from azure.iot.device.iothub.models import Message +from azure.iot.device.iothub.models import Message, MethodRequest, MethodResponse from azure.iot.device.iothub.pipeline import constant from azure.iot.device.iothub.pipeline import exceptions as pipeline_exceptions from azure.iot.device import exceptions @@ -24,11 +24,14 @@ from .async_inbox import AsyncClientInbox from . import async_handler_manager, loop_management from azure.iot.device import constant as device_constant +from azure.iot.device.iothub.pipeline import MQTTPipeline, HTTPPipeline +from azure.iot.device.custom_typing import StorageInfo, Twin, TwinPatch +from typing import Any, Callable, Optional, Union logger = logging.getLogger(__name__) -async def handle_result(callback): +async def handle_result(callback: Callable[[], None]): try: return await callback.completion() except pipeline_exceptions.ConnectionDroppedError as e: @@ -91,7 +94,7 @@ def __init__(self, **kwargs): self._mqtt_pipeline.on_method_request_received = self._inbox_manager.route_method_request self._mqtt_pipeline.on_twin_patch_received = self._inbox_manager.route_twin_patch - async def _enable_feature(self, feature_name): + async def _enable_feature(self, feature_name: str) -> None: """Enable an Azure IoT Hub feature :param feature_name: The name of the feature to enable. @@ -111,7 +114,7 @@ async def _enable_feature(self, feature_name): # This branch shouldn't be reached, but in case it is, log it logger.info("Feature ({}) already enabled - skipping".format(feature_name)) - async def _disable_feature(self, feature_name): + async def _disable_feature(self, feature_name: str) -> None: """Disable an Azure IoT Hub feature :param feature_name: The name of the feature to enable. @@ -131,7 +134,7 @@ async def _disable_feature(self, feature_name): # This branch shouldn't be reached, but in case it is, log it logger.info("Feature ({}) already disabled - skipping".format(feature_name)) - def _generic_receive_handler_setter(self, handler_name, feature_name, new_handler): + def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, new_handler: Callable[[], Any]) -> None: """Set a receive handler on the handler manager and enable the corresponding feature. This is a synchronous call (yes, even though this is the async client), meaning that this @@ -163,7 +166,7 @@ def _generic_receive_handler_setter(self, handler_name, feature_name, new_handle fut = asyncio.run_coroutine_threadsafe(self._disable_feature(feature_name), loop=loop) fut.result() - async def shutdown(self): + async def shutdown(self) -> None: """Shut down the client for graceful exit. Once this method is called, any attempts at further client calls will result in a @@ -207,7 +210,7 @@ async def shutdown(self): # capability for HTTP pipeline. logger.info("Client shutdown complete") - async def connect(self): + async def connect(self) -> None: """Connects the client to an Azure IoT Hub or Azure IoT Edge Hub instance. The destination is chosen based on the credentials passed via the auth_provider parameter @@ -232,7 +235,7 @@ async def connect(self): logger.info("Successfully connected to Hub") - async def disconnect(self): + async def disconnect(self) -> None: """Disconnect the client from the Azure IoT Hub or Azure IoT Edge Hub instance. It is recommended that you make sure to call this coroutine when you are completely done @@ -277,7 +280,7 @@ async def disconnect(self): logger.info("Successfully disconnected from Hub") - async def update_sastoken(self, sastoken): + async def update_sastoken(self, sastoken: str) -> None: """ Update the client's SAS Token used for authentication, then reauthorizes the connection. @@ -316,7 +319,7 @@ async def update_sastoken(self, sastoken): logger.info("Successfully reauthorized connection to Hub") - async def send_message(self, message): + async def send_message(self, message: Union[Message, str]) -> None: """Sends a message to the default events endpoint on the Azure IoT Hub or Azure IoT Edge Hub instance. If the connection to the service has not previously been opened by a call to connect, this @@ -360,7 +363,7 @@ async def send_message(self, message): current_version=device_constant.VERSION, details="We recommend that you use the .on_method_request_received property to set a handler instead", ) - async def receive_method_request(self, method_name=None): + async def receive_method_request(self, method_name: Optional[str] = None) -> MethodRequest: """Receive a method request via the Azure IoT Hub or Azure IoT Edge Hub. If no method request is yet available, will wait until it is available. @@ -384,7 +387,7 @@ async def receive_method_request(self, method_name=None): logger.info("Received method request") return method_request - async def send_method_response(self, method_response): + async def send_method_response(self, method_response: MethodResponse) -> None: """Send a response to a method request via the Azure IoT Hub or Azure IoT Edge Hub. If the connection to the service has not previously been opened by a call to connect, this @@ -419,7 +422,7 @@ async def send_method_response(self, method_response): logger.info("Successfully sent method response to Hub") - async def get_twin(self): + async def get_twin(self) -> Twin: """ Gets the device or module twin from the Azure IoT Hub or Azure IoT Edge Hub service. @@ -452,7 +455,7 @@ async def get_twin(self): logger.info("Successfully retrieved twin") return twin - async def patch_twin_reported_properties(self, reported_properties_patch): + async def patch_twin_reported_properties(self, reported_properties_patch: TwinPatch) -> None: """ Update reported properties with the Azure IoT Hub or Azure IoT Edge Hub service. @@ -495,7 +498,7 @@ async def patch_twin_reported_properties(self, reported_properties_patch): current_version=device_constant.VERSION, details="We recommend that you use the .on_twin_desired_properties_patch_received property to set a handler instead", ) - async def receive_twin_desired_properties_patch(self): + async def receive_twin_desired_properties_patch(self) -> TwinPatch: """ Receive a desired property patch via the Azure IoT Hub or Azure IoT Edge Hub. @@ -519,7 +522,7 @@ async def receive_twin_desired_properties_patch(self): class IoTHubDeviceClient(GenericIoTHubClient, AbstractIoTHubDeviceClient): """An asynchronous device client that connects to an Azure IoT Hub instance.""" - def __init__(self, mqtt_pipeline, http_pipeline): + def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline): """Initializer for a IoTHubDeviceClient. This initializer should not be called directly. @@ -536,7 +539,7 @@ def __init__(self, mqtt_pipeline, http_pipeline): current_version=device_constant.VERSION, details="We recommend that you use the .on_message_received property to set a handler instead", ) - async def receive_message(self): + async def receive_message(self) -> Message: """Receive a message that has been sent from the Azure IoT Hub. If no message is yet available, will wait until an item is available. @@ -555,7 +558,7 @@ async def receive_message(self): logger.info("Message received") return message - async def get_storage_info_for_blob(self, blob_name): + async def get_storage_info_for_blob(self, blob_name: str) -> StorageInfo: """Sends a POST request over HTTP to an IoTHub endpoint that will return information for uploading via the Azure Storage Account linked to the IoTHub your device is connected to. :param str blob_name: The name in string format of the blob that will be uploaded using the storage API. This name will be used to generate the proper credentials for Storage, and needs to match what will be used with the Azure Storage SDK to perform the blob upload. @@ -573,8 +576,8 @@ async def get_storage_info_for_blob(self, blob_name): return storage_info async def notify_blob_upload_status( - self, correlation_id, is_success, status_code, status_description - ): + self, correlation_id: str, is_success: bool, status_code: int, status_description: str + ) -> None: """When the upload is complete, the device sends a POST request to the IoT Hub endpoint with information on the status of an upload to blob attempt. This is used by IoT Hub to notify listening clients. :param str correlation_id: Provided by IoT Hub on get_storage_info_for_blob request. @@ -601,7 +604,7 @@ async def notify_blob_upload_status( class IoTHubModuleClient(GenericIoTHubClient, AbstractIoTHubModuleClient): """An asynchronous module client that connects to an Azure IoT Hub or Azure IoT Edge instance.""" - def __init__(self, mqtt_pipeline, http_pipeline): + def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline): """Initializer for a IoTHubModuleClient. This initializer should not be called directly. @@ -613,7 +616,7 @@ def __init__(self, mqtt_pipeline, http_pipeline): super().__init__(mqtt_pipeline=mqtt_pipeline, http_pipeline=http_pipeline) self._mqtt_pipeline.on_input_message_received = self._inbox_manager.route_input_message - async def send_message_to_output(self, message, output_name): + async def send_message_to_output(self, message: Message, output_name: str) -> None: """Sends an event/message to the given module output. These are outgoing events and are meant to be "output events" @@ -664,7 +667,7 @@ async def send_message_to_output(self, message, output_name): current_version=device_constant.VERSION, details="We recommend that you use the .on_message_received property to set a handler instead", ) - async def receive_message_on_input(self, input_name): + async def receive_message_on_input(self, input_name: str) -> Message: """Receive an input message that has been sent from another Module to a specific input. If no message is yet available, will wait until an item is available. @@ -685,7 +688,7 @@ async def receive_message_on_input(self, input_name): logger.info("Input message received on: " + input_name) return message - async def invoke_method(self, method_params, device_id, module_id=None): + async def invoke_method(self, method_params, device_id, module_id: Optional[str] = None) -> MethodResponse: """Invoke a method from your client onto a device or module client, and receive the response to the method call. :param dict method_params: Should contain a methodName (str), payload (str), diff --git a/azure-iot-device/azure/iot/device/iothub/models/message.py b/azure-iot-device/azure/iot/device/iothub/models/message.py index 599c27502..8332ec090 100644 --- a/azure-iot-device/azure/iot/device/iothub/models/message.py +++ b/azure-iot-device/azure/iot/device/iothub/models/message.py @@ -50,7 +50,7 @@ def __init__( self._iothub_interface_id = None @property - def iothub_interface_id(self): + def iothub_interface_id(self) -> str: return self._iothub_interface_id def set_as_security_message(self): @@ -64,7 +64,7 @@ def set_as_security_message(self): def __str__(self): return str(self.data) - def get_size(self): + def get_size(self) -> int: total = 0 total = total + sum( sys.getsizeof(v) diff --git a/azure-iot-device/azure/iot/device/iothub/models/methods.py b/azure-iot-device/azure/iot/device/iothub/models/methods.py index 16d60afaa..53fea1475 100644 --- a/azure-iot-device/azure/iot/device/iothub/models/methods.py +++ b/azure-iot-device/azure/iot/device/iothub/models/methods.py @@ -5,6 +5,9 @@ # -------------------------------------------------------------------------- """This module contains classes related to direct method invocations. """ +from typing import Optional +from typing_extensions import Self +from azure.iot.device.custom_typing import JSONSerializable class MethodRequest(object): @@ -15,7 +18,7 @@ class MethodRequest(object): :ivar dict payload: The JSON payload being sent with the request. """ - def __init__(self, request_id, name, payload): + def __init__(self, request_id: str, name: str, payload: JSONSerializable): """Initializer for a MethodRequest. :param str request_id: The request id. @@ -27,15 +30,15 @@ def __init__(self, request_id, name, payload): self._payload = payload @property - def request_id(self): + def request_id(self) -> str: return self._request_id @property - def name(self): + def name(self) -> str: return self._name @property - def payload(self): + def payload(self) -> JSONSerializable: return self._payload @@ -48,7 +51,7 @@ class MethodResponse(object): :type payload: dict, str, int, float, bool, or None (JSON compatible values) """ - def __init__(self, request_id, status, payload=None): + def __init__(self, request_id: str, status: int, payload: Optional[JSONSerializable] = None): """Initializer for MethodResponse. :param str request_id: The request id of the MethodRequest being responded to. @@ -61,7 +64,7 @@ def __init__(self, request_id, status, payload=None): self.payload = payload @classmethod - def create_from_method_request(cls, method_request, status, payload=None): + def create_from_method_request(cls, method_request: MethodRequest, status: int, payload: Optional[JSONSerializable] = None) -> Self: """Factory method for creating a MethodResponse from a MethodRequest. :param method_request: The MethodRequest object to respond to. diff --git a/azure-iot-device/azure/iot/device/iothub/sync_clients.py b/azure-iot-device/azure/iot/device/iothub/sync_clients.py index a90cc04a0..2d51df42b 100644 --- a/azure-iot-device/azure/iot/device/iothub/sync_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/sync_clients.py @@ -14,7 +14,7 @@ AbstractIoTHubDeviceClient, AbstractIoTHubModuleClient, ) -from .models import Message +from .models import Message, MethodResponse, MethodRequest from .inbox_manager import InboxManager from .sync_inbox import SyncClientInbox, InboxEmpty from . import sync_handler_manager @@ -23,7 +23,9 @@ from azure.iot.device import exceptions from azure.iot.device.common.evented_callback import EventedCallback from azure.iot.device import constant as device_constant - +from .pipeline import MQTTPipeline, HTTPPipeline +from azure.iot.device.custom_typing import StorageInfo, Twin, TwinPatch +from typing import Any, Callable, Optional, Union logger = logging.getLogger(__name__) @@ -91,7 +93,7 @@ def __init__(self, **kwargs): self._mqtt_pipeline.on_method_request_received = self._inbox_manager.route_method_request self._mqtt_pipeline.on_twin_patch_received = self._inbox_manager.route_twin_patch - def _enable_feature(self, feature_name): + def _enable_feature(self, feature_name: str) -> None: """Enable an Azure IoT Hub feature. This is a synchronous call, meaning that this function will not return until the feature @@ -111,7 +113,7 @@ def _enable_feature(self, feature_name): # This branch shouldn't be reached, but in case it is, log it logger.info("Feature ({}) already disabled - skipping".format(feature_name)) - def _disable_feature(self, feature_name): + def _disable_feature(self, feature_name: str) -> None: """Disable an Azure IoT Hub feature This is a synchronous call, meaning that this function will not return until the feature @@ -132,7 +134,7 @@ def _disable_feature(self, feature_name): # This branch shouldn't be reached, but in case it is, log it logger.info("Feature ({}) already disabled - skipping".format(feature_name)) - def _generic_receive_handler_setter(self, handler_name, feature_name, new_handler): + def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, new_handler: Callable[[], Any]) -> None: """Set a receive handler on the handler manager and enable the corresponding feature. This is a synchronous call, meaning that this function will not return until the feature @@ -154,7 +156,7 @@ def _generic_receive_handler_setter(self, handler_name, feature_name, new_handle elif new_handler is None and self._mqtt_pipeline.feature_enabled[feature_name]: self._disable_feature(feature_name) - def shutdown(self): + def shutdown(self) -> None: """Shut down the client for graceful exit. Once this method is called, any attempts at further client calls will result in a @@ -197,7 +199,7 @@ def shutdown(self): # capability for HTTP pipeline. logger.info("Client shutdown complete") - def connect(self): + def connect(self) -> None: """Connects the client to an Azure IoT Hub or Azure IoT Edge Hub instance. The destination is chosen based on the credentials passed via the auth_provider parameter @@ -224,7 +226,7 @@ def connect(self): logger.info("Successfully connected to Hub") - def disconnect(self): + def disconnect(self) -> None: """Disconnect the client from the Azure IoT Hub or Azure IoT Edge Hub instance. It is recommended that you make sure to call this function when you are completely done @@ -270,7 +272,7 @@ def disconnect(self): logger.info("Successfully disconnected from Hub") - def update_sastoken(self, sastoken): + def update_sastoken(self, sastoken: str) -> None: """ Update the client's SAS Token used for authentication, then reauthorizes the connection. @@ -306,7 +308,7 @@ def update_sastoken(self, sastoken): logger.info("Successfully reauthorized connection to Hub") - def send_message(self, message): + def send_message(self, message: Union[Message, str]) -> None: """Sends a message to the default events endpoint on the Azure IoT Hub or Azure IoT Edge Hub instance. This is a synchronous event, meaning that this function will not return until the event @@ -352,7 +354,7 @@ def send_message(self, message): current_version=device_constant.VERSION, details="We recommend that you use the .on_method_request_received property to set a handler instead", ) - def receive_method_request(self, method_name=None, block=True, timeout=None): + def receive_method_request(self, method_name: Optional[str] = None, block: bool = True, timeout: Optional[int] = None) -> MethodRequest: """Receive a method request via the Azure IoT Hub or Azure IoT Edge Hub. :param str method_name: Optionally provide the name of the method to receive requests for. @@ -380,7 +382,7 @@ def receive_method_request(self, method_name=None, block=True, timeout=None): logger.info("Did not receive method request") return method_request - def send_method_response(self, method_response): + def send_method_response(self, method_response: MethodResponse) -> None: """Send a response to a method request via the Azure IoT Hub or Azure IoT Edge Hub. This is a synchronous event, meaning that this function will not return until the event @@ -413,7 +415,7 @@ def send_method_response(self, method_response): logger.info("Successfully sent method response to Hub") - def get_twin(self): + def get_twin(self) -> Twin: """ Gets the device or module twin from the Azure IoT Hub or Azure IoT Edge Hub service. @@ -446,7 +448,7 @@ def get_twin(self): logger.info("Successfully retrieved twin") return twin - def patch_twin_reported_properties(self, reported_properties_patch): + def patch_twin_reported_properties(self, reported_properties_patch: TwinPatch) -> None: """ Update reported properties with the Azure IoT Hub or Azure IoT Edge Hub service. @@ -488,7 +490,7 @@ def patch_twin_reported_properties(self, reported_properties_patch): current_version=device_constant.VERSION, details="We recommend that you use the .on_twin_desired_properties_patch_received property to set a handler instead", ) - def receive_twin_desired_properties_patch(self, block=True, timeout=None): + def receive_twin_desired_properties_patch(self, block=True, timeout=None) -> TwinPatch: """ Receive a desired property patch via the Azure IoT Hub or Azure IoT Edge Hub. @@ -528,7 +530,7 @@ def receive_twin_desired_properties_patch(self, block=True, timeout=None): class IoTHubDeviceClient(GenericIoTHubClient, AbstractIoTHubDeviceClient): """A synchronous device client that connects to an Azure IoT Hub instance.""" - def __init__(self, mqtt_pipeline, http_pipeline): + def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline): """Initializer for a IoTHubDeviceClient. This initializer should not be called directly. @@ -545,7 +547,7 @@ def __init__(self, mqtt_pipeline, http_pipeline): current_version=device_constant.VERSION, details="We recommend that you use the .on_message_received property to set a handler instead", ) - def receive_message(self, block=True, timeout=None): + def receive_message(self, block=True, timeout=None) -> Message: """Receive a message that has been sent from the Azure IoT Hub. :param bool block: Indicates if the operation should block until a message is received. @@ -570,7 +572,7 @@ def receive_message(self, block=True, timeout=None): logger.info("No message received.") return message - def get_storage_info_for_blob(self, blob_name): + def get_storage_info_for_blob(self, blob_name: str) -> StorageInfo: """Sends a POST request over HTTP to an IoTHub endpoint that will return information for uploading via the Azure Storage Account linked to the IoTHub your device is connected to. :param str blob_name: The name in string format of the blob that will be uploaded using the storage API. This name will be used to generate the proper credentials for Storage, and needs to match what will be used with the Azure Storage SDK to perform the blob upload. @@ -584,8 +586,8 @@ def get_storage_info_for_blob(self, blob_name): return storage_info def notify_blob_upload_status( - self, correlation_id, is_success, status_code, status_description - ): + self, correlation_id: str, is_success: bool, status_code: int, status_description: str + ) -> None: """When the upload is complete, the device sends a POST request to the IoT Hub endpoint with information on the status of an upload to blob attempt. This is used by IoT Hub to notify listening clients. :param str correlation_id: Provided by IoT Hub on get_storage_info_for_blob request. @@ -608,7 +610,7 @@ def notify_blob_upload_status( class IoTHubModuleClient(GenericIoTHubClient, AbstractIoTHubModuleClient): """A synchronous module client that connects to an Azure IoT Hub or Azure IoT Edge instance.""" - def __init__(self, mqtt_pipeline, http_pipeline): + def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline): """Initializer for a IoTHubModuleClient. This initializer should not be called directly. @@ -622,7 +624,7 @@ def __init__(self, mqtt_pipeline, http_pipeline): super().__init__(mqtt_pipeline=mqtt_pipeline, http_pipeline=http_pipeline) self._mqtt_pipeline.on_input_message_received = self._inbox_manager.route_input_message - def send_message_to_output(self, message, output_name): + def send_message_to_output(self, message: Message, output_name: str) -> None: """Sends an event/message to the given module output. These are outgoing events and are meant to be "output events". @@ -673,7 +675,7 @@ def send_message_to_output(self, message, output_name): current_version=device_constant.VERSION, details="We recommend that you use the .on_message_received property to set a handler instead", ) - def receive_message_on_input(self, input_name, block=True, timeout=None): + def receive_message_on_input(self, input_name: str, block: bool = True, timeout: Optional[int] = None) -> Message: """Receive an input message that has been sent from another Module to a specific input. :param str input_name: The input name to receive a message on. @@ -698,7 +700,7 @@ def receive_message_on_input(self, input_name, block=True, timeout=None): logger.info("No input message received on: " + input_name) return message - def invoke_method(self, method_params, device_id, module_id=None): + def invoke_method(self, method_params: dict, device_id: str, module_id=None): """Invoke a method from your client onto a device or module client, and receive the response to the method call. :param dict method_params: Should contain a methodName (str), payload (str), diff --git a/azure-iot-device/azure/iot/device/provisioning/abstract_provisioning_device_client.py b/azure-iot-device/azure/iot/device/provisioning/abstract_provisioning_device_client.py index 152dea1bd..85cbda79d 100644 --- a/azure-iot-device/azure/iot/device/provisioning/abstract_provisioning_device_client.py +++ b/azure-iot-device/azure/iot/device/provisioning/abstract_provisioning_device_client.py @@ -10,15 +10,21 @@ import abc import logging +from typing_extensions import Self +from typing import Any, Dict, List, Optional, Union from azure.iot.device.provisioning import pipeline from azure.iot.device.common.auth import sastoken as st from azure.iot.device.common import auth, handle_exceptions +from .pipeline import MQTTPipeline +from azure.iot.device.common.models import X509 +from azure.iot.device.custom_typing import ProvisioningPayload +from azure.iot.device.provisioning.models import RegistrationResult logger = logging.getLogger(__name__) -def _validate_kwargs(exclude=[], **kwargs): +def _validate_kwargs(exclude: Optional[List[str]] = [], **kwargs): """Helper function to validate user provided kwargs. Raises TypeError if an invalid option has been provided""" @@ -37,12 +43,12 @@ def _validate_kwargs(exclude=[], **kwargs): raise TypeError("Unsupported keyword argument '{}'".format(kwarg)) -def validate_registration_id(reg_id): +def validate_registration_id(reg_id: str) -> None: if not (reg_id and reg_id.strip()): raise ValueError("Registration Id can not be none, empty or blank.") -def _get_config_kwargs(**kwargs): +def _get_config_kwargs(**kwargs) -> Dict[str, Any]: """Get the subset of kwargs which pertain the config object""" valid_config_kwargs = [ "server_verification_cert", @@ -60,7 +66,7 @@ def _get_config_kwargs(**kwargs): return config_kwargs -def _form_sas_uri(id_scope, registration_id): +def _form_sas_uri(id_scope: str, registration_id: str) -> str: return "{id_scope}/registrations/{registration_id}".format( id_scope=id_scope, registration_id=registration_id ) @@ -71,7 +77,7 @@ class AbstractProvisioningDeviceClient(abc.ABC): Super class for any client that can be used to register devices to Device Provisioning Service. """ - def __init__(self, pipeline): + def __init__(self, pipeline: MQTTPipeline): """ Initializes the provisioning client. @@ -89,8 +95,8 @@ def __init__(self, pipeline): @classmethod def create_from_symmetric_key( - cls, provisioning_host, registration_id, id_scope, symmetric_key, **kwargs - ): + cls, provisioning_host: str, registration_id: str, id_scope: str, symmetric_key: str, **kwargs + ) -> Self: """ Create a client which can be used to run the registration of a device with provisioning service using Symmetric Key authentication. @@ -163,8 +169,8 @@ def create_from_symmetric_key( @classmethod def create_from_x509_certificate( - cls, provisioning_host, registration_id, id_scope, x509, **kwargs - ): + cls, provisioning_host: str, registration_id: str, id_scope: str, x509: X509, **kwargs + ) -> Self: """ Create a client which can be used to run the registration of a device with provisioning service using X509 certificate authentication. @@ -224,18 +230,18 @@ def create_from_x509_certificate( return cls(mqtt_provisioning_pipeline) @abc.abstractmethod - def register(self): + def register(self) -> RegistrationResult: """ Register the device with the Device Provisioning Service. """ pass @property - def provisioning_payload(self): + def provisioning_payload(self) -> ProvisioningPayload: return self._provisioning_payload @provisioning_payload.setter - def provisioning_payload(self, provisioning_payload): + def provisioning_payload(self, provisioning_payload: ProvisioningPayload): """ Set the payload that will form the request payload in a registration request. @@ -245,7 +251,7 @@ def provisioning_payload(self, provisioning_payload): self._provisioning_payload = provisioning_payload -def log_on_register_complete(result=None): +def log_on_register_complete(result: Union[RegistrationResult, Exception] = None) -> None: # This could be a failed/successful registration result from DPS # or a error from polling machine. Response should be given appropriately if result is not None: diff --git a/azure-iot-device/azure/iot/device/provisioning/aio/async_provisioning_device_client.py b/azure-iot-device/azure/iot/device/provisioning/aio/async_provisioning_device_client.py index acd79c533..82fb059ac 100644 --- a/azure-iot-device/azure/iot/device/provisioning/aio/async_provisioning_device_client.py +++ b/azure-iot-device/azure/iot/device/provisioning/aio/async_provisioning_device_client.py @@ -10,6 +10,7 @@ """ import logging +from typing import Callable from azure.iot.device.common import async_adapter from azure.iot.device.provisioning.abstract_provisioning_device_client import ( AbstractProvisioningDeviceClient, @@ -20,11 +21,12 @@ from azure.iot.device.provisioning.pipeline import exceptions as pipeline_exceptions from azure.iot.device import exceptions from azure.iot.device.provisioning.pipeline import constant as dps_constant +from azure.iot.device.provisioning.models import RegistrationResult logger = logging.getLogger(__name__) -async def handle_result(callback): +async def handle_result(callback: Callable[[], None]) -> None: try: return await callback.completion() except pipeline_exceptions.ConnectionDroppedError as e: @@ -49,7 +51,7 @@ class ProvisioningDeviceClient(AbstractProvisioningDeviceClient): using Symmetric Key or X509 authentication. """ - async def register(self): + async def register(self) -> RegistrationResult: """ Register the device with the provisioning service. @@ -94,7 +96,7 @@ async def register(self): return result - async def _enable_responses(self): + async def _enable_responses(self) -> None: """Enable to receive responses from Device Provisioning Service.""" logger.info("Enabling reception of response from Device Provisioning Service...") subscribe_async = async_adapter.emulate_async(self._pipeline.enable_responses) diff --git a/azure-iot-device/azure/iot/device/provisioning/models/registration_result.py b/azure-iot-device/azure/iot/device/provisioning/models/registration_result.py index 560d76720..a84573c44 100644 --- a/azure-iot-device/azure/iot/device/provisioning/models/registration_result.py +++ b/azure-iot-device/azure/iot/device/provisioning/models/registration_result.py @@ -4,45 +4,8 @@ # license information. # -------------------------------------------------------------------------- import json - - -class RegistrationResult(object): - """ - The final result of a completed or failed registration attempt - :ivar:request_id: The request id to which the response is being obtained - :ivar:operation_id: The id of the operation as returned by the registration request. - :ivar status: The status of the registration process as returned by the provisioning service. - Values can be "unassigned", "assigning", "assigned", "failed", "disabled" - :ivar registration_state : Details like device id, assigned hub , date times etc returned - from the provisioning service. - """ - - def __init__(self, operation_id, status, registration_state=None): - """ - :param operation_id: The id of the operation as returned by the initial registration request. - :param status: The status of the registration process. - Values can be "unassigned", "assigning", "assigned", "failed", "disabled" - :param registration_state : Details like device id, assigned hub , date times etc returned - from the provisioning service. - """ - self._operation_id = operation_id - self._status = status - self._registration_state = registration_state - - @property - def operation_id(self): - return self._operation_id - - @property - def status(self): - return self._status - - @property - def registration_state(self): - return self._registration_state - - def __str__(self): - return "\n".join([str(self.registration_state), self.status]) +from typing import Optional +from azure.iot.device.custom_typing import JSONSerializable class RegistrationState(object): @@ -86,34 +49,73 @@ def __init__( self._response_payload = payload @property - def device_id(self): + def device_id(self) -> str: return self._device_id @property - def assigned_hub(self): + def assigned_hub(self) -> str: return self._assigned_hub @property - def sub_status(self): + def sub_status(self) -> str: return self._sub_status @property - def created_date_time(self): + def created_date_time(self) -> str: return self._created_date_time @property - def last_update_date_time(self): + def last_update_date_time(self) -> str: return self._last_update_date_time @property - def etag(self): + def etag(self) -> str: return self._etag @property - def response_payload(self): + def response_payload(self) -> JSONSerializable: return json.dumps(self._response_payload, default=lambda o: o.__dict__, sort_keys=True) def __str__(self): return "\n".join( [self.device_id, self.assigned_hub, self.sub_status, self.response_payload] ) + + +class RegistrationResult(object): + """ + The final result of a completed or failed registration attempt + :ivar:request_id: The request id to which the response is being obtained + :ivar:operation_id: The id of the operation as returned by the registration request. + :ivar status: The status of the registration process as returned by the provisioning service. + Values can be "unassigned", "assigning", "assigned", "failed", "disabled" + :ivar registration_state : Details like device id, assigned hub , date times etc returned + from the provisioning service. + """ + + def __init__(self, operation_id: str, status: str, registration_state: Optional[RegistrationState] = None): + """ + :param operation_id: The id of the operation as returned by the initial registration request. + :param status: The status of the registration process. + Values can be "unassigned", "assigning", "assigned", "failed", "disabled" + :param registration_state : Details like device id, assigned hub , date times etc returned + from the provisioning service. + """ + self._operation_id = operation_id + self._status = status + self._registration_state = registration_state + + @property + def operation_id(self) -> str: + return self._operation_id + + @property + def status(self) -> str: + return self._status + + @property + def registration_state(self) -> RegistrationState: + return self._registration_state + + def __str__(self): + return "\n".join([str(self.registration_state), self.status]) diff --git a/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py b/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py index 479f9e9ec..eb6be2a69 100644 --- a/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py +++ b/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py @@ -9,18 +9,20 @@ IoT Hub via the Device Provisioning Service. """ import logging +from typing import Callable from azure.iot.device.common.evented_callback import EventedCallback from .abstract_provisioning_device_client import AbstractProvisioningDeviceClient from .abstract_provisioning_device_client import log_on_register_complete from azure.iot.device.provisioning.pipeline import constant as dps_constant from .pipeline import exceptions as pipeline_exceptions from azure.iot.device import exceptions +from azure.iot.device.provisioning.models import RegistrationResult logger = logging.getLogger(__name__) -def handle_result(callback): +def handle_result(callback: Callable[[], None]) -> None: try: return callback.wait_for_completion() except pipeline_exceptions.ConnectionDroppedError as e: @@ -47,7 +49,7 @@ class ProvisioningDeviceClient(AbstractProvisioningDeviceClient): using Symmetric Key or X509 authentication. """ - def register(self): + def register(self) -> RegistrationResult: """ Register the device with the provisioning service @@ -94,7 +96,7 @@ def register(self): return result - def _enable_responses(self): + def _enable_responses(self) -> None: """Enable to receive responses from Device Provisioning Service. This is a synchronous call, meaning that this function will not return until the feature From 1b129a86e1c670148c45adf7b7770cdb59f30b7c Mon Sep 17 00:00:00 2001 From: Oliva Kar Date: Mon, 22 Jan 2024 14:40:21 -0800 Subject: [PATCH 2/6] wip --- azure-iot-device/azure/iot/device/iothub/abstract_clients.py | 2 +- azure-iot-device/py.typed | 0 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 azure-iot-device/py.typed diff --git a/azure-iot-device/azure/iot/device/iothub/abstract_clients.py b/azure-iot-device/azure/iot/device/iothub/abstract_clients.py index 51c4abad5..f2ee20e24 100644 --- a/azure-iot-device/azure/iot/device/iothub/abstract_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/abstract_clients.py @@ -243,7 +243,7 @@ def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, pass @classmethod - def create_from_connection_string(cls, connection_string: str, **kwargs: Dict[str, Any]) -> Self: + def create_from_connection_string(cls, connection_string: str, **kwargs) -> Self: """ Instantiate the client from a IoTHub device or module connection string. diff --git a/azure-iot-device/py.typed b/azure-iot-device/py.typed new file mode 100644 index 000000000..e69de29bb From cd24910b3bfeb66b13387ca0da915318fea9294b Mon Sep 17 00:00:00 2001 From: Oliva Kar Date: Mon, 22 Jan 2024 17:47:25 -0800 Subject: [PATCH 3/6] func or co --- .../iot/device/iothub/abstract_clients.py | 36 +++++++++---------- .../iot/device/iothub/aio/async_clients.py | 8 ++--- .../azure/iot/device/iothub/sync_clients.py | 6 ++-- .../aio/async_provisioning_device_client.py | 5 +-- .../provisioning_device_client.py | 4 +-- 5 files changed, 30 insertions(+), 29 deletions(-) diff --git a/azure-iot-device/azure/iot/device/iothub/abstract_clients.py b/azure-iot-device/azure/iot/device/iothub/abstract_clients.py index f2ee20e24..aff2ab84a 100644 --- a/azure-iot-device/azure/iot/device/iothub/abstract_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/abstract_clients.py @@ -24,8 +24,8 @@ from . import edge_hsm from .pipeline import MQTTPipeline, HTTPPipeline from typing_extensions import Self -from azure.iot.device.custom_typing import JSONSerializable, Twin, TwinPatch -from typing import Any, Callable, Dict, List, Optional, Union +from azure.iot.device.custom_typing import FunctionOrCoroutine, JSONSerializable, Twin, TwinPatch +from typing import Any, Dict, List, Optional, Union logger = logging.getLogger(__name__) @@ -238,7 +238,7 @@ def _replace_user_supplied_sastoken(self, sastoken_str: str) -> None: self._mqtt_pipeline.pipeline_configuration.sastoken = new_token_o @abc.abstractmethod - def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, new_handler: Optional[Callable[[], Any]]) -> None: + def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, new_handler: Optional[FunctionOrCoroutine[[Any], Any]]) -> None: # Will be implemented differently in child classes, but define here for static analysis pass @@ -446,7 +446,7 @@ def connected(self) -> bool: return self._mqtt_pipeline.connected @property - def on_connection_state_change(self) -> Callable[[Any], Any]: + def on_connection_state_change(self) -> FunctionOrCoroutine[[None], None]: """The handler function or coroutine that will be called when the connection state changes. The function or coroutine definition should take no positional arguments. @@ -454,11 +454,11 @@ def on_connection_state_change(self) -> Callable[[Any], Any]: return self._handler_manager.on_connection_state_change @on_connection_state_change.setter - def on_connection_state_change(self, value: Callable[[Any], Any]) -> None: + def on_connection_state_change(self, value: FunctionOrCoroutine[[None], None]) -> None: self._handler_manager.on_connection_state_change = value @property - def on_new_sastoken_required(self) -> Callable[[Any], Any]: + def on_new_sastoken_required(self) -> FunctionOrCoroutine[[None], None]: """The handler function or coroutine that will be called when the client requires a new SAS token. This will happen approximately 2 minutes before the SAS Token expires. On Windows platforms, if the lifespan exceeds approximately 49 days, a new token will @@ -475,11 +475,11 @@ def on_new_sastoken_required(self) -> Callable[[Any], Any]: return self._handler_manager.on_new_sastoken_required @on_new_sastoken_required.setter - def on_new_sastoken_required(self, value: Callable[[Any], Any]) -> None: + def on_new_sastoken_required(self, value: FunctionOrCoroutine[[None], None]) -> None: self._handler_manager.on_new_sastoken_required = value @property - def on_background_exception(self) -> Callable[[Any], Any]: + def on_background_exception(self) -> FunctionOrCoroutine[[Exception], None]: """The handler function or coroutine will be called when a background exception occurs. The function or coroutine definition should take one positional argument (the exception @@ -487,16 +487,16 @@ def on_background_exception(self) -> Callable[[Any], Any]: return self._handler_manager.on_background_exception @on_background_exception.setter - def on_background_exception(self, value: Callable[[Any], Any]) -> None: + def on_background_exception(self, value: FunctionOrCoroutine[[Exception], None]) -> None: self._handler_manager.on_background_exception = value @abc.abstractproperty - def on_message_received(self) -> Callable[[Any], Any]: + def on_message_received(self) -> FunctionOrCoroutine[[Message], None]: # Defined below on AbstractIoTHubDeviceClient / AbstractIoTHubModuleClient pass @property - def on_method_request_received(self) -> Callable[[Any], Any]: + def on_method_request_received(self) -> FunctionOrCoroutine[[MethodRequest], None]: """The handler function or coroutine that will be called when a method request is received. Remember to acknowledge the method request in your function or coroutine via use of the @@ -507,13 +507,13 @@ def on_method_request_received(self) -> Callable[[Any], Any]: return self._handler_manager.on_method_request_received @on_method_request_received.setter - def on_method_request_received(self, value: Callable[[Any], Any]) -> None: + def on_method_request_received(self, value: FunctionOrCoroutine[[MethodRequest], None]) -> None: self._generic_receive_handler_setter( "on_method_request_received", pipeline_constant.METHODS, value ) @property - def on_twin_desired_properties_patch_received(self) -> Callable[[Any], Any]: + def on_twin_desired_properties_patch_received(self) -> FunctionOrCoroutine[[TwinPatch], None]: """The handler function or coroutine that will be called when a twin desired properties patch is received. @@ -522,7 +522,7 @@ def on_twin_desired_properties_patch_received(self) -> Callable[[Any], Any]: return self._handler_manager.on_twin_desired_properties_patch_received @on_twin_desired_properties_patch_received.setter - def on_twin_desired_properties_patch_received(self, value: Callable[[Any], Any]): + def on_twin_desired_properties_patch_received(self, value: FunctionOrCoroutine[[TwinPatch], None]): self._generic_receive_handler_setter( "on_twin_desired_properties_patch_received", pipeline_constant.TWIN_PATCHES, value ) @@ -677,7 +677,7 @@ def notify_blob_upload_status( pass @property - def on_message_received(self) -> Callable[[Any], Any]: + def on_message_received(self) -> FunctionOrCoroutine[[Message], None]: """The handler function or coroutine that will be called when a message is received. The function or coroutine definition should take one positional argument (the @@ -685,7 +685,7 @@ def on_message_received(self) -> Callable[[Any], Any]: return self._handler_manager.on_message_received @on_message_received.setter - def on_message_received(self, value: Callable[[Any], Any]): + def on_message_received(self, value: FunctionOrCoroutine[[Message], None]): self._generic_receive_handler_setter( "on_message_received", pipeline_constant.C2D_MSG, value ) @@ -903,7 +903,7 @@ def invoke_method(self, method_params: dict, device_id: str, module_id: Optional pass @property - def on_message_received(self) -> Callable[[Any], Any]: + def on_message_received(self) -> FunctionOrCoroutine[[Message], Any]: """The handler function or coroutine that will be called when an input message is received. The function definition or coroutine should take one positional argument (the @@ -911,7 +911,7 @@ def on_message_received(self) -> Callable[[Any], Any]: return self._handler_manager.on_message_received @on_message_received.setter - def on_message_received(self, value: Callable[[Any], Any]) -> None: + def on_message_received(self, value: FunctionOrCoroutine[[Message], Any]) -> None: self._generic_receive_handler_setter( "on_message_received", pipeline_constant.INPUT_MSG, value ) diff --git a/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py b/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py index f66c8ff91..ba5fced57 100644 --- a/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py @@ -25,13 +25,13 @@ from . import async_handler_manager, loop_management from azure.iot.device import constant as device_constant from azure.iot.device.iothub.pipeline import MQTTPipeline, HTTPPipeline -from azure.iot.device.custom_typing import StorageInfo, Twin, TwinPatch -from typing import Any, Callable, Optional, Union +from azure.iot.device.custom_typing import FunctionOrCoroutine, StorageInfo, Twin, TwinPatch +from typing import Any, Optional, Union logger = logging.getLogger(__name__) -async def handle_result(callback: Callable[[], None]): +async def handle_result(callback: FunctionOrCoroutine[[Any], None]): try: return await callback.completion() except pipeline_exceptions.ConnectionDroppedError as e: @@ -134,7 +134,7 @@ async def _disable_feature(self, feature_name: str) -> None: # This branch shouldn't be reached, but in case it is, log it logger.info("Feature ({}) already disabled - skipping".format(feature_name)) - def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, new_handler: Callable[[], Any]) -> None: + def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, new_handler: FunctionOrCoroutine[[], None]) -> None: """Set a receive handler on the handler manager and enable the corresponding feature. This is a synchronous call (yes, even though this is the async client), meaning that this diff --git a/azure-iot-device/azure/iot/device/iothub/sync_clients.py b/azure-iot-device/azure/iot/device/iothub/sync_clients.py index 2d51df42b..e9e1b8c54 100644 --- a/azure-iot-device/azure/iot/device/iothub/sync_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/sync_clients.py @@ -24,8 +24,8 @@ from azure.iot.device.common.evented_callback import EventedCallback from azure.iot.device import constant as device_constant from .pipeline import MQTTPipeline, HTTPPipeline -from azure.iot.device.custom_typing import StorageInfo, Twin, TwinPatch -from typing import Any, Callable, Optional, Union +from azure.iot.device.custom_typing import FunctionOrCoroutine, StorageInfo, Twin, TwinPatch +from typing import Any, Optional, Union logger = logging.getLogger(__name__) @@ -134,7 +134,7 @@ def _disable_feature(self, feature_name: str) -> None: # This branch shouldn't be reached, but in case it is, log it logger.info("Feature ({}) already disabled - skipping".format(feature_name)) - def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, new_handler: Callable[[], Any]) -> None: + def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, new_handler: FunctionOrCoroutine[[Any], Any]) -> None: """Set a receive handler on the handler manager and enable the corresponding feature. This is a synchronous call, meaning that this function will not return until the feature diff --git a/azure-iot-device/azure/iot/device/provisioning/aio/async_provisioning_device_client.py b/azure-iot-device/azure/iot/device/provisioning/aio/async_provisioning_device_client.py index 82fb059ac..3c50a8d50 100644 --- a/azure-iot-device/azure/iot/device/provisioning/aio/async_provisioning_device_client.py +++ b/azure-iot-device/azure/iot/device/provisioning/aio/async_provisioning_device_client.py @@ -10,8 +10,9 @@ """ import logging -from typing import Callable +from typing import Any from azure.iot.device.common import async_adapter +from azure.iot.device.custom_typing import FunctionOrCoroutine from azure.iot.device.provisioning.abstract_provisioning_device_client import ( AbstractProvisioningDeviceClient, ) @@ -26,7 +27,7 @@ logger = logging.getLogger(__name__) -async def handle_result(callback: Callable[[], None]) -> None: +async def handle_result(callback: FunctionOrCoroutine[[Any], None]) -> None: try: return await callback.completion() except pipeline_exceptions.ConnectionDroppedError as e: diff --git a/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py b/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py index eb6be2a69..1a422e4ad 100644 --- a/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py +++ b/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py @@ -9,8 +9,8 @@ IoT Hub via the Device Provisioning Service. """ import logging -from typing import Callable from azure.iot.device.common.evented_callback import EventedCallback +from azure.iot.device.custom_typing import FunctionOrCoroutine from .abstract_provisioning_device_client import AbstractProvisioningDeviceClient from .abstract_provisioning_device_client import log_on_register_complete from azure.iot.device.provisioning.pipeline import constant as dps_constant @@ -22,7 +22,7 @@ logger = logging.getLogger(__name__) -def handle_result(callback: Callable[[], None]) -> None: +def handle_result(callback: FunctionOrCoroutine[[Any], None]) -> None: try: return callback.wait_for_completion() except pipeline_exceptions.ConnectionDroppedError as e: From eccf8ad299ffff602afcd6eb578b2b8f01f1dbc4 Mon Sep 17 00:00:00 2001 From: Oliva Kar Date: Mon, 22 Jan 2024 17:59:34 -0800 Subject: [PATCH 4/6] wip --- .../azure/iot/device/provisioning/provisioning_device_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py b/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py index 1a422e4ad..df31399dc 100644 --- a/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py +++ b/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py @@ -9,6 +9,7 @@ IoT Hub via the Device Provisioning Service. """ import logging +from typing import Any from azure.iot.device.common.evented_callback import EventedCallback from azure.iot.device.custom_typing import FunctionOrCoroutine from .abstract_provisioning_device_client import AbstractProvisioningDeviceClient From 000de953de153bebc4cd678a55d8aba0079172dc Mon Sep 17 00:00:00 2001 From: Oliva Kar Date: Mon, 22 Jan 2024 19:11:38 -0800 Subject: [PATCH 5/6] wip --- azure-iot-device/azure/iot/device/iothub/aio/async_clients.py | 2 +- azure-iot-device/azure/iot/device/iothub/sync_clients.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py b/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py index ba5fced57..457b299ac 100644 --- a/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py @@ -134,7 +134,7 @@ async def _disable_feature(self, feature_name: str) -> None: # This branch shouldn't be reached, but in case it is, log it logger.info("Feature ({}) already disabled - skipping".format(feature_name)) - def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, new_handler: FunctionOrCoroutine[[], None]) -> None: + def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, new_handler: FunctionOrCoroutine[[None], None]) -> None: """Set a receive handler on the handler manager and enable the corresponding feature. This is a synchronous call (yes, even though this is the async client), meaning that this diff --git a/azure-iot-device/azure/iot/device/iothub/sync_clients.py b/azure-iot-device/azure/iot/device/iothub/sync_clients.py index e9e1b8c54..8748712fe 100644 --- a/azure-iot-device/azure/iot/device/iothub/sync_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/sync_clients.py @@ -134,7 +134,7 @@ def _disable_feature(self, feature_name: str) -> None: # This branch shouldn't be reached, but in case it is, log it logger.info("Feature ({}) already disabled - skipping".format(feature_name)) - def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, new_handler: FunctionOrCoroutine[[Any], Any]) -> None: + def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, new_handler: FunctionOrCoroutine[[None], None]) -> None: """Set a receive handler on the handler manager and enable the corresponding feature. This is a synchronous call, meaning that this function will not return until the feature From 281176a3fad0616bf02396991ab8d6bfffff0daf Mon Sep 17 00:00:00 2001 From: Carter Tinney Date: Tue, 23 Jan 2024 11:52:25 -0800 Subject: [PATCH 6/6] added future annotations --- .pre-commit-config.yaml | 2 +- .../iot/device/iothub/abstract_clients.py | 39 +++++++++++++------ .../iot/device/iothub/aio/async_clients.py | 10 +++-- .../azure/iot/device/iothub/sync_clients.py | 16 +++++--- .../aio/async_provisioning_device_client.py | 2 +- .../provisioning_device_client.py | 1 + setup.py | 1 + 7 files changed, 50 insertions(+), 21 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8c2296054..059c326f2 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -4,7 +4,7 @@ repos: hooks: - id: black language_version: python3 -- repo: https://gitlab.com/pycqa/flake8 +- repo: https://github.com/pycqa/flake8 rev: 3.9.1 # Use the ref you want to point at hooks: - id: flake8 diff --git a/azure-iot-device/azure/iot/device/iothub/abstract_clients.py b/azure-iot-device/azure/iot/device/iothub/abstract_clients.py index aff2ab84a..6fd44f924 100644 --- a/azure-iot-device/azure/iot/device/iothub/abstract_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/abstract_clients.py @@ -5,7 +5,7 @@ # -------------------------------------------------------------------------- """This module contains abstract classes for the various clients of the Azure IoT Hub Device SDK """ - +from __future__ import annotations import abc import logging import threading @@ -238,7 +238,12 @@ def _replace_user_supplied_sastoken(self, sastoken_str: str) -> None: self._mqtt_pipeline.pipeline_configuration.sastoken = new_token_o @abc.abstractmethod - def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, new_handler: Optional[FunctionOrCoroutine[[Any], Any]]) -> None: + def _generic_receive_handler_setter( + self, + handler_name: str, + feature_name: str, + new_handler: Optional[FunctionOrCoroutine[[Any], Any]], + ) -> None: # Will be implemented differently in child classes, but define here for static analysis pass @@ -315,7 +320,7 @@ def create_from_connection_string(cls, connection_string: str, **kwargs) -> Self hostname=connection_string[cs.HOST_NAME], gateway_hostname=connection_string.get(cs.GATEWAY_HOST_NAME), sastoken=sastoken, - **config_kwargs + **config_kwargs, ) if cls.__name__ == "IoTHubDeviceClient": pipeline_configuration.blob_upload = True @@ -387,7 +392,7 @@ def create_from_sastoken(cls, sastoken: str, **kwargs: Dict[str, Any]) -> Self: module_id=vals["module_id"], hostname=vals["hostname"], sastoken=sastoken_o, - **config_kwargs + **config_kwargs, ) if cls.__name__ == "IoTHubDeviceClient": pipeline_configuration.blob_upload = True # Blob Upload is a feature on Device Clients @@ -423,7 +428,9 @@ def receive_method_request(self, method_name: Optional[str] = None) -> None: pass @abc.abstractmethod - def send_method_response(self, method_request: MethodRequest, payload: Dict[str, JSONSerializable], status: int) -> None: + def send_method_response( + self, method_request: MethodRequest, payload: Dict[str, JSONSerializable], status: int + ) -> None: pass @abc.abstractmethod @@ -522,7 +529,9 @@ def on_twin_desired_properties_patch_received(self) -> FunctionOrCoroutine[[Twin return self._handler_manager.on_twin_desired_properties_patch_received @on_twin_desired_properties_patch_received.setter - def on_twin_desired_properties_patch_received(self, value: FunctionOrCoroutine[[TwinPatch], None]): + def on_twin_desired_properties_patch_received( + self, value: FunctionOrCoroutine[[TwinPatch], None] + ): self._generic_receive_handler_setter( "on_twin_desired_properties_patch_received", pipeline_constant.TWIN_PATCHES, value ) @@ -530,7 +539,9 @@ def on_twin_desired_properties_patch_received(self, value: FunctionOrCoroutine[[ class AbstractIoTHubDeviceClient(AbstractIoTHubClient): @classmethod - def create_from_x509_certificate(cls, x509: X509, hostname: str, device_id: str, **kwargs) -> Self: + def create_from_x509_certificate( + cls, x509: X509, hostname: str, device_id: str, **kwargs + ) -> Self: """ Instantiate a client using X509 certificate authentication. @@ -592,7 +603,9 @@ def create_from_x509_certificate(cls, x509: X509, hostname: str, device_id: str, return cls(mqtt_pipeline, http_pipeline) @classmethod - def create_from_symmetric_key(cls, symmetric_key: str, hostname: str, device_id: str, **kwargs) -> Self: + def create_from_symmetric_key( + cls, symmetric_key: str, hostname: str, device_id: str, **kwargs + ) -> Self: """ Instantiate a client using symmetric key authentication. @@ -815,7 +828,7 @@ def create_from_edge_environment(cls, **kwargs) -> Self: gateway_hostname=gateway_hostname, sastoken=sastoken, server_verification_cert=server_verification_cert, - **config_kwargs + **config_kwargs, ) pipeline_configuration.ensure_desired_properties = True @@ -830,7 +843,9 @@ def create_from_edge_environment(cls, **kwargs) -> Self: return cls(mqtt_pipeline, http_pipeline) @classmethod - def create_from_x509_certificate(cls, x509: X509, hostname: str, device_id: str, module_id: str, **kwargs) -> Self: + def create_from_x509_certificate( + cls, x509: X509, hostname: str, device_id: str, module_id: str, **kwargs + ) -> Self: """ Instantiate a client using X509 certificate authentication. @@ -899,7 +914,9 @@ def receive_message_on_input(self, input_name: str) -> Message: pass @abc.abstractmethod - def invoke_method(self, method_params: dict, device_id: str, module_id: Optional[str] = None) -> None: + def invoke_method( + self, method_params: dict, device_id: str, module_id: Optional[str] = None + ) -> None: pass @property diff --git a/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py b/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py index 457b299ac..88ea8987f 100644 --- a/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py @@ -6,7 +6,7 @@ """This module contains user-facing asynchronous clients for the Azure IoTHub Device SDK for Python. """ - +from __future__ import annotations import logging import asyncio import deprecation @@ -134,7 +134,9 @@ async def _disable_feature(self, feature_name: str) -> None: # This branch shouldn't be reached, but in case it is, log it logger.info("Feature ({}) already disabled - skipping".format(feature_name)) - def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, new_handler: FunctionOrCoroutine[[None], None]) -> None: + def _generic_receive_handler_setter( + self, handler_name: str, feature_name: str, new_handler: FunctionOrCoroutine[[None], None] + ) -> None: """Set a receive handler on the handler manager and enable the corresponding feature. This is a synchronous call (yes, even though this is the async client), meaning that this @@ -688,7 +690,9 @@ async def receive_message_on_input(self, input_name: str) -> Message: logger.info("Input message received on: " + input_name) return message - async def invoke_method(self, method_params, device_id, module_id: Optional[str] = None) -> MethodResponse: + async def invoke_method( + self, method_params, device_id, module_id: Optional[str] = None + ) -> MethodResponse: """Invoke a method from your client onto a device or module client, and receive the response to the method call. :param dict method_params: Should contain a methodName (str), payload (str), diff --git a/azure-iot-device/azure/iot/device/iothub/sync_clients.py b/azure-iot-device/azure/iot/device/iothub/sync_clients.py index 8748712fe..98d9b8104 100644 --- a/azure-iot-device/azure/iot/device/iothub/sync_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/sync_clients.py @@ -6,7 +6,7 @@ """This module contains user-facing synchronous clients for the Azure IoTHub Device SDK for Python. """ - +from __future__ import annotations import logging import deprecation from .abstract_clients import ( @@ -25,7 +25,7 @@ from azure.iot.device import constant as device_constant from .pipeline import MQTTPipeline, HTTPPipeline from azure.iot.device.custom_typing import FunctionOrCoroutine, StorageInfo, Twin, TwinPatch -from typing import Any, Optional, Union +from typing import Optional, Union logger = logging.getLogger(__name__) @@ -134,7 +134,9 @@ def _disable_feature(self, feature_name: str) -> None: # This branch shouldn't be reached, but in case it is, log it logger.info("Feature ({}) already disabled - skipping".format(feature_name)) - def _generic_receive_handler_setter(self, handler_name: str, feature_name: str, new_handler: FunctionOrCoroutine[[None], None]) -> None: + def _generic_receive_handler_setter( + self, handler_name: str, feature_name: str, new_handler: FunctionOrCoroutine[[None], None] + ) -> None: """Set a receive handler on the handler manager and enable the corresponding feature. This is a synchronous call, meaning that this function will not return until the feature @@ -354,7 +356,9 @@ def send_message(self, message: Union[Message, str]) -> None: current_version=device_constant.VERSION, details="We recommend that you use the .on_method_request_received property to set a handler instead", ) - def receive_method_request(self, method_name: Optional[str] = None, block: bool = True, timeout: Optional[int] = None) -> MethodRequest: + def receive_method_request( + self, method_name: Optional[str] = None, block: bool = True, timeout: Optional[int] = None + ) -> MethodRequest: """Receive a method request via the Azure IoT Hub or Azure IoT Edge Hub. :param str method_name: Optionally provide the name of the method to receive requests for. @@ -675,7 +679,9 @@ def send_message_to_output(self, message: Message, output_name: str) -> None: current_version=device_constant.VERSION, details="We recommend that you use the .on_message_received property to set a handler instead", ) - def receive_message_on_input(self, input_name: str, block: bool = True, timeout: Optional[int] = None) -> Message: + def receive_message_on_input( + self, input_name: str, block: bool = True, timeout: Optional[int] = None + ) -> Message: """Receive an input message that has been sent from another Module to a specific input. :param str input_name: The input name to receive a message on. diff --git a/azure-iot-device/azure/iot/device/provisioning/aio/async_provisioning_device_client.py b/azure-iot-device/azure/iot/device/provisioning/aio/async_provisioning_device_client.py index 3c50a8d50..a96032191 100644 --- a/azure-iot-device/azure/iot/device/provisioning/aio/async_provisioning_device_client.py +++ b/azure-iot-device/azure/iot/device/provisioning/aio/async_provisioning_device_client.py @@ -8,7 +8,7 @@ Device SDK. This client uses Symmetric Key and X509 authentication to register devices with an IoT Hub via the Device Provisioning Service. """ - +from __future__ import annotations import logging from typing import Any from azure.iot.device.common import async_adapter diff --git a/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py b/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py index df31399dc..fb953b7a4 100644 --- a/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py +++ b/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py @@ -8,6 +8,7 @@ Device SDK. This client uses Symmetric Key and X509 authentication to register devices with an IoT Hub via the Device Provisioning Service. """ +from __future__ import annotations import logging from typing import Any from azure.iot.device.common.evented_callback import EventedCallback diff --git a/setup.py b/setup.py index 92f0bb326..07bece21b 100644 --- a/setup.py +++ b/setup.py @@ -85,6 +85,7 @@ "requests-unixsocket>=0.1.5,<1.0.0", "janus", "PySocks", + "typing_extensions", ], python_requires=">=3.6, <4", packages=find_namespace_packages(where="azure-iot-device"),