From fd23b0c136709df7bc0e5d204158ab6149c581a8 Mon Sep 17 00:00:00 2001 From: "Clifford W. Hansen" Date: Fri, 31 May 2024 14:51:44 +0200 Subject: [PATCH] Some refactoring and bringing in line a couple of things for 2024.5 --- .gitignore | 1 + custom_components/miio_gateway/__init__.py | 364 ++++++++---------- .../miio_gateway/alarm_control_panel.py | 137 ++++--- .../miio_gateway/binary_sensor.py | 35 +- custom_components/miio_gateway/light.py | 68 +++- .../miio_gateway/media_player.py | 96 +++-- custom_components/miio_gateway/sensor.py | 85 ++-- 7 files changed, 421 insertions(+), 365 deletions(-) diff --git a/.gitignore b/.gitignore index bee8a64..43233e4 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ __pycache__ +.idea/ diff --git a/custom_components/miio_gateway/__init__.py b/custom_components/miio_gateway/__init__.py index c8f5ff7..d73c732 100644 --- a/custom_components/miio_gateway/__init__.py +++ b/custom_components/miio_gateway/__init__.py @@ -1,24 +1,24 @@ +import asyncio import json import logging import socket -from time import sleep -from threading import Thread -from multiprocessing import Queue from datetime import timedelta +from multiprocessing import Queue +from threading import Thread +import homeassistant.helpers.config_validation as cv import voluptuous as vol - from homeassistant.const import ( - CONF_HOST, CONF_MAC, CONF_PORT, + CONF_HOST, CONF_PORT, EVENT_HOMEASSISTANT_STOP) - -from homeassistant.core import callback +from homeassistant.core import HomeAssistant, callback from homeassistant.helpers import discovery -import homeassistant.helpers.config_validation as cv -from homeassistant.helpers.restore_state import RestoreEntity from homeassistant.helpers.event import async_track_time_interval +from homeassistant.helpers.restore_state import RestoreEntity from homeassistant.util.dt import utcnow +INTERNAL_PING = "internal.PING" + _LOGGER = logging.getLogger(__name__) TIME_INTERVAL_PING = timedelta(minutes=1) @@ -26,8 +26,6 @@ DOMAIN = "miio_gateway" CONF_DATA_DOMAIN = "miio_gateway_config" -CONF_HOST = "host" -CONF_PORT = "port" CONF_SENSORS = "sensors" CONF_SENSOR_SID = "sid" CONF_SENSOR_CLASS = "class" @@ -62,15 +60,16 @@ SERVICE_JOIN_ZIGBEE = "join_zigbee" SERVICE_SCHEMA = vol.Schema({}) -def setup(hass, config): - """Setup gateway from config.""" + +async def async_setup(hass: HomeAssistant, config: dict) -> bool: + """Set up the gateway from config.""" _LOGGER.info("Starting gateway setup...") - # Gateway starts it's action on object init. + # Gateway starts its action on object init. gateway = XiaomiGw(hass, config[DOMAIN][CONF_HOST], config[DOMAIN][CONF_PORT]) # Gentle stop on HASS stop. - hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, gateway.gently_stop) + hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, gateway.gently_stop) # Share the config to platform's components. hass.data[DOMAIN] = gateway @@ -78,57 +77,101 @@ def setup(hass, config): # Load components. for component in ["light", "media_player", "binary_sensor", "sensor", "alarm_control_panel"]: - discovery.load_platform(hass, component, DOMAIN, {}, config) + hass.async_create_task( + discovery.async_load_platform(hass, component, DOMAIN, {}, config) + ) # Zigbee join HASS service helper. - def join_zigbee_service_handler(service): + async def join_zigbee_service_handler(service): gateway = hass.data[DOMAIN] - gateway.send_to_hub({ "method": "start_zigbee_join" }) - hass.services.register( + await gateway.send_to_hub({"method": "start_zigbee_join"}) + + hass.services.async_register( DOMAIN, SERVICE_JOIN_ZIGBEE, join_zigbee_service_handler, - schema=SERVICE_SCHEMA) + schema=SERVICE_SCHEMA + ) return True + +def _process_result(result): + if isinstance(result, list): + if len(result) == 0: + return "unknown" + return result[0] + return result + + +def _process_params(params): + if isinstance(params, list): + if len(params) == 0: + return {} + params = params[0] + if not isinstance(params, dict): + return {"data": params} + return params + + +def _is_ignored_method(method: str) -> bool: + return method.startswith("internal.") or method in ["_sync.neighborDevInfo"] + + +def _determine_event(method: str): + if method.startswith("event."): + return method + elif method == "_otc.log": + return EVENT_METADATA + elif method == "props": + return EVENT_VALUES + return None + + +def _miio_msg_decode(data: bytes) -> list: + """Decode data received from gateway.""" + if data[-1] == 0: + data = data[:-1] + resps = [] + try: + data_arr = "[" + data.decode().replace("}{", "},{") + "]" + resps = json.loads(data_arr) + except json.JSONDecodeError: + _LOGGER.warning("Bad JSON received: %s", str(data)) + return resps + + class XiaomiGw: """Gateway socket and communication layer.""" - def __init__(self, hass, host, port): + def __init__(self, hass: HomeAssistant, host: str, port: int): self.hass = hass - self._host = host self._port = port - self._socket = None self._thread = None self._thread_alive = True - self._send_queue = Queue(maxsize=25) self._miio_id = 0 - self._callbacks = [] self._result_callbacks = {} - self._available = None self._availability_pinger = None self._pings_sent = 0 - self._known_sids = [] - self._known_sids.append("miio.gateway") # Append self. + self._known_sids.append("miio.gateway") # Append self. import hashlib, base64 - self._unique_id = base64.urlsafe_b64encode(hashlib.sha1((self._host + ":" + str(self._port)).encode("utf-8")).digest())[:10].decode("utf-8") + self._unique_id = base64.urlsafe_b64encode( + hashlib.sha1((self._host + ":" + str(self._port)).encode("utf-8")).digest() + )[:10].decode("utf-8") self._create_socket() self._init_listener() - """Public.""" - def unique_id(self) -> str: """Return a unique ID.""" return self._unique_id - def is_available(self): + def is_available(self) -> bool: """Return availability state.""" return self._available @@ -137,22 +180,22 @@ def gently_stop(self, event=None): self._stop_listening() self._close_socket() - def send_to_hub(self, data, callback=None): + async def send_to_hub(self, data: dict, callback=None): """Send data to hub.""" miio_id, data = self._miio_msg_encode(data) if callback is not None: - _LOGGER.info("Adding callback for call ID: " + str(miio_id)) + _LOGGER.info("Adding callback for call ID: %s", str(miio_id)) self._result_callbacks[miio_id] = callback self._send_queue.put(data) def append_callback(self, callback): + """Append a callback function.""" self._callbacks.append(callback) def append_known_sid(self, sid): + """Append a known sensor ID.""" self._known_sids.append(sid) - """Private.""" - def _create_socket(self): """Create connection socket.""" _LOGGER.debug("Creating socket...") @@ -169,16 +212,16 @@ def _init_listener(self): """Initialize socket connection with first ping. Set availability accordingly.""" try: # Send ping (w/o queue). - miio_id, ping = self._miio_msg_encode({"method": "internal.PING"}) + _, ping = self._miio_msg_encode({"method": ("%s" % INTERNAL_PING)}) self._socket.settimeout(0.1) self._socket.sendto(ping, (self._host, self._port)) # Wait for response. self._socket.settimeout(5.0) - res = self._socket.recvfrom(1480)[0] - # If didn't timeouted - gateway is available. + self._socket.recvfrom(1480) + # If didn't timeout - gateway is available. self._set_availability(True) except socket.timeout: - # If timeouted – gateway is unavailable. + # If timed out – gateway is unavailable. self._set_availability(False) except (TypeError, socket.error) as e: # Error: gateway configuration may be wrong. @@ -195,7 +238,6 @@ def _start_listening(self): """Create thread for loop.""" _LOGGER.debug("Starting thread...") self._thread = Thread(target=self._run_socket_thread, args=()) - #self._thread.daemon = True self._thread.start() _LOGGER.debug("Starting availability tracker...") self._track_availability() @@ -214,7 +256,7 @@ def _run_socket_thread(self): if self._socket is None: _LOGGER.error("No socket in listener!") - self.create_socket() + self._create_socket() continue try: @@ -226,7 +268,7 @@ def _run_socket_thread(self): self._socket.sendto(data, (self._host, self._port)) self._socket.settimeout(1) - data = self._socket.recvfrom(1480)[0] # Will timeout on no data. + data = self._socket.recvfrom(1480)[0] # Will timeout on no data. _LOGGER.debug("Received data:") _LOGGER.debug(data) @@ -235,7 +277,7 @@ def _run_socket_thread(self): self._set_availability(True) # Get all messages from response data. - resps = self._miio_msg_decode(data) + resps = _miio_msg_decode(data) # Parse all messages in response. self._parse_received_resps(resps) @@ -246,16 +288,15 @@ def _run_socket_thread(self): _LOGGER.error("Socket error!") _LOGGER.error(e) - """Gateway availability.""" - def _track_availability(self): """Check pings status and schedule next availability check.""" _LOGGER.debug("Starting to track availability...") # Schedule pings every TIME_INTERVAL_PING. self._availability_pinger = async_track_time_interval( - self.hass, self._ping, TIME_INTERVAL_PING) + self.hass, self._ping, TIME_INTERVAL_PING + ) - def _set_availability(self, available): + def _set_availability(self, available: bool): """Set availability of the gateway. Inform child devices.""" was_available = self._available availability_changed = (not available and was_available) or (available and not was_available) @@ -266,162 +307,100 @@ def _set_availability(self, available): self._available = False if availability_changed: - _LOGGER.info("Gateway availability changed! Available: " + str(available)) + _LOGGER.info("Gateway availability changed! Available: %s", str(available)) for func in self._callbacks: func(None, None, EVENT_AVAILABILITY) @callback - def _ping(self, event=None): + async def _ping(self, event=None): """Queue ping to keep and check connection.""" - self._pings_sent = self._pings_sent + 1 - self.send_to_hub({"method": "internal.PING"}) - sleep(6) # Give it `timeout` time to respond... + self._pings_sent += 1 + await self.send_to_hub({"method": INTERNAL_PING}) + await asyncio.sleep(6) # Give it `timeout` time to respond... if self._pings_sent >= 3: self._set_availability(False) - """Miio gateway protocol parsing.""" - - def _parse_received_resps(self, resps): + def _parse_received_resps(self, resps: list): """Parse received data.""" for res in resps: - if "result" in res: - """Handling request result response.""" - - miio_id = res.get("id") - if miio_id is not None and miio_id in self._result_callbacks: - - result = res.get("result") - # Convert '{"result":["ok"]}' to single value "ok". - if isinstance(result, list): - # Parse '[]' result. - if len(result) == 0: - result = "unknown" - else: - result = result[0] - self._result_callbacks[miio_id](result) - + self._handle_result(res) elif "method" in res: - """Handling new data received.""" - - if "model" not in res: - res["model"] = "lumi.gateway.mieu01" - model = res.get("model") - - if "sid" not in res: - res["sid"] = "miio.gateway" - sid = res.get("sid") - - params = res.get("params") - if params is None: - # Ensure params is dict - params = {} - if isinstance(params, list): - # Parse '[]' params - if len(params) == 0: - # Convert empty list to empty dict - params = {} - else: - # Extract list to dict - params = params[0] - if not isinstance(params, dict): - params = { "data": params } - - method = res.get("method") - if method.startswith("internal."): - """Internal method, nothing to do here.""" - continue - elif method in ["_sync.neighborDevInfo"]: - """Known but non-handled method.""" - continue - elif method.startswith("event."): - """Received event.""" - event = method - self._event_received(model, sid, event) - elif method == "_otc.log": - """Received metadata.""" - event = EVENT_METADATA - elif method == "props": - """Received values.""" - event = EVENT_VALUES - else: - """Unknown method.""" - _LOGGER.info("Received unknown method: " + str(method)) - continue - - # Now we have all the data we need - for func in self._callbacks: - func(model, sid, event, params) - + self._handle_method(res) else: - """Nothing that we can handle.""" - _LOGGER.error("Non-parseable data: " + str(res)) + _LOGGER.error("Non-parseable data: %s", str(res)) + + def _handle_result(self, res: dict): + miio_id = res.get("id") + if miio_id is not None and miio_id in self._result_callbacks: + result = res.get("result") + result = _process_result(result) + self._result_callbacks[miio_id](result) + + def _handle_method(self, res: dict): + res.setdefault("model", "lumi.gateway.mieu01") + res.setdefault("sid", "miio.gateway") + model = res.get("model") + sid = res.get("sid") + params = _process_params(res.get("params") or {}) + + method = res.get("method") + if _is_ignored_method(method): + return + + event = _determine_event(method) + if event: + self._trigger_callbacks(model, sid, event, params) + else: + _LOGGER.info("Received unknown method: %s", str(method)) + + def _trigger_callbacks(self, model, sid, event, params): + for func in self._callbacks: + func(model, sid, event, params) - def _event_received(self, model, sid, event): + def _event_received(self, model: str, sid: str, event: str): """Callback for receiving sensor event from gateway.""" - _LOGGER.debug("Received event: " + str(model) + " " + str(sid) + " - " + str(event)) + _LOGGER.debug("Received event: %s %s - %s", str(model), str(sid), str(event)) if sid not in self._known_sids: - _LOGGER.warning("Received event from unregistered sensor: " + str(model) + " " + str(sid) + " - " + str(event)) + _LOGGER.warning("Received event from unregistered sensor: %s %s - %s", str(model), str(sid), str(event)) - """Miio.""" - - def _miio_msg_encode(self, data): + def _miio_msg_encode(self, data: dict) -> tuple: """Encode data to be sent to gateway.""" - if data.get("method") and data.get("method") == "internal.PING": + if data.get("method") == INTERNAL_PING: msg = data else: if self._miio_id != 12345: - self._miio_id = self._miio_id + 1 + self._miio_id += 1 else: - self._miio_id = self._miio_id + 2 + self._miio_id += 2 if self._miio_id > 999999999: self._miio_id = 1 - msg = { "id": self._miio_id } + msg = {"id": self._miio_id} msg.update(data) - return([self._miio_id, (json.dumps(msg)).encode()]) - - def _miio_msg_decode(self, data): - """Decode data received from gateway.""" - - # Trim `0` from the end of data string. - if data[-1] == 0: - data = data[:-1] - - # Prepare array of responses. - resps = [] - try: - data_arr = "[" + data.decode().replace("}{", "},{") + "]" - resps = json.loads(data_arr) - except: - _LOGGER.warning("Bad JSON received: " + str(data)) - return resps + return self._miio_id, json.dumps(msg).encode() class XiaomiGwDevice(RestoreEntity): """A generic device of Gateway.""" - def __init__(self, gw, platform, device_class = None, sid = None, name = None, restore = None): + def __init__(self, gw: XiaomiGw, platform: str, device_class=None, sid=None, name=None, restore=None): """Initialize the device.""" - self._gw = gw self._send_to_hub = self._gw.send_to_hub - self._state = None self._restore = restore self._sid = sid self._name = name - self._model = None self._voltage = None self._lqi = None self._alive = None - if device_class is None: - self._unique_id = "{}_{}".format(sid, platform) - self.entity_id = platform + "." + sid.replace(".", "_") + self._unique_id = f"{sid}_{platform}" + self.entity_id = f"{platform}.{sid.replace('.', '_')}" else: - self._unique_id = "{}_{}_{}".format(sid, platform, device_class) - self.entity_id = platform + "." + sid.replace(".", "_") + "_" + device_class + self._unique_id = f"{sid}_{platform}_{device_class}" + self.entity_id = f"{platform}.{sid.replace('.', '_')}_{device_class}" async def async_added_to_hass(self): """Add push data listener for this device.""" @@ -432,84 +411,79 @@ async def async_added_to_hass(self): self._state = state.state @property - def name(self): + def name(self) -> str: + """Return the name of the device.""" return self._name @property def unique_id(self) -> str: + """Return a unique ID for the device.""" return self._unique_id @property - def available(self): + def available(self) -> bool: + """Return the availability of the device.""" return self._gw.is_available() @property - def should_poll(self): + def should_poll(self) -> bool: + """Return False as the device should not be polled.""" return False @property - def extra_state_attributes(self): - attrs = { ATTR_VOLTAGE: self._voltage, ATTR_LQI: self._lqi, ATTR_MODEL: self._model, ATTR_ALIVE: self._alive } - return attrs + def extra_state_attributes(self) -> dict: + """Return the extra state attributes of the device.""" + return { + ATTR_VOLTAGE: self._voltage, + ATTR_LQI: self._lqi, + ATTR_MODEL: self._model, + ATTR_ALIVE: self._alive, + } def _add_push_data_job(self, *args): + """Add a job to handle push data.""" self.hass.add_job(self._push_data, *args) @callback - def _push_data(self, model = None, sid = None, event = None, params = {}): + def _push_data(self, model=None, sid=None, event=None, params=None): """Push data that came from gateway to parser. Update HA state if any changes were made.""" - - # If should/need get into real parsing + if params is None: + params = {} init_parse = self._pre_parse_data(model, sid, event, params) if init_parse is not None: - # Update HA state - if init_parse == True: + if init_parse: self.async_schedule_update_ha_state() return - - # If parsed some data has_data = self.parse_incoming_data(model, sid, event, params) if has_data: - # Update HA state self.async_schedule_update_ha_state() - return - def parse_incoming_data(self, model, sid, event, params): - """Parse incoming data from gateway. Abstract.""" + def parse_incoming_data(self, model: str, sid: str, event: str, params: dict) -> bool: + """Parse incoming data from gateway. Abstract method to be implemented by subclasses.""" raise NotImplementedError() def update_device_params(self): - """If component needs to read data first to get it's state.""" + """Update device parameters if necessary.""" pass - def _pre_parse_data(self, model, sid, event, params): + def _pre_parse_data(self, model: str, sid: str, event: str, params: dict) -> bool | None: """Make initial checks and return bool if parsing shall be ended.""" - - # Generic handler for availability change - # Devices are getting availability state from Gateway itself if event == EVENT_AVAILABILITY: self.update_device_params() return True - if self._sid != sid: return False - if model is not None: self._model = model - - # Generic handler for event.keepalive if event == EVENT_KEEPALIVE: self._alive = utcnow() return True - - # Generic handler for _otg.log if event == EVENT_METADATA: - zigbeeData = params.get("subdev_zigbee") - if zigbeeData is not None: - self._voltage = zigbeeData.get("voltage") - self._lqi = zigbeeData.get("lqi") - _LOGGER.info("Vol:" + str(self._voltage) + " lqi:" + str(self._lqi)) + zigbee_data = params.get("subdev_zigbee") + if zigbee_data is not None: + self._voltage = zigbee_data.get("voltage") + self._lqi = zigbee_data.get("lqi") + _LOGGER.info("Vol: %s lqi: %s", str(self._voltage), str(self._lqi)) return True return False - return None diff --git a/custom_components/miio_gateway/alarm_control_panel.py b/custom_components/miio_gateway/alarm_control_panel.py index 32d915f..4bf579e 100644 --- a/custom_components/miio_gateway/alarm_control_panel.py +++ b/custom_components/miio_gateway/alarm_control_panel.py @@ -1,30 +1,35 @@ +import asyncio import logging -import homeassistant.components.alarm_control_panel as alarm - -from . import DOMAIN, XiaomiGwDevice - +from homeassistant.components.alarm_control_panel import AlarmControlPanelEntity, AlarmControlPanelEntityFeature from homeassistant.const import ( STATE_ALARM_ARMED_AWAY, STATE_ALARM_ARMED_HOME, STATE_ALARM_ARMED_NIGHT, STATE_ALARM_DISARMED, STATE_ALARM_TRIGGERED) +from homeassistant.core import HomeAssistant +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType -from homeassistant.components.alarm_control_panel.const import ( - SUPPORT_ALARM_ARM_AWAY, SUPPORT_ALARM_ARM_HOME, - SUPPORT_ALARM_ARM_NIGHT, SUPPORT_ALARM_TRIGGER) +from . import DOMAIN, XiaomiGwDevice _LOGGER = logging.getLogger(__name__) -def setup_platform(hass, config, add_entities, discovery_info=None): - _LOGGER.info("Setting up alarm") - devices = [] + +async def async_setup_platform( + hass: HomeAssistant, + config: ConfigType, + async_add_entities: AddEntitiesCallback, + discovery_info: DiscoveryInfoType = None): + _LOGGER.info("Setting up alarm control panel") gateway = hass.data[DOMAIN] - devices.append(XiaomiGatewayAlarm(gateway)) - add_entities(devices) + async_add_entities([XiaomiGatewayAlarm(gateway)]) -class XiaomiGatewayAlarm(XiaomiGwDevice, alarm.AlarmControlPanelEntity): + +class XiaomiGatewayAlarm(XiaomiGwDevice, AlarmControlPanelEntity): + """Representation of a Xiaomi Gateway Alarm.""" def __init__(self, gw): - XiaomiGwDevice.__init__(self, gw, "alarm_control_panel", None, "miio.gateway", "Gateway Alarm") + """Initialize the alarm control panel.""" + super().__init__(gw, "alarm_control_panel", None, "miio.gateway", "Gateway Alarm") # Default to ARMED_AWAY if no volume data was set self._state_by_volume = STATE_ALARM_ARMED_AWAY @@ -36,84 +41,85 @@ def __init__(self, gw): self.update_device_params() def update_device_params(self): + """Update the device parameters.""" if self._gw.is_available(): - self._send_to_hub({ "method": "get_prop", "params": ["arming"] }, self._init_set_arming) - self._send_to_hub({ "method": "get_prop", "params": ["alarming_volume"] }, self._init_set_volume) + asyncio.create_task(self._send_to_hub({"method": "get_prop", "params": ["arming"]}, self._init_set_arming)) + asyncio.create_task( + self._send_to_hub({"method": "get_prop", "params": ["alarming_volume"]}, self._init_set_volume)) def _init_set_arming(self, result): if result is not None: - _LOGGER.info("SETTING ARMED: " + str(result)) - if result == "on": - self._state = self._state_by_volume - elif result == "off": - self._state = STATE_ALARM_DISARMED + _LOGGER.info("Setting armed state: %s", result) + self._state = self._state_by_volume if result == "on" else STATE_ALARM_DISARMED def _init_set_volume(self, result): if result is not None: - _LOGGER.info("SETTING ARMED VOL: " + str(result)) + _LOGGER.info("Setting armed volume: %s", result) self._volume = int(result) self._state_by_volume = self._get_state_by_volume(self._volume) if self._is_armed(): self._state = self._state_by_volume - def alarm_disarm(self, code=None): + async def async_alarm_disarm(self, code=None): """Send disarm command.""" - self._disarm() + await self._disarm() self._state = STATE_ALARM_DISARMED - self.schedule_update_ha_state() + self.async_schedule_update_ha_state() - def alarm_arm_away(self, code=None): + async def async_alarm_arm_away(self, code=None): """Send arm away command.""" self._volume = 80 - self._arm() + await self._arm() self._state = STATE_ALARM_ARMED_AWAY - self.schedule_update_ha_state() + self.async_schedule_update_ha_state() - def alarm_arm_home(self, code=None): + async def async_alarm_arm_home(self, code=None): """Send arm home command.""" self._volume = 25 - self._arm() + await self._arm() self._state = STATE_ALARM_ARMED_HOME - self.schedule_update_ha_state() + self.async_schedule_update_ha_state() - def alarm_arm_night(self, code=None): + async def async_alarm_arm_night(self, code=None): """Send arm night command.""" self._volume = 15 - self._arm() + await self._arm() self._state = STATE_ALARM_ARMED_NIGHT - self.schedule_update_ha_state() + self.async_schedule_update_ha_state() - def alarm_trigger(self, code=None): + async def async_alarm_trigger(self, code=None): """Trigger the alarm.""" - self._siren() - self._blink() + await self._siren() + await self._blink() self._state = STATE_ALARM_TRIGGERED - self.schedule_update_ha_state() + self.async_schedule_update_ha_state() - def _arm(self): - self._send_to_hub({ "method": "set_alarming_volume", "params": [self._volume] }) - self._send_to_hub({ "method": "set_sound_playing", "params": ["off"] }) - self._send_to_hub({ "method": "set_arming", "params": ["on"] }) + async def _arm(self): + """Arm the alarm.""" + await self._send_to_hub({"method": "set_alarming_volume", "params": [self._volume]}) + await self._send_to_hub({"method": "set_sound_playing", "params": ["off"]}) + await self._send_to_hub({"method": "set_arming", "params": ["on"]}) - def _disarm(self): - self._send_to_hub({ "method": "set_sound_playing", "params": ["off"] }) - self._send_to_hub({ "method": "set_arming", "params": ["off"] }) + async def _disarm(self): + """Disarm the alarm.""" + await self._send_to_hub({"method": "set_sound_playing", "params": ["off"]}) + await self._send_to_hub({"method": "set_arming", "params": ["off"]}) - def _siren(self): - # TODO playlist - self._send_to_hub({ "method": "play_music_new", "params": [str(self._ringtone), self._volume] }) + async def _siren(self): + """Activate the siren.""" + await self._send_to_hub({"method": "play_music_new", "params": [str(self._ringtone), self._volume]}) - def _blink(self): - # TODO blink + async def _blink(self): + """Activate the blink.""" argbhex = [int("01" + self._color, 16), int("64" + self._color, 16)] - self._send_to_hub({ "method": "set_rgb", "params": [argbhex[1]] }) + await self._send_to_hub({"method": "set_rgb", "params": [argbhex[1]]}) - def _is_armed(self): - if self._state is not None or self._state != STATE_ALARM_TRIGGERED or self._state != STATE_ALARM_DISARMED: - return True - return False + def _is_armed(self) -> bool: + """Check if the alarm is armed.""" + return self._state not in [STATE_ALARM_TRIGGERED, STATE_ALARM_DISARMED] - def _get_state_by_volume(self, volume): + def _get_state_by_volume(self, volume: int) -> str: + """Get the alarm state based on volume.""" if volume < 20: return STATE_ALARM_ARMED_NIGHT elif volume < 30: @@ -123,20 +129,25 @@ def _get_state_by_volume(self, volume): @property def state(self): + """Return the state of the alarm control panel.""" return self._state @property def supported_features(self) -> int: - return SUPPORT_ALARM_ARM_HOME | SUPPORT_ALARM_ARM_AWAY | SUPPORT_ALARM_ARM_NIGHT | SUPPORT_ALARM_TRIGGER + """Return the supported features.""" + return ( + AlarmControlPanelEntityFeature.ARM_AWAY + | AlarmControlPanelEntityFeature.ARM_HOME + | AlarmControlPanelEntityFeature.ARM_NIGHT + | AlarmControlPanelEntityFeature.TRIGGER + ) - def parse_incoming_data(self, model, sid, event, params): + def parse_incoming_data(self, model, sid, event, params) -> bool: + """Parse incoming data from gateway.""" arming = params.get("arming") if arming is not None: - if arming == "on": - self._state = self._get_state_by_volume(self._volume) - elif arming == "off": - self._state = STATE_ALARM_DISARMED + self._state = self._get_state_by_volume(self._volume) if arming == "on" else STATE_ALARM_DISARMED return True alarming_volume = params.get("alarming_volume") @@ -145,6 +156,6 @@ def parse_incoming_data(self, model, sid, event, params): self._state_by_volume = self._get_state_by_volume(self._volume) if self._is_armed(): self._state = self._state_by_volume - return True + return True return False diff --git a/custom_components/miio_gateway/binary_sensor.py b/custom_components/miio_gateway/binary_sensor.py index a21062c..148742d 100644 --- a/custom_components/miio_gateway/binary_sensor.py +++ b/custom_components/miio_gateway/binary_sensor.py @@ -5,7 +5,9 @@ from homeassistant.components.binary_sensor import ( BinarySensorEntity, DEVICE_CLASSES) from homeassistant.const import STATE_OFF, STATE_ON +from homeassistant.core import HomeAssistant from homeassistant.helpers.event import async_track_point_in_utc_time +from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from . import DOMAIN, CONF_DATA_DOMAIN, CONF_SENSOR_SID, CONF_SENSOR_CLASS, CONF_SENSOR_NAME, CONF_SENSOR_RESTORE, \ EVENT_VALUES, XiaomiGwDevice @@ -45,11 +47,15 @@ IGNORED_EVENTS = [EVENT_VALUES, EVENT_TILT_ANGLE, EVENT_COORDINATION] -def setup_platform(hass, config, add_entities, discovery_info=None): +async def async_setup_platform( + hass: HomeAssistant, + config: ConfigType, + async_add_entities, + discovery_info: DiscoveryInfoType = None): _LOGGER.info("Setting up binary sensors") # Make a list of all default + custom device classes - all_device_classes = DEVICE_CLASSES + all_device_classes = list(DEVICE_CLASSES) all_device_classes.append(DEVICE_CLASS_BUTTON) gateway = hass.data[DOMAIN] @@ -70,21 +76,23 @@ def setup_platform(hass, config, add_entities, discovery_info=None): gateway.append_known_sid(sid) if device_class in all_device_classes: - _LOGGER.info("Registering " + str(device_class) + " sid " + str(sid) + " as binary_sensor") + _LOGGER.info("Registering %s sid %s as binary_sensor", device_class, sid) entities.append(XiaomiGwBinarySensor(gateway, device_class, sid, name, restore)) if not entities: _LOGGER.info("No binary_sensors configured") return False - add_entities(entities) + async_add_entities(entities) return True class XiaomiGwBinarySensor(XiaomiGwDevice, BinarySensorEntity): + """Representation of a Xiaomi Gateway Binary Sensor.""" def __init__(self, gw, device_class, sid, name, restore): - XiaomiGwDevice.__init__(self, gw, "binary_sensor", device_class, sid, name, restore) + """Initialize the binary sensor.""" + super().__init__(gw, "binary_sensor", device_class, sid, name, restore) # Custom Button device class if device_class == DEVICE_CLASS_BUTTON: @@ -93,25 +101,28 @@ def __init__(self, gw, device_class, sid, name, restore): self._device_class = device_class self._last_action = None - self._state_timer = None @property - def is_on(self): - return False if self._state == STATE_OFF else True + def is_on(self) -> bool: + """Return true if the binary sensor is on.""" + return self._state == STATE_ON @property - def device_class(self): + def device_class(self) -> str: + """Return the class of this device.""" return self._device_class @property - def extra_state_attributes(self): + def extra_state_attributes(self) -> dict: + """Return the state attributes.""" attrs = super().extra_state_attributes if self._last_action is not None: attrs.update({ATTR_LAST_ACTION: self._last_action}) return attrs - def parse_incoming_data(self, model, sid, event, params): + def parse_incoming_data(self, model, sid, event, params) -> bool: + """Parse incoming data from gateway.""" # Ignore params event for this platform if event in IGNORED_EVENTS: @@ -127,7 +138,7 @@ def parse_incoming_data(self, model, sid, event, params): 'entity_id': self.entity_id, 'event_type': event_type }) - self._state = event_type + self._state = STATE_ON self._last_action = event_type self._start_state_timer() diff --git a/custom_components/miio_gateway/light.py b/custom_components/miio_gateway/light.py index 9a46088..783f88c 100644 --- a/custom_components/miio_gateway/light.py +++ b/custom_components/miio_gateway/light.py @@ -1,26 +1,44 @@ -import logging +import asyncio import binascii +import logging import struct -from homeassistant.components.light import ( - LightEntity, ATTR_BRIGHTNESS, ATTR_HS_COLOR, SUPPORT_BRIGHTNESS, SUPPORT_COLOR) import homeassistant.util.color as color_util +from homeassistant.components.light import ( + ATTR_BRIGHTNESS, + ATTR_HS_COLOR, + ColorMode, + LightEntity, +) +from homeassistant.core import HomeAssistant +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from . import DOMAIN, XiaomiGwDevice _LOGGER = logging.getLogger(__name__) -def setup_platform(hass, config, add_entities, discovery_info=None): + +async def async_setup_platform( + hass: HomeAssistant, + config: ConfigType, + async_add_entities: AddEntitiesCallback, + discovery_info: DiscoveryInfoType = None): + """Set up the light platform.""" _LOGGER.info("Setting up light") - devices = [] gateway = hass.data[DOMAIN] - devices.append(XiaomiGatewayLight(gateway)) - add_entities(devices) + async_add_entities([XiaomiGatewayLight(gateway)]) + class XiaomiGatewayLight(XiaomiGwDevice, LightEntity): + """Representation of a Xiaomi Gateway Light.""" + + _attr_color_mode = ColorMode.BRIGHTNESS + _attr_supported_color_modes = {ColorMode.BRIGHTNESS} def __init__(self, gw): - XiaomiGwDevice.__init__(self, gw, "light", None, "miio.gateway", "Gateway LED") + """Initialize the light.""" + super().__init__(gw, "light", None, "miio.gateway", "Gateway LED") self._hs = (0, 0) self._brightness = 100 self._state = False @@ -28,26 +46,32 @@ def __init__(self, gw): self.update_device_params() def update_device_params(self): + """Update the device parameters.""" if self._gw.is_available(): - self._send_to_hub({ "method": "toggle_light", "params": ["off"] }) + asyncio.create_task(self._send_to_hub({"method": "toggle_light", "params": ["off"]})) @property - def is_on(self): + def is_on(self) -> bool: + """Return true if the light is on.""" return self._state @property - def brightness(self): + def brightness(self) -> int: + """Return the brightness of the light.""" return int(255 * self._brightness / 100) @property - def hs_color(self): + def hs_color(self) -> tuple: + """Return the hs color value.""" return self._hs @property - def supported_features(self): - return SUPPORT_BRIGHTNESS | SUPPORT_COLOR + def supported_color_modes(self): + """Return the supported color modes.""" + return {"hs"} - def turn_on(self, **kwargs): + async def async_turn_on(self, **kwargs): + """Turn on or control the light.""" if ATTR_HS_COLOR in kwargs: self._hs = kwargs[ATTR_HS_COLOR] if ATTR_BRIGHTNESS in kwargs: @@ -56,16 +80,18 @@ def turn_on(self, **kwargs): argb = (self._brightness,) + rgb argbhex = binascii.hexlify(struct.pack("BBBB", *argb)).decode("ASCII") argbhex = int(argbhex, 16) - self._send_to_hub({ "method": "set_rgb", "params": [argbhex] }) + await self._send_to_hub({"method": "set_rgb", "params": [argbhex]}) self._state = True - self.schedule_update_ha_state() + self.async_schedule_update_ha_state() - def turn_off(self, **kwargs): - self._send_to_hub({ "method": "toggle_light", "params": ["off"] }) + async def async_turn_off(self, **kwargs): + """Turn off the light.""" + await self._send_to_hub({"method": "toggle_light", "params": ["off"]}) self._state = False - self.schedule_update_ha_state() + self.async_schedule_update_ha_state() - def parse_incoming_data(self, model, sid, event, params): + def parse_incoming_data(self, model, sid, event, params) -> bool: + """Parse incoming data from gateway.""" light = params.get("light") if light is not None: diff --git a/custom_components/miio_gateway/media_player.py b/custom_components/miio_gateway/media_player.py index e350a6b..838b190 100644 --- a/custom_components/miio_gateway/media_player.py +++ b/custom_components/miio_gateway/media_player.py @@ -1,14 +1,17 @@ +import asyncio import logging from datetime import timedelta from homeassistant.components.media_player import MediaPlayerEntity from homeassistant.components.media_player.const import ( - MEDIA_TYPE_MUSIC, SUPPORT_VOLUME_SET, SUPPORT_VOLUME_MUTE, SUPPORT_PLAY_MEDIA, - SUPPORT_PLAY, SUPPORT_STOP) + MEDIA_TYPE_MUSIC, MediaPlayerEntityFeature) from homeassistant.const import ( STATE_IDLE, STATE_PLAYING) +from homeassistant.core import HomeAssistant from homeassistant.core import callback +from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.event import async_track_point_in_utc_time +from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from homeassistant.util.dt import utcnow from . import DOMAIN, XiaomiGwDevice @@ -17,20 +20,27 @@ PLAYING_TIME = timedelta(seconds=10) -SUPPORT_PLAYER = SUPPORT_VOLUME_SET | SUPPORT_VOLUME_MUTE | SUPPORT_PLAY_MEDIA |\ - SUPPORT_PLAY | SUPPORT_STOP +SUPPORT_PLAYER = (MediaPlayerEntityFeature.VOLUME_SET | MediaPlayerEntityFeature.VOLUME_MUTE | + MediaPlayerEntityFeature.PLAY_MEDIA | MediaPlayerEntityFeature.PLAY | MediaPlayerEntityFeature.STOP) -def setup_platform(hass, config, add_entities, discovery_info=None): + +async def async_setup_platform( + hass: HomeAssistant, + config: ConfigType, + async_add_entities: AddEntitiesCallback, + discovery_info: DiscoveryInfoType = None): + """Set up the sound player platform.""" _LOGGER.info("Setting up sound player") - devices = [] gateway = hass.data[DOMAIN] - devices.append(XiaomiGatewayLight(gateway)) - add_entities(devices) + async_add_entities([XiaomiGatewayMediaPlayer(gateway)]) + -class XiaomiGatewayLight(XiaomiGwDevice, MediaPlayerEntity): +class XiaomiGatewayMediaPlayer(XiaomiGwDevice, MediaPlayerEntity): + """Representation of a Xiaomi Gateway Media Player.""" def __init__(self, gw): - XiaomiGwDevice.__init__(self, gw, "media_player", None, "miio.gateway", "Gateway Player") + """Initialize the media player.""" + super().__init__(gw, "media_player", None, "miio.gateway", "Gateway Player") self._volume = None self._muted = False self._ringtone = 1 @@ -40,93 +50,107 @@ def __init__(self, gw): self.update_device_params() def update_device_params(self): + """Update the device parameters.""" if self._gw.is_available(): - self._send_to_hub({ "method": "get_prop", "params": ["gateway_volume"] }, self._init_set_volume) + asyncio.create_task( + self._send_to_hub({"method": "get_prop", "params": ["gateway_volume"]}, self._init_set_volume)) def _init_set_volume(self, result): if result is not None: - _LOGGER.info("SETTING VOL: " + str(result)) + _LOGGER.info("Setting volume: %s", result) self._volume = int(result) / 100 - def set_volume_level(self, volume): + async def async_set_volume_level(self, volume): + """Set volume level.""" int_volume = int(volume * 100) - self._send_to_hub({ "method": "set_gateway_volume", "params": [int_volume] }) + await self._send_to_hub({"method": "set_gateway_volume", "params": [int_volume]}) self._volume = volume - self.schedule_update_ha_state() + self.async_schedule_update_ha_state() - def mute_volume(self, mute): - self._send_to_hub({ "method": "set_mute", "params": [str(mute).lower()] }) + async def async_mute_volume(self, mute): + """Mute the volume.""" + await self._send_to_hub({"method": "set_mute", "params": [str(mute).lower()]}) self._muted = mute - self.schedule_update_ha_state() + self.async_schedule_update_ha_state() - def play_media(self, media_type, media_id, **kwargs): + async def async_play_media(self, media_type, media_id, **kwargs): + """Play media.""" if media_type == MEDIA_TYPE_MUSIC: - print(kwargs) self._ringtone = media_id - self.media_play() + await self.async_media_play() - def media_play(self, new_volume=None): + async def async_media_play(self, new_volume=None): + """Play media.""" int_volume = int(self._volume * 100) if new_volume is not None: int_volume = int(new_volume) - self._send_to_hub({ "method": "play_music_new", "params": [str(self._ringtone), int_volume] }) + await self._send_to_hub({"method": "play_music_new", "params": [str(self._ringtone), int_volume]}) self._state = STATE_PLAYING self._player_tracker = async_track_point_in_utc_time( - self.hass, self._async_playing_finished, - utcnow() + PLAYING_TIME) - self.schedule_update_ha_state() + self.hass, self._async_playing_finished, utcnow() + PLAYING_TIME) + self.async_schedule_update_ha_state() - def media_stop(self): + async def async_media_stop(self): + """Stop media.""" if self._player_tracker is not None: self._player_tracker() self._player_tracker = None - self._send_to_hub({ "method": "set_sound_playing", "params": ["off"] }) + await self._send_to_hub({"method": "set_sound_playing", "params": ["off"]}) self._state = STATE_IDLE - self.schedule_update_ha_state() + self.async_schedule_update_ha_state() - def media_pause(self): - self.media_stop() + async def async_media_pause(self): + """Pause media.""" + await self.async_media_stop() @property def state(self): + """Return the state of the media player.""" return self._state @property def volume_level(self): + """Return the volume level.""" return self._volume @property def is_volume_muted(self): + """Return True if volume is muted.""" return self._muted @property def media_artist(self): + """Return the media artist.""" return "Alarm" @property def media_title(self): - return "No " + str(self._ringtone) + """Return the media title.""" + return f"Ringtone {self._ringtone}" @property def supported_features(self): - return SUPPORT_PLAYER + """Return the supported features.""" + return SUPPORT_PLAYER @property def media_content_type(self): + """Return the media content type.""" return MEDIA_TYPE_MUSIC @callback def _async_playing_finished(self, now): + """Handle when playing is finished.""" self._player_tracker = None self._state = STATE_IDLE self.async_schedule_update_ha_state() - def parse_incoming_data(self, model, sid, event, params): + def parse_incoming_data(self, model, sid, event, params) -> bool: + """Parse incoming data from gateway.""" gateway_volume = params.get("gateway_volume") if gateway_volume is not None: - float_volume = gateway_volume / 100 - self._volume = float_volume + self._volume = gateway_volume / 100 return True return False diff --git a/custom_components/miio_gateway/sensor.py b/custom_components/miio_gateway/sensor.py index 5830dbc..98b45c0 100644 --- a/custom_components/miio_gateway/sensor.py +++ b/custom_components/miio_gateway/sensor.py @@ -1,32 +1,41 @@ import logging -from homeassistant.const import ( - TEMP_CELSIUS, DEVICE_CLASS_ILLUMINANCE, DEVICE_CLASS_TEMPERATURE, DEVICE_CLASS_HUMIDITY, DEVICE_CLASS_PRESSURE) -from homeassistant.components.sensor import ( - DEVICE_CLASSES) +from homeassistant.components.sensor import SensorDeviceClass, SensorEntity +from homeassistant.const import UnitOfTemperature +from homeassistant.core import HomeAssistant +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType -from . import DOMAIN, CONF_DATA_DOMAIN, CONF_SENSOR_SID, CONF_SENSOR_CLASS, CONF_SENSOR_NAME, CONF_SENSOR_RESTORE, XiaomiGwDevice +from . import DOMAIN, CONF_DATA_DOMAIN, CONF_SENSOR_SID, CONF_SENSOR_CLASS, CONF_SENSOR_NAME, CONF_SENSOR_RESTORE, \ + XiaomiGwDevice _LOGGER = logging.getLogger(__name__) SENSOR_TYPES = { - DEVICE_CLASS_ILLUMINANCE: {"unit_of_measurement": "lm", "icon": "mdi:white-balance-sunny"}, - DEVICE_CLASS_TEMPERATURE: {"unit_of_measurement": TEMP_CELSIUS, "icon": "mdi:thermometer"}, - DEVICE_CLASS_HUMIDITY: {"unit_of_measurement": "%", "icon": "mdi:water-percent"}, - DEVICE_CLASS_PRESSURE: {"unit_of_measurement": "hPa", "icon": "mdi:weather-windy"}, + SensorDeviceClass.ILLUMINANCE: {"unit_of_measurement": "lm", "icon": "mdi:white-balance-sunny"}, + SensorDeviceClass.TEMPERATURE: {"unit_of_measurement": UnitOfTemperature.CELSIUS, "icon": "mdi:thermometer"}, + SensorDeviceClass.HUMIDITY: {"unit_of_measurement": "%", "icon": "mdi:water-percent"}, + SensorDeviceClass.PRESSURE: {"unit_of_measurement": "hPa", "icon": "mdi:weather-windy"}, } -def setup_platform(hass, config, add_entities, discovery_info=None): + +async def async_setup_platform( + hass: HomeAssistant, + config: ConfigType, + async_add_entities: AddEntitiesCallback, + discovery_info: DiscoveryInfoType = None): + """Set up the sensors.""" _LOGGER.info("Setting up sensors") # Make a list of default + custom device classes - all_device_classes = DEVICE_CLASSES + all_device_classes = list(SENSOR_TYPES.keys()) gateway = hass.data[DOMAIN] entities = [] - # Gateways's illuminace sensor - entities.append(XiaomiGwSensor(gateway, DEVICE_CLASS_ILLUMINANCE, "miio.gateway", "Gateway Illuminance Sensor", False)) + # Gateway's illuminance sensor + entities.append( + XiaomiGwSensor(gateway, SensorDeviceClass.ILLUMINANCE, "miio.gateway", "Gateway Illuminance Sensor", False)) for cfg in hass.data[CONF_DATA_DOMAIN]: if not cfg: @@ -43,69 +52,69 @@ def setup_platform(hass, config, add_entities, discovery_info=None): gateway.append_known_sid(sid) if device_class in all_device_classes: - _LOGGER.info("Registering " + str(device_class) + " sid " + str(sid) + " as sensor") + _LOGGER.info("Registering %s sid %s as sensor", device_class, sid) entities.append(XiaomiGwSensor(gateway, device_class, sid, name, restore)) if not entities: _LOGGER.info("No sensors configured") - return False + return - add_entities(entities) - return True + async_add_entities(entities) -class XiaomiGwSensor(XiaomiGwDevice): - def __init__(self, gw, device_class, sid, name, restore): - XiaomiGwDevice.__init__(self, gw, "sensor", device_class, sid, name, restore) +class XiaomiGwSensor(XiaomiGwDevice, SensorEntity): + """Representation of a Xiaomi Gateway Sensor.""" + def __init__(self, gw, device_class, sid, name, restore): + """Initialize the sensor.""" + super().__init__(gw, "sensor", device_class, sid, name, restore) self._device_class = device_class @property def state(self): + """Return the state of the sensor.""" return self._state @property def device_class(self): + """Return the class of this device.""" return self._device_class @property def icon(self): - try: - return SENSOR_TYPES.get(self._device_class).get("icon") - except TypeError: - return None + """Return the icon to use in the frontend, if any.""" + return SENSOR_TYPES.get(self._device_class, {}).get("icon") @property def unit_of_measurement(self): - try: - return SENSOR_TYPES.get(self._device_class).get("unit_of_measurement") - except TypeError: - return None - - def parse_incoming_data(self, model, sid, event, params): - - if self._device_class == DEVICE_CLASS_ILLUMINANCE: + """Return the unit of measurement.""" + return SENSOR_TYPES.get(self._device_class, {}).get("unit_of_measurement") + + def parse_incoming_data(self, model, sid, event, params) -> bool: + """Parse incoming data from gateway.""" + + if self._device_class == SensorDeviceClass.ILLUMINANCE: illumination = params.get("illumination") if illumination is not None: self._state = illumination return True - elif self._device_class == DEVICE_CLASS_TEMPERATURE: + elif self._device_class == SensorDeviceClass.TEMPERATURE: temperature = params.get("temperature") if temperature is not None: - self._state = round(temperature/100, 1) + self._state = round(temperature / 100, 1) return True - elif self._device_class == DEVICE_CLASS_HUMIDITY: + elif self._device_class == SensorDeviceClass.HUMIDITY: humidity = params.get("humidity") if humidity is not None: - self._state = round(humidity/100, 1) + self._state = round(humidity / 100, 1) return True - elif self._device_class == DEVICE_CLASS_PRESSURE: + elif self._device_class == SensorDeviceClass.PRESSURE: pressure = params.get("pressure") if pressure is not None: - self._state = round(pressure/100, 1) + self._state = round(pressure / 100, 1) return True return False