diff --git a/run/conf_testing/rules/test_mqtt.py b/run/conf_testing/rules/test_mqtt.py index 94cd346b..ee1f78db 100644 --- a/run/conf_testing/rules/test_mqtt.py +++ b/run/conf_testing/rules/test_mqtt.py @@ -2,8 +2,8 @@ import time import HABApp +from HABApp.core.connections import Connections, ConnectionStatus from HABApp.core.events import ValueUpdateEventFilter -from HABApp.mqtt.connection.handler import CONNECTION_HANDLER from HABApp.mqtt.events import MqttValueUpdateEventFilter from HABApp.mqtt.items import MqttItem, MqttPairItem from HABAppTests import EventWaiter, ItemWaiter, TestBaseRule, run_coro @@ -63,21 +63,28 @@ def test_mqtt_item_creation(self): topic = 'mqtt/item/creation' assert HABApp.core.Items.item_exists(topic) is False - assert self.mqtt.publish(topic, 'asdf') + self.mqtt.publish(topic, 'asdf') time.sleep(0.1) assert HABApp.core.Items.item_exists(topic) is False # We create the item only on retain - assert self.mqtt.publish(topic, 'asdf', retain=True) + self.mqtt.publish(topic, 'asdf', retain=True) + time.sleep(0.2) + + run_coro(self.trigger_reconnect()) - # We need to reconnect to receive the message - connection = CONNECTION_HANDLER.plugin_connection - run_coro(CONNECTION_HANDLER.on_disconnected(connection, connection.context)) - run_coro(CONNECTION_HANDLER.on_connecting(connection, connection.context)) time.sleep(0.2) + connection = Connections.get('mqtt') + while not connection.is_online: + time.sleep(0.2) assert HABApp.core.Items.item_exists(topic) is True HABApp.core.Items.pop_item(topic) + async def trigger_reconnect(self): + connection = Connections.get('mqtt') + connection.status._set_manual(ConnectionStatus.DISCONNECTED) + connection.advance_status_task.start_if_not_running() + TestMQTTEvents() diff --git a/src/HABApp/mqtt/connection/connection.py b/src/HABApp/mqtt/connection/connection.py index 44415ae9..74577778 100644 --- a/src/HABApp/mqtt/connection/connection.py +++ b/src/HABApp/mqtt/connection/connection.py @@ -8,6 +8,7 @@ import HABApp from HABApp.core.asyncio import AsyncContext from HABApp.core.connections import BaseConnection, Connections, ConnectionStateToEventBusPlugin, AutoReconnectPlugin +from HABApp.core.connections.base_connection import AlreadyHandledException from HABApp.core.connections.base_plugin import BaseConnectionPluginConnectedTask from HABApp.core.const.const import PYTHON_310 @@ -67,5 +68,7 @@ async def _mqtt_wrap_task(self): try: with AsyncContext('MQTT'), connection.handle_exception(self.mqtt_task): await self.mqtt_task() + except AlreadyHandledException: + pass finally: log.debug(f'{self.task.name} task stop') diff --git a/src/HABApp/mqtt/connection/publish.py b/src/HABApp/mqtt/connection/publish.py index a9a02907..00a47eaa 100644 --- a/src/HABApp/mqtt/connection/publish.py +++ b/src/HABApp/mqtt/connection/publish.py @@ -87,4 +87,4 @@ def publish(topic: Union[str, ItemRegistryItem], payload, qos: Optional[QOS] = N :param qos: QoS, can be 0, 1 or 2. If not specified the value from configuration file will be used. :param retain: retain message. If not specified the value from configuration file will be used. """ - run_func_from_async(topic, payload, qos, retain) + run_func_from_async(async_publish, topic, payload, qos, retain) diff --git a/src/HABApp/mqtt/connection/subscribe.py b/src/HABApp/mqtt/connection/subscribe.py index 3356657f..338be40a 100644 --- a/src/HABApp/mqtt/connection/subscribe.py +++ b/src/HABApp/mqtt/connection/subscribe.py @@ -128,6 +128,12 @@ async def apply_subscriptions(self): async def on_connected(self): await super().on_connected() + + # Since we are freshly connected we have not yet subscribed to anything + # We need to clear this here because in case of error it might still have the topics + # from the last successful subscription in this dict + self.subscribed_to.clear() + self.sub_task.start_if_not_running() await self.sub_task.wait()