Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Oliva Kar committed Jan 25, 2024
1 parent ca28940 commit 2e41249
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 77 deletions.
2 changes: 1 addition & 1 deletion azure-iot-device/azure/iot/device/common/http_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import logging
import ssl
import requests
import requests # type: ignore
from . import transport_exceptions as exceptions
from .pipeline import pipeline_thread

Expand Down
123 changes: 69 additions & 54 deletions azure-iot-device/azure/iot/device/iothub/abstract_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from azure.iot.device.common.auth import connection_string as cs
from azure.iot.device.common.auth import sastoken as st
from azure.iot.device.iothub import client_event
from azure.iot.device.iothub.models import Message, MethodRequest
from azure.iot.device.iothub.models import Message, MethodRequest, MethodResponse
from azure.iot.device.common.models import X509
from azure.iot.device import exceptions
from azure.iot.device.common import auth, handle_exceptions
Expand Down Expand Up @@ -49,7 +49,7 @@ def _validate_kwargs(exclude: Optional[List[str]] = [], **kwargs) -> None:
]

for kwarg in kwargs:
if (kwarg not in valid_kwargs) or (kwarg in exclude):
if (kwarg not in valid_kwargs) or (exclude is not None and kwarg in exclude):
raise TypeError("Unsupported keyword argument: '{}'".format(kwarg))


Expand Down Expand Up @@ -99,7 +99,7 @@ def _extract_sas_uri_values(uri: str) -> Dict[str, Any]:
try:
d["module_id"] = items[4]
except IndexError:
d["module_id"] = None
d["module_id"] = ""
return d


Expand Down Expand Up @@ -131,44 +131,48 @@ def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline) ->
def _on_connected(self) -> None:
"""Helper handler that is called upon an iothub pipeline connect"""
logger.info("Connection State - Connected")
client_event_inbox = self._inbox_manager.get_client_event_inbox()
# Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it
if self._handler_manager.handling_client_events:
event = client_event.ClientEvent(client_event.CONNECTION_STATE_CHANGE)
client_event_inbox.put(event)
# Ensure that all handlers are running now that connection is re-established.
self._handler_manager.ensure_running()
if self._inbox_manager is not None:
client_event_inbox = self._inbox_manager.get_client_event_inbox()
# Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it
if self._handler_manager.handling_client_events:
event = client_event.ClientEvent(client_event.CONNECTION_STATE_CHANGE)
client_event_inbox.put(event)
# Ensure that all handlers are running now that connection is re-established.
self._handler_manager.ensure_running()

def _on_disconnected(self) -> None:
"""Helper handler that is called upon an iothub pipeline disconnect"""
logger.info("Connection State - Disconnected")
client_event_inbox = self._inbox_manager.get_client_event_inbox()
# Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it
if self._handler_manager.handling_client_events:
event = client_event.ClientEvent(client_event.CONNECTION_STATE_CHANGE)
client_event_inbox.put(event)
# Locally stored method requests on client are cleared.
# They will be resent by IoTHub on reconnect.
self._inbox_manager.clear_all_method_requests()
logger.info("Cleared all pending method requests due to disconnect")
if self._inbox_manager is not None:
client_event_inbox = self._inbox_manager.get_client_event_inbox()
# Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it
if self._handler_manager.handling_client_events:
event = client_event.ClientEvent(client_event.CONNECTION_STATE_CHANGE)
client_event_inbox.put(event)
# Locally stored method requests on client are cleared.
# They will be resent by IoTHub on reconnect.
self._inbox_manager.clear_all_method_requests()
logger.info("Cleared all pending method requests due to disconnect")

def _on_new_sastoken_required(self) -> None:
"""Helper handler that is called upon the iothub pipeline needing new SAS token"""
logger.info("New SasToken required from user")
client_event_inbox = self._inbox_manager.get_client_event_inbox()
# Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it
if self._handler_manager.handling_client_events:
event = client_event.ClientEvent(client_event.NEW_SASTOKEN_REQUIRED)
client_event_inbox.put(event)
if self._inbox_manager is not None:
client_event_inbox = self._inbox_manager.get_client_event_inbox()
# Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it
if self._handler_manager.handling_client_events:
event = client_event.ClientEvent(client_event.NEW_SASTOKEN_REQUIRED)
client_event_inbox.put(event)

def _on_background_exception(self, e: Exception) -> None:
"""Helper handler that is called upon an iothub pipeline background exception"""
handle_exceptions.handle_background_exception(e)
client_event_inbox = self._inbox_manager.get_client_event_inbox()
# Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it
if self._handler_manager.handling_client_events:
event = client_event.ClientEvent(client_event.BACKGROUND_EXCEPTION, e)
client_event_inbox.put(event)
if self._inbox_manager is not None:
client_event_inbox = self._inbox_manager.get_client_event_inbox()
# Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it
if self._handler_manager.handling_client_events:
event = client_event.ClientEvent(client_event.BACKGROUND_EXCEPTION, e)
client_event_inbox.put(event)

def _check_receive_mode_is_api(self) -> None:
"""Call this function first in EVERY receive API"""
Expand All @@ -190,7 +194,8 @@ def _check_receive_mode_is_handler(self) -> None:
# Lock the client to ONLY use receive handlers (no APIs)
self._receive_type = RECEIVE_TYPE_HANDLER
# Set the inbox manager to use unified msg receives
self._inbox_manager.use_unified_msg_mode = True
if self._inbox_manager is not None:
self._inbox_manager.use_unified_msg_mode = True
elif self._receive_type is RECEIVE_TYPE_API:
raise exceptions.ClientError(
"Cannot set receive handlers - receive APIs have already been used"
Expand Down Expand Up @@ -292,18 +297,18 @@ def create_from_connection_string(cls, connection_string: str, **kwargs) -> Self
_validate_kwargs(exclude=excluded_kwargs, **kwargs)

# Create SasToken
connection_string = cs.ConnectionString(connection_string)
if connection_string.get(cs.X509) is not None:
connection_string_dict = cs.ConnectionString(connection_string)
if connection_string_dict.get(cs.X509) is not None:
raise ValueError(
"Use the .create_from_x509_certificate() method instead when using X509 certificates"
)
uri = _form_sas_uri(
hostname=connection_string[cs.HOST_NAME],
device_id=connection_string[cs.DEVICE_ID],
module_id=connection_string.get(cs.MODULE_ID),
hostname=connection_string_dict[cs.HOST_NAME],
device_id=connection_string_dict[cs.DEVICE_ID],
module_id=connection_string_dict.get(cs.MODULE_ID),
)
signing_mechanism = auth.SymmetricKeySigningMechanism(
key=connection_string[cs.SHARED_ACCESS_KEY]
key=connection_string_dict[cs.SHARED_ACCESS_KEY]
)
token_ttl = kwargs.get("sastoken_ttl", 3600)
try:
Expand All @@ -315,10 +320,10 @@ def create_from_connection_string(cls, connection_string: str, **kwargs) -> Self
# Pipeline Config setup
config_kwargs = _get_config_kwargs(**kwargs)
pipeline_configuration = pipeline.IoTHubPipelineConfig(
device_id=connection_string[cs.DEVICE_ID],
module_id=connection_string.get(cs.MODULE_ID),
hostname=connection_string[cs.HOST_NAME],
gateway_hostname=connection_string.get(cs.GATEWAY_HOST_NAME),
device_id=connection_string_dict[cs.DEVICE_ID],
module_id=connection_string_dict.get(cs.MODULE_ID),
hostname=connection_string_dict[cs.HOST_NAME],
gateway_hostname=connection_string_dict.get(cs.GATEWAY_HOST_NAME),
sastoken=sastoken,
**config_kwargs,
)
Expand Down Expand Up @@ -429,7 +434,7 @@ def receive_method_request(self, method_name: Optional[str] = None) -> None:

@abc.abstractmethod
def send_method_response(
self, method_request: MethodRequest, payload: Dict[str, JSONSerializable], status: int
self, method_response: MethodResponse
) -> None:
pass

Expand Down Expand Up @@ -458,11 +463,13 @@ def on_connection_state_change(self) -> FunctionOrCoroutine[[None], None]:
The function or coroutine definition should take no positional arguments.
"""
return self._handler_manager.on_connection_state_change
if self._handler_manager is not None:
return self._handler_manager.on_connection_state_change

@on_connection_state_change.setter
def on_connection_state_change(self, value: FunctionOrCoroutine[[None], None]) -> None:
self._handler_manager.on_connection_state_change = value
if self._handler_manager is not None:
self._handler_manager.on_connection_state_change = value

@property
def on_new_sastoken_required(self) -> FunctionOrCoroutine[[None], None]:
Expand All @@ -479,23 +486,27 @@ def on_new_sastoken_required(self) -> FunctionOrCoroutine[[None], None]:
The function or coroutine definition should take no positional arguments.
"""
return self._handler_manager.on_new_sastoken_required
if self._handler_manager is not None:
return self._handler_manager.on_new_sastoken_required

@on_new_sastoken_required.setter
def on_new_sastoken_required(self, value: FunctionOrCoroutine[[None], None]) -> None:
self._handler_manager.on_new_sastoken_required = value
if self._handler_manager is not None:
self._handler_manager.on_new_sastoken_required = value

@property
def on_background_exception(self) -> FunctionOrCoroutine[[Exception], None]:
"""The handler function or coroutine will be called when a background exception occurs.
The function or coroutine definition should take one positional argument (the exception
object)"""
return self._handler_manager.on_background_exception
if self._handler_manager is not None:
return self._handler_manager.on_background_exception

@on_background_exception.setter
def on_background_exception(self, value: FunctionOrCoroutine[[Exception], None]) -> None:
self._handler_manager.on_background_exception = value
if self._handler_manager is not None:
self._handler_manager.on_background_exception = value

@abc.abstractproperty
def on_message_received(self) -> FunctionOrCoroutine[[Message], None]:
Expand All @@ -511,7 +522,8 @@ def on_method_request_received(self) -> FunctionOrCoroutine[[MethodRequest], Non
The function or coroutine definition should take one positional argument (the
:class:`azure.iot.device.MethodRequest` object)"""
return self._handler_manager.on_method_request_received
if self._handler_manager is not None:
return self._handler_manager.on_method_request_received

@on_method_request_received.setter
def on_method_request_received(self, value: FunctionOrCoroutine[[MethodRequest], None]) -> None:
Expand All @@ -526,7 +538,8 @@ def on_twin_desired_properties_patch_received(self) -> FunctionOrCoroutine[[Twin
The function or coroutine definition should take one positional argument (the twin patch
in the form of a JSON dictionary object)"""
return self._handler_manager.on_twin_desired_properties_patch_received
if self._handler_manager is not None:
return self._handler_manager.on_twin_desired_properties_patch_received

@on_twin_desired_properties_patch_received.setter
def on_twin_desired_properties_patch_received(
Expand Down Expand Up @@ -695,7 +708,8 @@ def on_message_received(self) -> FunctionOrCoroutine[[Message], None]:
The function or coroutine definition should take one positional argument (the
:class:`azure.iot.device.Message` object)"""
return self._handler_manager.on_message_received
if self._handler_manager is not None:
return self._handler_manager.on_message_received

@on_message_received.setter
def on_message_received(self, value: FunctionOrCoroutine[[Message], None]):
Expand Down Expand Up @@ -813,11 +827,11 @@ def create_from_edge_environment(cls, **kwargs) -> Self:
try:
sastoken = st.RenewableSasToken(uri, signing_mechanism, ttl=token_ttl)
except st.SasTokenError as e:
new_err = ValueError(
new_val_err = ValueError(
"Could not create a SasToken using the values provided, or in the Edge environment"
)
new_err.__cause__ = e
raise new_err
new_val_err.__cause__ = e
raise new_val_err

# Pipeline Config setup
config_kwargs = _get_config_kwargs(**kwargs)
Expand Down Expand Up @@ -925,7 +939,8 @@ def on_message_received(self) -> FunctionOrCoroutine[[Message], Any]:
The function definition or coroutine should take one positional argument (the
:class:`azure.iot.device.Message` object)"""
return self._handler_manager.on_message_received
if self._handler_manager is not None:
return self._handler_manager.on_message_received

@on_message_received.setter
def on_message_received(self, value: FunctionOrCoroutine[[Message], Any]) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline):
super().__init__(mqtt_pipeline=mqtt_pipeline, http_pipeline=http_pipeline)
self._mqtt_pipeline.on_input_message_received = self._inbox_manager.route_input_message

async def send_message_to_output(self, message: Message, output_name: str) -> None:
async def send_message_to_output(self, message: Union[Message, str], output_name: str) -> None:
"""Sends an event/message to the given module output.
These are outgoing events and are meant to be "output events"
Expand Down
2 changes: 1 addition & 1 deletion azure-iot-device/azure/iot/device/iothub/edge_hsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging
import json
import base64
import requests
import requests # type: ignore
import requests_unixsocket
import urllib
from azure.iot.device.common.auth.signing_mechanism import SigningMechanism
Expand Down
Loading

0 comments on commit 2e41249

Please sign in to comment.