Skip to content

Commit

Permalink
repeat subscribe when reconnected to MQTT broker (#337)
Browse files Browse the repository at this point in the history
* repeat subscribe when reconnected to MQTT broker; added comments

* Resolved pylint issues in server.py

* Resolved pylint issues in asyncio_mqtt.py

* Outcommented "keepalive" option in asyncio_mqtt.py

---------

Co-authored-by: Johannes.Hennecke <[email protected]>
  • Loading branch information
JohannesHennecke and Johannes.Hennecke authored Jan 29, 2024
1 parent 390cc52 commit 44e94c0
Show file tree
Hide file tree
Showing 3 changed files with 320 additions and 38 deletions.
12 changes: 12 additions & 0 deletions mqtt_io/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ class StreamDataSentEvent(Event):
stream_name: str
data: bytes

@dataclass
class StreamDataSubscribeEvent(Event):
"""
Trigger MQTT subscribe
"""

@dataclass
class DigitalSubscribeEvent(Event):
"""
Trigger MQTT subscribe
"""


class EventBus:
"""
Expand Down
90 changes: 89 additions & 1 deletion mqtt_io/mqtt/asyncio_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,27 @@


def _map_exception(func: Func) -> Func:
"""
Creates a decorator that wraps a function and maps any raised `MqttError`
exception to a `MQTTException`.
:param func: The function to be wrapped.
:type func: Func
:return: The wrapped function.
:rtype: Func
"""
@wraps(func)
async def inner(*args: Any, **kwargs: Any) -> Any:
"""
Decorator for asynchronous functions that catches `MqttError` exceptions
and raises `MQTTException` instead.
Parameters:
func (Callable): The function to be decorated.
Returns:
Callable: The decorated function.
"""
try:
await func(*args, **kwargs)
except MqttError as exc:
Expand All @@ -42,6 +61,15 @@ class MQTTClient(AbstractMQTTClient):
"""

def __init__(self, options: MQTTClientOptions):
"""
Initializes a new instance of the MQTTClient class.
Args:
options (MQTTClientOptions): The options for the MQTT client.
Returns:
None
"""
super().__init__(options)
protocol_map = {
MQTTProtocol.V31: paho.MQTTv31,
Expand All @@ -66,7 +94,7 @@ def __init__(self, options: MQTTClientOptions):
username=options.username,
password=options.password,
client_id=options.client_id,
# keepalive=options.keepalive, # This isn't implemented yet on 0.8.1
#keepalive=options.keepalive,
tls_context=tls_context,
protocol=protocol_map[options.protocol],
will=will,
Expand All @@ -76,28 +104,82 @@ def __init__(self, options: MQTTClientOptions):

@_map_exception
async def connect(self, timeout: int = 10) -> None:
"""
Connects to the client asynchronously.
Args:
timeout (int): The timeout value in seconds (default: 10).
Returns:
None: This function does not return anything.
"""
await self._client.connect(timeout=timeout)

@_map_exception
async def disconnect(self) -> None:
"""
This function is an asynchronous method that handles the disconnection of the client.
Parameters:
self: The current instance of the class.
Returns:
None
"""
try:
await self._client.disconnect()
except TimeoutError:
await self._client.force_disconnect()

@_map_exception
async def subscribe(self, topics: List[Tuple[str, int]]) -> None:
"""
Subscribe to the given list of topics.
Args:
topics (List[Tuple[str, int]]): A list of tuples representing the topics
to subscribe to.
Each tuple should contain a string representing the topic name and
an integer representing the QoS level.
Returns:
None: This function does not return anything.
Raises:
Exception: If there is an error while subscribing to the topics.
"""
await self._client.subscribe(topics)

@_map_exception
async def publish(self, msg: MQTTMessageSend) -> None:
"""
Publishes an MQTT message to the specified topic.
Args:
msg (MQTTMessageSend): The MQTT message to be published.
Returns:
None: This function does not return anything.
"""
await self._client.publish(
topic=msg.topic, payload=msg.payload, qos=msg.qos, retain=msg.retain
)

def _on_message(
self, client: paho.Client, userdata: Any, msg: paho.MQTTMessage
) -> None:
"""
Callback function that is called when a message is received through MQTT.
Args:
client (paho.Client): The MQTT client instance.
userdata (Any): The user data associated with the client.
msg (paho.MQTTMessage): The received MQTT message.
Returns:
None: This function does not return anything.
"""
if self._message_queue is None:
_LOG.warning("Discarding MQTT message because queue is not initialised")
return
Expand All @@ -111,6 +193,12 @@ def _on_message(

@property
def message_queue(self) -> "asyncio.Queue[MQTTMessage]":
"""
Returns the message queue for receiving MQTT messages.
:return: The message queue for receiving MQTT messages.
:rtype: asyncio.Queue[MQTTMessage]
"""
if self._message_queue is None:
self._message_queue = asyncio.Queue(self._options.message_queue_size)
# pylint: disable=protected-access
Expand Down
Loading

0 comments on commit 44e94c0

Please sign in to comment.