Skip to content

Commit

Permalink
Add Xiaomi E1 driver curtain support (#2629)
Browse files Browse the repository at this point in the history
* Add quirk for Xiaomi E1 driver curtain

* Add tests for Xiaomi E1 driver curtain

* Fix inverted logic

* Add test for inverted logic

* Comment out inverting percentage

Apparently this is already done by the cover when "inverted mode" is enabled. Does this depend on firmware?

* Add more custom attributes

* Add illuminance measurement

Possible values are 0, 1, 2 apparently

* Add custom node descriptor for disabling "is mains powered"

* Move imports

* Comment out test for reverting lift percentage

We're not inverting it ourselves at the moment

* Make illuminance cluster a `LocalDataCluster`

* Remove "TODO: `XiaomiAqaraDriverE1` in OUTPUT clusters?"

Doesn't seem to be the case, but I'm not sure.

* Remove commented code about inverting "go to lift percent" command

* Remove commented code about inverting current lift percent attribute

* Remove TODO for up/down commands

* Change "attributes copy" to copy from `XiaomiAqaraE1Cluster` class

* Move local illuminance cluster and custom PowerConfiguration class to main Xiaomi file

They'll also be used by other devices (in future PRs)

* Remove commented test about testing inverted percent attribute

This functionality was removed, as it seems to be wrong according to comments in #2629

* Change comment for `WindowCoveringE1` cluster class

* Add tests for custom light level attribute
  • Loading branch information
TheJulianJES authored Oct 21, 2023
1 parent 32e956c commit b731115
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 1 deletion.
86 changes: 86 additions & 0 deletions tests/test_xiaomi.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import zigpy.device
import zigpy.types as t
from zigpy.zcl import foundation
from zigpy.zcl.clusters.closures import WindowCovering
from zigpy.zcl.clusters.general import (
AnalogInput,
DeviceTemperature,
Expand Down Expand Up @@ -51,6 +52,7 @@
XiaomiQuickInitDevice,
handle_quick_init,
)
import zhaquirks.xiaomi.aqara.driver_curtain_e1
from zhaquirks.xiaomi.aqara.feeder_acn001 import (
FEEDER_ATTR,
ZCL_CHILD_LOCK,
Expand Down Expand Up @@ -1362,3 +1364,87 @@ async def test_xiaomi_t1_door_sensor(
assert power_listener.attribute_updates[0][1] == expected_results[0]
assert power_listener.attribute_updates[1][0] == zcl_power_percent_id
assert power_listener.attribute_updates[1][1] == expected_results[1]


@pytest.mark.parametrize(
"command, command_id, value",
[
(
WindowCovering.ServerCommandDefs.up_open.id,
WindowCovering.ServerCommandDefs.go_to_lift_percentage.id,
0,
),
(
WindowCovering.ServerCommandDefs.down_close.id,
WindowCovering.ServerCommandDefs.go_to_lift_percentage.id,
100,
),
(
WindowCovering.ServerCommandDefs.stop.id,
WindowCovering.ServerCommandDefs.stop.id,
None,
),
],
)
async def test_xiaomi_e1_driver_commands(
zigpy_device_from_quirk, command, command_id, value
):
"""Test Aqara E1 driver commands for basic movement functions using WindowCovering cluster."""
device = zigpy_device_from_quirk(zhaquirks.xiaomi.aqara.driver_curtain_e1.DriverE1)

window_covering_cluster = device.endpoints[1].window_covering
p = mock.patch.object(window_covering_cluster, "request", mock.AsyncMock())

with p as request_mock:
request_mock.return_value = (foundation.Status.SUCCESS, "done")

# test command
await window_covering_cluster.command(command)
assert request_mock.call_count == 1
assert request_mock.call_args[0][1] == command_id
if value is not None:
assert request_mock.call_args[0][3] == value


@pytest.mark.parametrize(
"device_level, converted_level",
[
(0, 0),
(1, 50),
(2, 100),
],
)
async def test_xiaomi_e1_driver_light_level(
zigpy_device_from_quirk, device_level, converted_level
):
"""Test Aqara E1 driver light level cluster conversion."""
device = zigpy_device_from_quirk(zhaquirks.xiaomi.aqara.driver_curtain_e1.DriverE1)

opple_cluster = device.endpoints[1].in_clusters[XiaomiAqaraE1Cluster.cluster_id]
opple_listener = ClusterListener(opple_cluster)
opple_zcl_iilluminance_id = 0x0429

illuminance_cluster = device.endpoints[1].illuminance
illuminance_listener = ClusterListener(illuminance_cluster)
zcl_iilluminance_id = IlluminanceMeasurement.AttributeDefs.measured_value.id

# send motion and illuminance report 10
opple_cluster.update_attribute(opple_zcl_iilluminance_id, device_level)

# confirm manufacturer specific attribute report
assert len(opple_listener.attribute_updates) == 1
assert opple_listener.attribute_updates[0][0] == opple_zcl_iilluminance_id
assert opple_listener.attribute_updates[0][1] == device_level

# confirm illuminance report (with conversion)
assert len(illuminance_listener.attribute_updates) == 1
assert illuminance_listener.attribute_updates[0][0] == zcl_iilluminance_id

assert (
device_level == 0
and converted_level == 0
or (
illuminance_listener.attribute_updates[0][1]
== 10000 * math.log10(converted_level) + 1
)
)
32 changes: 31 additions & 1 deletion zhaquirks/xiaomi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ class BinaryOutputInterlock(CustomCluster, BinaryOutput):


class XiaomiPowerConfiguration(PowerConfiguration, LocalDataCluster):
"""Xiaomi power configuration cluster implementation."""
"""Xiaomi power configuration cluster implementation used for devices that only send battery voltage."""

BATTERY_VOLTAGE_ATTR = PowerConfiguration.AttributeDefs.battery_voltage.id
BATTERY_PERCENTAGE_REMAINING = (
Expand Down Expand Up @@ -501,6 +501,23 @@ def _update_battery_percentage(self, voltage_mv: int) -> None:
self._update_attribute(self.BATTERY_PERCENTAGE_REMAINING, percent)


class XiaomiPowerConfigurationPercent(XiaomiPowerConfiguration):
"""Power cluster which ignores Xiaomi voltage reports for calculating battery percentage
Devices that use this cluster (E1 curtain driver/roller) already send the battery percentage on their own
as a separate attribute, but additionally also send the battery voltage.
This class only uses the voltage reports for the voltage attribute, but not for the battery percentage.
The battery percentage is used as is from the battery percentage reports using inherited battery_percent_reported().
"""

def _update_battery_percentage(self, voltage_mv: int) -> None:
"""Ignore Xiaomi voltage reports, so they're not used to calculate battery percentage."""
# This device sends battery percentage reports which are handled using a XiaomiCluster and
# the inherited XiaomiPowerConfiguration cluster.
# This device might also send Xiaomi battery reports, so we only want to use those for the voltage attribute,
# but not for the battery percentage. XiaomiPowerConfiguration.battery_reported() still updates the voltage.


class OccupancyCluster(OccupancyWithReset):
"""Occupancy cluster."""

Expand Down Expand Up @@ -625,6 +642,19 @@ def _update_attribute(self, attrid, value):
super()._update_attribute(attrid, value)


class LocalIlluminanceMeasurementCluster(
LocalDataCluster, IlluminanceMeasurementCluster
):
"""Illuminance measurement cluster based on LocalDataCluster."""

def __init__(self, *args, **kwargs):
"""Init."""
super().__init__(*args, **kwargs)
if self.AttributeDefs.measured_value.id not in self._attr_cache:
# put a default value so the sensor is created
self._update_attribute(self.AttributeDefs.measured_value.id, 0)


class OnOffCluster(OnOff, CustomCluster):
"""Aqara wall switch cluster."""

Expand Down
151 changes: 151 additions & 0 deletions zhaquirks/xiaomi/aqara/driver_curtain_e1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
"""Aqara Curtain Driver E1 device."""
from __future__ import annotations

from typing import Any

from zigpy import types as t
from zigpy.profiles import zha
from zigpy.zcl import foundation
from zigpy.zcl.clusters.closures import WindowCovering
from zigpy.zcl.clusters.general import Basic, Identify, Ota, PowerConfiguration, Time
from zigpy.zcl.clusters.measurement import IlluminanceMeasurement
from zigpy.zdo.types import NodeDescriptor

from zhaquirks import CustomCluster
from zhaquirks.const import (
DEVICE_TYPE,
ENDPOINTS,
INPUT_CLUSTERS,
MODELS_INFO,
NODE_DESCRIPTOR,
OUTPUT_CLUSTERS,
PROFILE_ID,
)
from zhaquirks.xiaomi import (
LUMI,
BasicCluster,
LocalIlluminanceMeasurementCluster,
XiaomiAqaraE1Cluster,
XiaomiCustomDevice,
XiaomiPowerConfigurationPercent,
)

HAND_OPEN = 0x0401
POSITIONS_STORED = 0x0402
STORE_POSITION = 0x0407
HOOKS_LOCK = 0x0427
HOOKS_STATE = 0x0428
LIGHT_LEVEL = 0x0429


class XiaomiAqaraDriverE1(XiaomiAqaraE1Cluster):
"""Xiaomi Aqara Curtain Driver E1 cluster."""

attributes = XiaomiAqaraE1Cluster.attributes.copy()
attributes.update(
{
HAND_OPEN: ("hand_open", t.Bool, True),
POSITIONS_STORED: ("positions_stored", t.Bool, True),
STORE_POSITION: ("store_position", t.uint8_t, True),
HOOKS_LOCK: ("hooks_lock", t.uint8_t, True),
HOOKS_STATE: ("hooks_state", t.uint8_t, True),
LIGHT_LEVEL: ("light_level", t.uint8_t, True),
}
)

def _update_attribute(self, attrid, value):
if attrid == LIGHT_LEVEL:
# Light level value seems like it can be 0, 1, or 2.
# Multiply by 50 to map those values to later show: 1 lx, 50 lx, 100 lx.
self.endpoint.illuminance.update_attribute(
IlluminanceMeasurement.AttributeDefs.measured_value.id,
value * 50,
)
super()._update_attribute(attrid, value)


class WindowCoveringE1(CustomCluster, WindowCovering):
"""Xiaomi Window Covering cluster that maps open/close to lift percentage."""

async def command(
self,
command_id: foundation.GeneralCommand | int | t.uint8_t,
*args: Any,
manufacturer: int | t.uint16_t | None = None,
expect_reply: bool = True,
tsn: int | t.uint8_t | None = None,
**kwargs: Any,
) -> Any:
"""Overwrite the open/close commands to call the lift percentage command instead."""
if command_id == WindowCovering.ServerCommandDefs.up_open.id:
command_id = WindowCovering.ServerCommandDefs.go_to_lift_percentage.id
args = (0,)
elif command_id == WindowCovering.ServerCommandDefs.down_close.id:
command_id = WindowCovering.ServerCommandDefs.go_to_lift_percentage.id
args = (100,)

return await super().command(
command_id,
*args,
manufacturer=manufacturer,
expect_reply=expect_reply,
tsn=tsn,
**kwargs,
)


class DriverE1(XiaomiCustomDevice):
"""Aqara Curtain Driver E1 device."""

signature = {
MODELS_INFO: [(LUMI, "lumi.curtain.agl001")],
ENDPOINTS: {
# <SizePrefixedSimpleDescriptor endpoint=1 profile=260 device_type=263
# device_version=1
# input_clusters=[0, 1, 3, 10, 258, 64704]
# output_clusters=[3, 10, 25, 64704]>
1: {
PROFILE_ID: zha.PROFILE_ID,
DEVICE_TYPE: zha.DeviceType.OCCUPANCY_SENSOR,
INPUT_CLUSTERS: [
Basic.cluster_id,
PowerConfiguration.cluster_id,
Identify.cluster_id,
Time.cluster_id,
WindowCovering.cluster_id,
XiaomiAqaraDriverE1.cluster_id,
],
OUTPUT_CLUSTERS: [
Identify.cluster_id,
Time.cluster_id,
Ota.cluster_id,
XiaomiAqaraDriverE1.cluster_id,
],
}
},
}
replacement = {
NODE_DESCRIPTOR: NodeDescriptor(
0x02, 0x40, 0x80, 0x115F, 0x7F, 0x0064, 0x2C00, 0x0064, 0x00
),
ENDPOINTS: {
1: {
PROFILE_ID: zha.PROFILE_ID,
DEVICE_TYPE: zha.DeviceType.WINDOW_COVERING_DEVICE,
INPUT_CLUSTERS: [
BasicCluster,
XiaomiPowerConfigurationPercent,
Identify.cluster_id,
Time.cluster_id,
WindowCoveringE1,
LocalIlluminanceMeasurementCluster,
XiaomiAqaraDriverE1,
],
OUTPUT_CLUSTERS: [
Identify.cluster_id,
Time.cluster_id,
Ota.cluster_id,
],
}
},
}

0 comments on commit b731115

Please sign in to comment.