diff --git a/azure-iot-device/azure/iot/device/common/http_transport.py b/azure-iot-device/azure/iot/device/common/http_transport.py index ab1cbfbdc..26fffda2c 100644 --- a/azure-iot-device/azure/iot/device/common/http_transport.py +++ b/azure-iot-device/azure/iot/device/common/http_transport.py @@ -6,7 +6,7 @@ import logging import ssl -import requests +import requests # type: ignore from . import transport_exceptions as exceptions from .pipeline import pipeline_thread diff --git a/azure-iot-device/azure/iot/device/iothub/abstract_clients.py b/azure-iot-device/azure/iot/device/iothub/abstract_clients.py index 1533fc353..cec549882 100644 --- a/azure-iot-device/azure/iot/device/iothub/abstract_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/abstract_clients.py @@ -17,7 +17,7 @@ from azure.iot.device.common.auth import connection_string as cs from azure.iot.device.common.auth import sastoken as st from azure.iot.device.iothub import client_event -from azure.iot.device.iothub.models import Message, MethodRequest +from azure.iot.device.iothub.models import Message, MethodRequest, MethodResponse from azure.iot.device.common.models import X509 from azure.iot.device import exceptions from azure.iot.device.common import auth, handle_exceptions @@ -49,7 +49,7 @@ def _validate_kwargs(exclude: Optional[List[str]] = [], **kwargs) -> None: ] for kwarg in kwargs: - if (kwarg not in valid_kwargs) or (kwarg in exclude): + if (kwarg not in valid_kwargs) or (exclude is not None and kwarg in exclude): raise TypeError("Unsupported keyword argument: '{}'".format(kwarg)) @@ -99,7 +99,7 @@ def _extract_sas_uri_values(uri: str) -> Dict[str, Any]: try: d["module_id"] = items[4] except IndexError: - d["module_id"] = None + d["module_id"] = "" return d @@ -131,44 +131,48 @@ def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline) -> def _on_connected(self) -> None: """Helper handler that is called upon an iothub pipeline connect""" logger.info("Connection State - Connected") - client_event_inbox = self._inbox_manager.get_client_event_inbox() - # Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it - if self._handler_manager.handling_client_events: - event = client_event.ClientEvent(client_event.CONNECTION_STATE_CHANGE) - client_event_inbox.put(event) - # Ensure that all handlers are running now that connection is re-established. - self._handler_manager.ensure_running() + if self._inbox_manager is not None: + client_event_inbox = self._inbox_manager.get_client_event_inbox() + # Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it + if self._handler_manager.handling_client_events: + event = client_event.ClientEvent(client_event.CONNECTION_STATE_CHANGE) + client_event_inbox.put(event) + # Ensure that all handlers are running now that connection is re-established. + self._handler_manager.ensure_running() def _on_disconnected(self) -> None: """Helper handler that is called upon an iothub pipeline disconnect""" logger.info("Connection State - Disconnected") - client_event_inbox = self._inbox_manager.get_client_event_inbox() - # Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it - if self._handler_manager.handling_client_events: - event = client_event.ClientEvent(client_event.CONNECTION_STATE_CHANGE) - client_event_inbox.put(event) - # Locally stored method requests on client are cleared. - # They will be resent by IoTHub on reconnect. - self._inbox_manager.clear_all_method_requests() - logger.info("Cleared all pending method requests due to disconnect") + if self._inbox_manager is not None: + client_event_inbox = self._inbox_manager.get_client_event_inbox() + # Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it + if self._handler_manager.handling_client_events: + event = client_event.ClientEvent(client_event.CONNECTION_STATE_CHANGE) + client_event_inbox.put(event) + # Locally stored method requests on client are cleared. + # They will be resent by IoTHub on reconnect. + self._inbox_manager.clear_all_method_requests() + logger.info("Cleared all pending method requests due to disconnect") def _on_new_sastoken_required(self) -> None: """Helper handler that is called upon the iothub pipeline needing new SAS token""" logger.info("New SasToken required from user") - client_event_inbox = self._inbox_manager.get_client_event_inbox() - # Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it - if self._handler_manager.handling_client_events: - event = client_event.ClientEvent(client_event.NEW_SASTOKEN_REQUIRED) - client_event_inbox.put(event) + if self._inbox_manager is not None: + client_event_inbox = self._inbox_manager.get_client_event_inbox() + # Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it + if self._handler_manager.handling_client_events: + event = client_event.ClientEvent(client_event.NEW_SASTOKEN_REQUIRED) + client_event_inbox.put(event) def _on_background_exception(self, e: Exception) -> None: """Helper handler that is called upon an iothub pipeline background exception""" handle_exceptions.handle_background_exception(e) - client_event_inbox = self._inbox_manager.get_client_event_inbox() - # Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it - if self._handler_manager.handling_client_events: - event = client_event.ClientEvent(client_event.BACKGROUND_EXCEPTION, e) - client_event_inbox.put(event) + if self._inbox_manager is not None: + client_event_inbox = self._inbox_manager.get_client_event_inbox() + # Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it + if self._handler_manager.handling_client_events: + event = client_event.ClientEvent(client_event.BACKGROUND_EXCEPTION, e) + client_event_inbox.put(event) def _check_receive_mode_is_api(self) -> None: """Call this function first in EVERY receive API""" @@ -190,7 +194,8 @@ def _check_receive_mode_is_handler(self) -> None: # Lock the client to ONLY use receive handlers (no APIs) self._receive_type = RECEIVE_TYPE_HANDLER # Set the inbox manager to use unified msg receives - self._inbox_manager.use_unified_msg_mode = True + if self._inbox_manager is not None: + self._inbox_manager.use_unified_msg_mode = True elif self._receive_type is RECEIVE_TYPE_API: raise exceptions.ClientError( "Cannot set receive handlers - receive APIs have already been used" @@ -292,18 +297,18 @@ def create_from_connection_string(cls, connection_string: str, **kwargs) -> Self _validate_kwargs(exclude=excluded_kwargs, **kwargs) # Create SasToken - connection_string = cs.ConnectionString(connection_string) - if connection_string.get(cs.X509) is not None: + connection_string_dict = cs.ConnectionString(connection_string) + if connection_string_dict.get(cs.X509) is not None: raise ValueError( "Use the .create_from_x509_certificate() method instead when using X509 certificates" ) uri = _form_sas_uri( - hostname=connection_string[cs.HOST_NAME], - device_id=connection_string[cs.DEVICE_ID], - module_id=connection_string.get(cs.MODULE_ID), + hostname=connection_string_dict[cs.HOST_NAME], + device_id=connection_string_dict[cs.DEVICE_ID], + module_id=connection_string_dict.get(cs.MODULE_ID), ) signing_mechanism = auth.SymmetricKeySigningMechanism( - key=connection_string[cs.SHARED_ACCESS_KEY] + key=connection_string_dict[cs.SHARED_ACCESS_KEY] ) token_ttl = kwargs.get("sastoken_ttl", 3600) try: @@ -315,10 +320,10 @@ def create_from_connection_string(cls, connection_string: str, **kwargs) -> Self # Pipeline Config setup config_kwargs = _get_config_kwargs(**kwargs) pipeline_configuration = pipeline.IoTHubPipelineConfig( - device_id=connection_string[cs.DEVICE_ID], - module_id=connection_string.get(cs.MODULE_ID), - hostname=connection_string[cs.HOST_NAME], - gateway_hostname=connection_string.get(cs.GATEWAY_HOST_NAME), + device_id=connection_string_dict[cs.DEVICE_ID], + module_id=connection_string_dict.get(cs.MODULE_ID), + hostname=connection_string_dict[cs.HOST_NAME], + gateway_hostname=connection_string_dict.get(cs.GATEWAY_HOST_NAME), sastoken=sastoken, **config_kwargs, ) @@ -429,7 +434,7 @@ def receive_method_request(self, method_name: Optional[str] = None) -> None: @abc.abstractmethod def send_method_response( - self, method_request: MethodRequest, payload: Dict[str, JSONSerializable], status: int + self, method_response: MethodResponse ) -> None: pass @@ -458,11 +463,13 @@ def on_connection_state_change(self) -> FunctionOrCoroutine[[None], None]: The function or coroutine definition should take no positional arguments. """ - return self._handler_manager.on_connection_state_change + if self._handler_manager is not None: + return self._handler_manager.on_connection_state_change @on_connection_state_change.setter def on_connection_state_change(self, value: FunctionOrCoroutine[[None], None]) -> None: - self._handler_manager.on_connection_state_change = value + if self._handler_manager is not None: + self._handler_manager.on_connection_state_change = value @property def on_new_sastoken_required(self) -> FunctionOrCoroutine[[None], None]: @@ -479,11 +486,13 @@ def on_new_sastoken_required(self) -> FunctionOrCoroutine[[None], None]: The function or coroutine definition should take no positional arguments. """ - return self._handler_manager.on_new_sastoken_required + if self._handler_manager is not None: + return self._handler_manager.on_new_sastoken_required @on_new_sastoken_required.setter def on_new_sastoken_required(self, value: FunctionOrCoroutine[[None], None]) -> None: - self._handler_manager.on_new_sastoken_required = value + if self._handler_manager is not None: + self._handler_manager.on_new_sastoken_required = value @property def on_background_exception(self) -> FunctionOrCoroutine[[Exception], None]: @@ -491,11 +500,13 @@ def on_background_exception(self) -> FunctionOrCoroutine[[Exception], None]: The function or coroutine definition should take one positional argument (the exception object)""" - return self._handler_manager.on_background_exception + if self._handler_manager is not None: + return self._handler_manager.on_background_exception @on_background_exception.setter def on_background_exception(self, value: FunctionOrCoroutine[[Exception], None]) -> None: - self._handler_manager.on_background_exception = value + if self._handler_manager is not None: + self._handler_manager.on_background_exception = value @abc.abstractproperty def on_message_received(self) -> FunctionOrCoroutine[[Message], None]: @@ -511,7 +522,8 @@ def on_method_request_received(self) -> FunctionOrCoroutine[[MethodRequest], Non The function or coroutine definition should take one positional argument (the :class:`azure.iot.device.MethodRequest` object)""" - return self._handler_manager.on_method_request_received + if self._handler_manager is not None: + return self._handler_manager.on_method_request_received @on_method_request_received.setter def on_method_request_received(self, value: FunctionOrCoroutine[[MethodRequest], None]) -> None: @@ -526,7 +538,8 @@ def on_twin_desired_properties_patch_received(self) -> FunctionOrCoroutine[[Twin The function or coroutine definition should take one positional argument (the twin patch in the form of a JSON dictionary object)""" - return self._handler_manager.on_twin_desired_properties_patch_received + if self._handler_manager is not None: + return self._handler_manager.on_twin_desired_properties_patch_received @on_twin_desired_properties_patch_received.setter def on_twin_desired_properties_patch_received( @@ -695,7 +708,8 @@ def on_message_received(self) -> FunctionOrCoroutine[[Message], None]: The function or coroutine definition should take one positional argument (the :class:`azure.iot.device.Message` object)""" - return self._handler_manager.on_message_received + if self._handler_manager is not None: + return self._handler_manager.on_message_received @on_message_received.setter def on_message_received(self, value: FunctionOrCoroutine[[Message], None]): @@ -813,11 +827,11 @@ def create_from_edge_environment(cls, **kwargs) -> Self: try: sastoken = st.RenewableSasToken(uri, signing_mechanism, ttl=token_ttl) except st.SasTokenError as e: - new_err = ValueError( + new_val_err = ValueError( "Could not create a SasToken using the values provided, or in the Edge environment" ) - new_err.__cause__ = e - raise new_err + new_val_err.__cause__ = e + raise new_val_err # Pipeline Config setup config_kwargs = _get_config_kwargs(**kwargs) @@ -925,7 +939,8 @@ def on_message_received(self) -> FunctionOrCoroutine[[Message], Any]: The function definition or coroutine should take one positional argument (the :class:`azure.iot.device.Message` object)""" - return self._handler_manager.on_message_received + if self._handler_manager is not None: + return self._handler_manager.on_message_received @on_message_received.setter def on_message_received(self, value: FunctionOrCoroutine[[Message], Any]) -> None: diff --git a/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py b/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py index 30e7ad0b3..6eaca9c1a 100644 --- a/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/aio/async_clients.py @@ -618,7 +618,7 @@ def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline): super().__init__(mqtt_pipeline=mqtt_pipeline, http_pipeline=http_pipeline) self._mqtt_pipeline.on_input_message_received = self._inbox_manager.route_input_message - async def send_message_to_output(self, message: Message, output_name: str) -> None: + async def send_message_to_output(self, message: Union[Message, str], output_name: str) -> None: """Sends an event/message to the given module output. These are outgoing events and are meant to be "output events" diff --git a/azure-iot-device/azure/iot/device/iothub/edge_hsm.py b/azure-iot-device/azure/iot/device/iothub/edge_hsm.py index a443994f7..a82d273d6 100644 --- a/azure-iot-device/azure/iot/device/iothub/edge_hsm.py +++ b/azure-iot-device/azure/iot/device/iothub/edge_hsm.py @@ -7,7 +7,7 @@ import logging import json import base64 -import requests +import requests # type: ignore import requests_unixsocket import urllib from azure.iot.device.common.auth.signing_mechanism import SigningMechanism diff --git a/azure-iot-device/azure/iot/device/iothub/sync_clients.py b/azure-iot-device/azure/iot/device/iothub/sync_clients.py index 0291915de..89690f39b 100644 --- a/azure-iot-device/azure/iot/device/iothub/sync_clients.py +++ b/azure-iot-device/azure/iot/device/iothub/sync_clients.py @@ -8,6 +8,7 @@ """ from __future__ import annotations # Needed for annotation bug < 3.10 import logging +from queue import Queue import deprecation from .abstract_clients import ( AbstractIoTHubClient, @@ -25,7 +26,7 @@ from azure.iot.device import constant as device_constant from .pipeline import MQTTPipeline, HTTPPipeline from azure.iot.device.custom_typing import FunctionOrCoroutine, StorageInfo, Twin, TwinPatch -from typing import Optional, Union +from typing import Any, Dict, List, Optional, Set, Union logger = logging.getLogger(__name__) @@ -184,7 +185,8 @@ def shutdown(self) -> None: logger.debug("Completed pipeline shutdown operation") # Stop the Client Event handlers now that everything else is completed - self._handler_manager.stop(receiver_handlers_only=False) + if self._handler_manager is not None: + self._handler_manager.stop(receiver_handlers_only=False) # Yes, that means the pipeline is disconnected twice (well, actually three times if you # consider that the client-level disconnect causes two pipeline-level disconnects for @@ -251,7 +253,8 @@ def disconnect(self) -> None: # Note that in the process of stopping the handlers and resolving pending calls # a user-supplied handler may cause a reconnection to occur logger.debug("Stopping handlers...") - self._handler_manager.stop(receiver_handlers_only=True) + if self._handler_manager is not None: + self._handler_manager.stop(receiver_handlers_only=True) logger.debug("Successfully stopped handlers") # Disconnect again to ensure disconnection has occurred due to the issue mentioned above @@ -358,7 +361,7 @@ def send_message(self, message: Union[Message, str]) -> None: ) def receive_method_request( self, method_name: Optional[str] = None, block: bool = True, timeout: Optional[int] = None - ) -> MethodRequest: + ) -> Optional[MethodRequest]: """Receive a method request via the Azure IoT Hub or Azure IoT Edge Hub. :param str method_name: Optionally provide the name of the method to receive requests for. @@ -375,7 +378,8 @@ def receive_method_request( if not self._mqtt_pipeline.feature_enabled[pipeline_constant.METHODS]: self._enable_feature(pipeline_constant.METHODS) - method_inbox = self._inbox_manager.get_method_request_inbox(method_name) + if self._inbox_manager is not None: + method_inbox : Queue[MethodRequest] = self._inbox_manager.get_method_request_inbox(method_name) logger.info("Waiting for method request...") try: @@ -519,7 +523,8 @@ def receive_twin_desired_properties_patch(self, block=True, timeout=None) -> Twi if not self._mqtt_pipeline.feature_enabled[pipeline_constant.TWIN_PATCHES]: self._enable_feature(pipeline_constant.TWIN_PATCHES) - twin_patch_inbox = self._inbox_manager.get_twin_patch_inbox() + if self._inbox_manager is not None: + twin_patch_inbox : Queue[TwinPatch] = self._inbox_manager.get_twin_patch_inbox() logger.info("Waiting for twin patches...") try: @@ -544,14 +549,15 @@ def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline): :type mqtt_pipeline: :class:`azure.iot.device.iothub.pipeline.MQTTPipeline` """ super().__init__(mqtt_pipeline=mqtt_pipeline, http_pipeline=http_pipeline) - self._mqtt_pipeline.on_c2d_message_received = self._inbox_manager.route_c2d_message + if self._inbox_manager is not None: + self._mqtt_pipeline.on_c2d_message_received = self._inbox_manager.route_c2d_message @deprecation.deprecated( deprecated_in="2.3.0", current_version=device_constant.VERSION, details="We recommend that you use the .on_message_received property to set a handler instead", ) - def receive_message(self, block=True, timeout=None) -> Message: + def receive_message(self, block=True, timeout=None) -> Optional[Message]: """Receive a message that has been sent from the Azure IoT Hub. :param bool block: Indicates if the operation should block until a message is received. @@ -565,7 +571,8 @@ def receive_message(self, block=True, timeout=None) -> Message: if not self._mqtt_pipeline.feature_enabled[pipeline_constant.C2D_MSG]: self._enable_feature(pipeline_constant.C2D_MSG) - c2d_inbox = self._inbox_manager.get_c2d_message_inbox() + if self._inbox_manager is not None: + c2d_inbox : Queue[Message] = self._inbox_manager.get_c2d_message_inbox() logger.info("Waiting for message from Hub...") try: @@ -626,9 +633,10 @@ def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline): :type http_pipeline: :class:`azure.iot.device.iothub.pipeline.HTTPPipeline` """ super().__init__(mqtt_pipeline=mqtt_pipeline, http_pipeline=http_pipeline) - self._mqtt_pipeline.on_input_message_received = self._inbox_manager.route_input_message + if self._inbox_manager is not None: + self._mqtt_pipeline.on_input_message_received = self._inbox_manager.route_input_message - def send_message_to_output(self, message: Message, output_name: str) -> None: + def send_message_to_output(self, message: Union[Message, str], output_name: str) -> None: """Sends an event/message to the given module output. These are outgoing events and are meant to be "output events". @@ -681,7 +689,7 @@ def send_message_to_output(self, message: Message, output_name: str) -> None: ) def receive_message_on_input( self, input_name: str, block: bool = True, timeout: Optional[int] = None - ) -> Message: + ) -> Optional[Message]: """Receive an input message that has been sent from another Module to a specific input. :param str input_name: The input name to receive a message on. @@ -695,7 +703,8 @@ def receive_message_on_input( if not self._mqtt_pipeline.feature_enabled[pipeline_constant.INPUT_MSG]: self._enable_feature(pipeline_constant.INPUT_MSG) - input_inbox = self._inbox_manager.get_input_message_inbox(input_name) + if self._inbox_manager is not None: + input_inbox : Queue[Message] = self._inbox_manager.get_input_message_inbox(input_name) logger.info("Waiting for input message on: " + input_name + "...") try: diff --git a/azure-iot-device/azure/iot/device/patch.py b/azure-iot-device/azure/iot/device/patch.py index 5d4bec270..9552338ec 100644 --- a/azure-iot-device/azure/iot/device/patch.py +++ b/azure-iot-device/azure/iot/device/patch.py @@ -7,12 +7,13 @@ import inspect import logging +from typing import Dict logger = logging.getLogger(__name__) # This dict will be used as a scope for imports and defs in add_shims_for_inherited_methods # in order to keep them out of the global scope of this module. -shim_scope = {} +shim_scope : Dict[str, str]= {} def add_shims_for_inherited_methods(target_class): diff --git a/azure-iot-device/azure/iot/device/provisioning/abstract_provisioning_device_client.py b/azure-iot-device/azure/iot/device/provisioning/abstract_provisioning_device_client.py index 85cbda79d..fb7ab0811 100644 --- a/azure-iot-device/azure/iot/device/provisioning/abstract_provisioning_device_client.py +++ b/azure-iot-device/azure/iot/device/provisioning/abstract_provisioning_device_client.py @@ -39,7 +39,7 @@ def _validate_kwargs(exclude: Optional[List[str]] = [], **kwargs): ] for kwarg in kwargs: - if (kwarg not in valid_kwargs) or (kwarg in exclude): + if (kwarg not in valid_kwargs) or (exclude is not None and kwarg in exclude): raise TypeError("Unsupported keyword argument '{}'".format(kwarg)) @@ -251,7 +251,7 @@ def provisioning_payload(self, provisioning_payload: ProvisioningPayload): self._provisioning_payload = provisioning_payload -def log_on_register_complete(result: Union[RegistrationResult, Exception] = None) -> None: +def log_on_register_complete(result: Optional[RegistrationResult] = None) -> None: # This could be a failed/successful registration result from DPS # or a error from polling machine. Response should be given appropriately if result is not None: diff --git a/azure-iot-device/azure/iot/device/provisioning/pipeline/exceptions.py b/azure-iot-device/azure/iot/device/provisioning/pipeline/exceptions.py index c99b4f9b4..672089284 100644 --- a/azure-iot-device/azure/iot/device/provisioning/pipeline/exceptions.py +++ b/azure-iot-device/azure/iot/device/provisioning/pipeline/exceptions.py @@ -8,7 +8,14 @@ # For now, present relevant transport errors as part of the Pipeline API surface # so that they do not have to be duplicated at this layer. # OK TODO This mimics the IotHub Case. Both IotHub and Provisioning needs to change -from azure.iot.device.common.pipeline.pipeline_exceptions import * # noqa: F401, F403 +from azure.iot.device.common.pipeline.pipeline_exceptions import ( + PipelineException, + OperationCancelled, + OperationTimeout, + OperationError, + PipelineNotRunning, + PipelineRuntimeError +) # noqa: F401, F403 from azure.iot.device.common.transport_exceptions import ( # noqa: F401 ConnectionFailedError, ConnectionDroppedError, diff --git a/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py b/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py index 5bf63755c..3b4a94c3b 100644 --- a/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py +++ b/azure-iot-device/azure/iot/device/provisioning/provisioning_device_client.py @@ -10,7 +10,7 @@ """ from __future__ import annotations # Needed for annotation bug < 3.10 import logging -from typing import Any +from typing import Any, Union from azure.iot.device.common.evented_callback import EventedCallback from azure.iot.device.custom_typing import FunctionOrCoroutine from .abstract_provisioning_device_client import AbstractProvisioningDeviceClient @@ -24,7 +24,7 @@ logger = logging.getLogger(__name__) -def handle_result(callback: FunctionOrCoroutine[[Any], None]) -> None: +def handle_result(callback: FunctionOrCoroutine[[Any], None]) -> RegistrationResult: try: return callback.wait_for_completion() except pipeline_exceptions.ConnectionDroppedError as e: diff --git a/samples/async-edge-scenarios/send_message_downstream.py b/samples/async-edge-scenarios/send_message_downstream.py index 66a0c26dc..3620e170d 100644 --- a/samples/async-edge-scenarios/send_message_downstream.py +++ b/samples/async-edge-scenarios/send_message_downstream.py @@ -27,7 +27,7 @@ async def main(): # The client object is used to interact with your Azure IoT Edge device. device_client = IoTHubDeviceClient.create_from_connection_string( - connection_string=conn_str, server_verification_cert=root_ca_cert + connection_string_dict=conn_str, server_verification_cert=root_ca_cert ) # Connect the client.