diff --git a/pywizlight/bulb.py b/pywizlight/bulb.py index 94217ed..f1f2f4c 100755 --- a/pywizlight/bulb.py +++ b/pywizlight/bulb.py @@ -278,6 +278,11 @@ def get_mac(self) -> Optional[str]: """Return MAC from the bulb.""" return _extract_str(self.pilotResult, "mac") + def get_power(self) -> Optional[float]: + """Return power data from the bulb in watts.""" + milli_watts = _extract_int(self.pilotResult, "pc") + return None if milli_watts is None else milli_watts / 1000 + def get_warm_white(self) -> Optional[int]: """Get the value of the warm white led.""" return _extract_int(self.pilotResult, "w") @@ -441,6 +446,7 @@ def __init__( self.push_cancel: Optional[Callable] = None self.last_push: float = NEVER_TIME self.push_running: bool = False + self.power_monitoring: Optional[bool] = None # Check connection removed as it did blocking I/O in the event loop @property @@ -659,6 +665,19 @@ async def set_state(self, pilot_builder: PilotBuilder = PilotBuilder()) -> None: # TODO: self.status could be None, in which case casting it to a bool might not be what we really want await self.send(pilot_builder.set_state_message(bool(self.status))) + async def get_power(self) -> Optional[float]: + """Get watts from the device.""" + if self.last_push + MAX_TIME_BETWEEN_PUSH < time.monotonic(): + if self.power_monitoring is not False: + try: + resp = await self.send({"method": "getPower"}) + except WizLightMethodNotFound: + self.power_monitoring = False + return None + if resp is not None and "result" in resp: + return resp["result"]["power"] / 1000 + return self.state.get_power() if self.state else None + # ---------- Helper Functions ------------ async def updateState(self) -> Optional[PilotParser]: """Update the bulb state. diff --git a/pywizlight/tests/fake_bulb.py b/pywizlight/tests/fake_bulb.py index 14f5f96..c814393 100644 --- a/pywizlight/tests/fake_bulb.py +++ b/pywizlight/tests/fake_bulb.py @@ -474,6 +474,34 @@ "drvConf": [20, 1], }, }, + ("ESP25_SOCKET_01", "1.26.2"): { + "method": "getSystemConfig", + "env": "pro", + "result": { + "mac": "d8a0119906b7", + "homeId": "**REDACTED**", + "roomId": "**REDACTED**", + "rgn": "eu", + "moduleName": "ESP25_SOCKET_01", + "fwVersion": "1.26.2", + "groupId": 0, + "ping": 0, + }, + }, + ("ESP25_SOCKET_01", "1.26.1"): { + "method": "getSystemConfig", + "env": "pro", + "result": { + "mac": "d8a0119906b7", + "homeId": "**REDACTED**", + "roomId": "**REDACTED**", + "rgn": "eu", + "moduleName": "ESP25_SOCKET_01", + "fwVersion": "1.26.2", + "groupId": 0, + "ping": 0, + }, + }, } USER_CONFIGS: Dict[Tuple[str, str], Any] = { # AKA getUserConfig @@ -657,6 +685,12 @@ "error": {"code": -32601, "message": "Method not found"}, } +GET_POWER_NOT_FOUND = { + "method": "getPower", + "env": "pro", + "error": {"code": -32601, "message": "Method not found"}, +} + def get_initial_pilot() -> Dict[str, Any]: return { @@ -694,6 +728,17 @@ def get_initial_user_config(module_name: str, firmware_version: str) -> Dict[str return USER_CONFIGS.get((module_name, firmware_version), USER_CONFIG_NOT_FOUND) +def get_power(module_name: str, firmware_version: str) -> Dict[str, Any]: + if module_name == "ESP25_SOCKET_01" and firmware_version == "1.26.2": + return {"method": "getPower", "env": "pro", "result": {"power": 1065385}} + if module_name == "ESP25_SOCKET_01" and firmware_version == "1.26.1": + return { + "method": "getPower", + "env": "pro", + } + return GET_POWER_NOT_FOUND + + BULB_JSON_ERROR = b'{"env":"pro","error":{"code":-32700,"message":"Parse error"}}' @@ -705,6 +750,7 @@ class BulbUDPRequestHandler: model_config: Dict[str, Any] # Will be set by constructor for the actual class user_config: Dict[str, Any] registration: Dict[str, Any] + get_power: Dict[str, Any] transport: asyncio.DatagramTransport def handle(self, resp: bytes, addr: Tuple[str, int]) -> None: @@ -734,6 +780,8 @@ def handle(self, resp: bytes, addr: Tuple[str, int]) -> None: self.transport.sendto(bytes(json.dumps(self.sys_config), "utf-8"), addr) elif method == "getModelConfig": self.transport.sendto(bytes(json.dumps(self.model_config), "utf-8"), addr) + elif method == "getPower": + self.transport.sendto(bytes(json.dumps(self.get_power), "utf-8"), addr) elif method == "getUserConfig": # Simulate late response of model config missing to ensure # it does not break getUserConfig @@ -777,6 +825,7 @@ async def make_udp_fake_bulb_server( "env": "pro", "result": {"mac": "a8bb5006033d", "success": True}, } + handler.get_power = get_power(module_name, firmware_version) transport_proto = await asyncio.get_event_loop().create_datagram_endpoint( lambda: WizProtocol(on_response=handler.handle), diff --git a/pywizlight/tests/test_bulb.py b/pywizlight/tests/test_bulb.py index b914157..1418394 100644 --- a/pywizlight/tests/test_bulb.py +++ b/pywizlight/tests/test_bulb.py @@ -23,6 +23,28 @@ async def correct_bulb() -> AsyncGenerator[wizlight, None]: shutdown() +@pytest.fixture() +async def power_socket_old_firmware() -> AsyncGenerator[wizlight, None]: + shutdown, port = await startup_bulb( + module_name="ESP25_SOCKET_01", firmware_version="1.26.1" + ) + bulb = wizlight(ip="127.0.0.1", port=port) + yield bulb + await bulb.async_close() + shutdown() + + +@pytest.fixture() +async def power_socket() -> AsyncGenerator[wizlight, None]: + shutdown, port = await startup_bulb( + module_name="ESP25_SOCKET_01", firmware_version="1.26.2" + ) + bulb = wizlight(ip="127.0.0.1", port=port) + yield bulb + await bulb.async_close() + shutdown() + + @pytest.fixture() async def bad_bulb() -> AsyncGenerator[wizlight, None]: bulb = wizlight(ip="1.1.1.1") @@ -268,6 +290,27 @@ async def test_get_mac(correct_bulb: wizlight) -> None: assert mac == "a8bb5006033d" +@pytest.mark.asyncio +async def test_get_power(power_socket: wizlight) -> None: + """Test getting power in watts.""" + watts = await power_socket.get_power() + assert watts == 1065.385 + + +@pytest.mark.asyncio +async def test_get_power_old_firmware(power_socket_old_firmware: wizlight) -> None: + """Test getting power in watts.""" + watts = await power_socket_old_firmware.get_power() + assert watts is None + + +@pytest.mark.asyncio +async def test_get_power_unsupported_device(correct_bulb: wizlight) -> None: + """Test getting power in watts.""" + assert await correct_bulb.get_power() is None + assert await correct_bulb.get_power() is None + + # Error states / Timout @pytest.mark.asyncio async def test_timeout(bad_bulb: wizlight) -> None: diff --git a/pywizlight/tests/test_push_manager.py b/pywizlight/tests/test_push_manager.py index c4a1a3f..6b28fc9 100644 --- a/pywizlight/tests/test_push_manager.py +++ b/pywizlight/tests/test_push_manager.py @@ -104,6 +104,7 @@ def _on_push(data: PilotParser) -> None: "src": "hb", "mqttCd": 255, "ts": 1644593327, + "pc": 660, "state": False, "sceneId": 0, "temp": 6500, @@ -125,6 +126,7 @@ def _on_push(data: PilotParser) -> None: update = await socket_push.updateState() assert update is not None assert update.pilotResult == params + assert await socket_push.get_power() == 0.660 diagnostics = socket_push.diagnostics assert diagnostics["bulb_type"]["bulb_type"] == "SOCKET"