diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3c815bb..08bb17b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -18,12 +18,20 @@ repos: - --quiet - --safe - - repo: https://github.com/pycqa/flake8 + - repo: https://github.com/PyCQA/flake8 rev: 6.0.0 hooks: - id: flake8 additional_dependencies: + - flake8-docstrings==1.5.0 + - pydocstyle==5.1.1 - Flake8-pyproject==1.2.3 + - flake8-bugbear==23.1.20 + - flake8-comprehensions==3.10.1 + - flake8_2020==1.7.0 + - mccabe==0.7.0 + - pycodestyle==2.10.0 + - pyflakes==3.0.1 - repo: https://github.com/PyCQA/isort rev: 5.12.0 diff --git a/tests/__init__.py b/tests/__init__.py index e69de29..4fd57f3 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for zigpy_xbee.""" diff --git a/tests/test_api.py b/tests/test_api.py index 70490b6..ab3342c 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1,3 +1,5 @@ +"""Tests for API.""" + import asyncio import logging @@ -19,18 +21,21 @@ @pytest.fixture def api(): + """Sample XBee API fixture.""" api = xbee_api.XBee(DEVICE_CONFIG) api._uart = mock.MagicMock() return api async def test_connect(monkeypatch): + """Test connect.""" api = xbee_api.XBee(DEVICE_CONFIG) monkeypatch.setattr(uart, "connect", mock.AsyncMock()) await api.connect() def test_close(api): + """Test connection close.""" uart = api._uart conn_lost_task = mock.MagicMock() api._conn_lost_task = conn_lost_task @@ -44,6 +49,7 @@ def test_close(api): def test_commands(): + """Test command requests and command responses description.""" import string anum = string.ascii_letters + string.digits + "_" @@ -60,6 +66,8 @@ def test_commands(): async def test_command(api): + """Test AT commands.""" + def mock_api_frame(name, *args): c = xbee_api.COMMAND_REQUESTS[name] return mock.sentinel.api_frame_data, c[2] @@ -97,6 +105,7 @@ def mock_api_frame(name, *args): async def test_command_not_connected(api): + """Test AT command while disconnected to the device.""" api._uart = None def mock_api_frame(name, *args): @@ -104,7 +113,7 @@ def mock_api_frame(name, *args): api._api_frame = mock.MagicMock(side_effect=mock_api_frame) - for cmd, cmd_opts in xbee_api.COMMAND_REQUESTS.items(): + for cmd, _cmd_opts in xbee_api.COMMAND_REQUESTS.items(): with pytest.raises(zigpy.exceptions.APIException): await api._command(cmd, mock.sentinel.cmd_data) assert api._api_frame.call_count == 0 @@ -115,6 +124,7 @@ async def _test_at_or_queued_at_command(api, cmd, monkeypatch, do_reply=True): monkeypatch.setattr( t, "serialize", mock.MagicMock(return_value=mock.sentinel.serialize) ) + """Call api._at_command() or api._queued_at() with every possible command.""" def mock_command(name, *args): rsp = xbee_api.COMMAND_REQUESTS[name][2] @@ -141,10 +151,12 @@ def mock_command(name, *args): async def test_at_command(api, monkeypatch): + """Test api._at_command.""" await _test_at_or_queued_at_command(api, api._at_command, monkeypatch) async def test_at_command_no_response(api, monkeypatch): + """Test api._at_command with no response.""" with pytest.raises(asyncio.TimeoutError): await _test_at_or_queued_at_command( api, api._at_command, monkeypatch, do_reply=False @@ -152,6 +164,7 @@ async def test_at_command_no_response(api, monkeypatch): async def test_queued_at_command(api, monkeypatch): + """Test api._queued_at.""" await _test_at_or_queued_at_command(api, api._queued_at, monkeypatch) @@ -159,6 +172,7 @@ async def _test_remote_at_command(api, monkeypatch, do_reply=True): monkeypatch.setattr( t, "serialize", mock.MagicMock(return_value=mock.sentinel.serialize) ) + """Call api._remote_at_command().""" def mock_command(name, *args): rsp = xbee_api.COMMAND_REQUESTS[name][2] @@ -194,16 +208,19 @@ def mock_command(name, *args): async def test_remote_at_cmd(api, monkeypatch): + """Test remote AT command.""" await _test_remote_at_command(api, monkeypatch) async def test_remote_at_cmd_no_rsp(api, monkeypatch): + """Test remote AT command with no response.""" monkeypatch.setattr(xbee_api, "REMOTE_AT_COMMAND_TIMEOUT", 0.1) with pytest.raises(asyncio.TimeoutError): await _test_remote_at_command(api, monkeypatch, do_reply=False) def test_api_frame(api): + """Test api._api_frame.""" ieee = t.EUI64([t.uint8_t(a) for a in range(0, 8)]) for cmd_name, cmd_opts in xbee_api.COMMAND_REQUESTS.items(): cmd_id, schema, repl = cmd_opts @@ -215,6 +232,7 @@ def test_api_frame(api): def test_frame_received(api, monkeypatch): + """Test api.frame_received().""" monkeypatch.setattr( t, "deserialize", @@ -255,6 +273,7 @@ def test_frame_received(api, monkeypatch): def test_frame_received_no_handler(api, monkeypatch): + """Test frame received with no handler defined.""" monkeypatch.setattr( t, "deserialize", mock.MagicMock(return_value=(b"deserialized data", b"")) ) @@ -275,6 +294,7 @@ def test_frame_received_no_handler(api, monkeypatch): def _handle_at_response(api, tsn, status, at_response=b""): + """Call api._handle_at_response.""" data = (tsn, b"AI", status, at_response) response = asyncio.Future() api._awaiting[tsn] = (response,) @@ -283,6 +303,7 @@ def _handle_at_response(api, tsn, status, at_response=b""): def test_handle_at_response_none(api): + """Test AT successful response with no value.""" tsn = 123 fut = _handle_at_response(api, tsn, 0) assert fut.done() is True @@ -291,6 +312,7 @@ def test_handle_at_response_none(api): def test_handle_at_response_data(api): + """Test AT successful response with data.""" tsn = 123 status, response = 0, 0x23 fut = _handle_at_response(api, tsn, status, [response]) @@ -300,6 +322,7 @@ def test_handle_at_response_data(api): def test_handle_at_response_error(api): + """Test AT unsuccessful response.""" tsn = 123 status, response = 1, 0x23 fut = _handle_at_response(api, tsn, status, [response]) @@ -308,6 +331,7 @@ def test_handle_at_response_error(api): def test_handle_at_response_undef_error(api): + """Test AT unsuccessful response with undefined error.""" tsn = 123 status, response = 0xEE, 0x23 fut = _handle_at_response(api, tsn, status, [response]) @@ -316,6 +340,7 @@ def test_handle_at_response_undef_error(api): def test_handle_remote_at_rsp(api): + """Test handling the response.""" api._handle_at_response = mock.MagicMock() s = mock.sentinel api._handle_remote_at_response(s.frame_id, s.ieee, s.nwk, s.cmd, s.status, s.data) @@ -327,6 +352,7 @@ def test_handle_remote_at_rsp(api): def _send_modem_event(api, event): + """Call api._handle_modem_status().""" api._app = mock.MagicMock(spec=ControllerApplication) api._handle_modem_status(event) assert api._app.handle_modem_status.call_count == 1 @@ -334,6 +360,7 @@ def _send_modem_event(api, event): def test_handle_modem_status(api): + """Test api._handle_modem_status().""" api._running.clear() api._reset.set() _send_modem_event(api, xbee_t.ModemStatus.COORDINATOR_STARTED) @@ -354,6 +381,7 @@ def test_handle_modem_status(api): def test_handle_explicit_rx_indicator(api): + """Test receiving explicit_rx_indicator frame.""" s = mock.sentinel data = [ s.src_ieee, @@ -372,6 +400,7 @@ def test_handle_explicit_rx_indicator(api): def _handle_tx_status(api, status, wrong_frame_id=False): + """Call api._handle_tx_status.""" status = xbee_t.TXStatus(status) frame_id = 0x12 send_fut = mock.MagicMock(spec=asyncio.Future) @@ -386,6 +415,7 @@ def _handle_tx_status(api, status, wrong_frame_id=False): def test_handle_tx_status_success(api): + """Test handling successful TX Status.""" fut = _handle_tx_status(api, xbee_t.TXStatus.SUCCESS) assert len(api._awaiting) == 0 assert fut.set_result.call_count == 1 @@ -393,6 +423,7 @@ def test_handle_tx_status_success(api): def test_handle_tx_status_except(api): + """Test exceptional TXStatus.""" fut = _handle_tx_status(api, xbee_t.TXStatus.ADDRESS_NOT_FOUND) assert len(api._awaiting) == 0 assert fut.set_result.call_count == 0 @@ -400,6 +431,7 @@ def test_handle_tx_status_except(api): def test_handle_tx_status_unexpected(api): + """Test TX status reply on unexpected frame.""" fut = _handle_tx_status(api, 1, wrong_frame_id=True) assert len(api._awaiting) == 1 assert fut.set_result.call_count == 0 @@ -407,6 +439,7 @@ def test_handle_tx_status_unexpected(api): def test_handle_tx_status_duplicate(api): + """Test TX status duplicate reply.""" status = xbee_t.TXStatus.SUCCESS frame_id = 0x12 send_fut = mock.MagicMock(spec=asyncio.Future) @@ -420,6 +453,7 @@ def test_handle_tx_status_duplicate(api): def test_handle_registration_status(api): + """Test device registration status.""" frame_id = 0x12 status = xbee_t.RegistrationStatus.SUCCESS fut = asyncio.Future() @@ -440,6 +474,7 @@ def test_handle_registration_status(api): async def test_command_mode_at_cmd(api): + """Test AT in command mode.""" command = "+++" def cmd_mode_send(cmd): @@ -452,6 +487,7 @@ def cmd_mode_send(cmd): async def test_command_mode_at_cmd_timeout(api): + """Test AT in command mode with timeout.""" command = "+++" api._uart.command_mode_send = mock.MagicMock() @@ -461,6 +497,7 @@ async def test_command_mode_at_cmd_timeout(api): def test_handle_command_mode_rsp(api): + """Test command mode response.""" api._cmd_mode_future = None data = "OK" api.handle_command_mode_rsp(data) @@ -483,6 +520,7 @@ def test_handle_command_mode_rsp(api): async def test_enter_at_command_mode(api): + """Test switching to command mode.""" api.command_mode_at_cmd = mock.AsyncMock(return_value=mock.sentinel.at_response) res = await api.enter_at_command_mode() @@ -490,6 +528,7 @@ async def test_enter_at_command_mode(api): async def test_api_mode_at_commands(api): + """Test AT in API mode.""" api.command_mode_at_cmd = mock.AsyncMock(return_value=mock.sentinel.api_mode) res = await api.api_mode_at_commands(57600) @@ -506,6 +545,7 @@ async def mock_at_cmd(cmd): async def test_init_api_mode(api, monkeypatch): + """Test init the API mode.""" monkeypatch.setattr(api._uart, "baudrate", 57600) api.enter_at_command_mode = mock.AsyncMock(return_value=True) @@ -534,22 +574,26 @@ async def enter_at_mode(): def test_set_application(api): + """Test setting the application.""" api.set_application(mock.sentinel.app) assert api._app == mock.sentinel.app def test_handle_route_record_indicator(api): + """Test api._handle_route_record_indicator().""" s = mock.sentinel api._handle_route_record_indicator(s.ieee, s.src, s.rx_opts, s.hops) def test_handle_many_to_one_rri(api): + """Test api._handle_many_to_one_rri().""" ieee = t.EUI64([t.uint8_t(a) for a in range(0, 8)]) nwk = 0x1234 api._handle_many_to_one_rri(ieee, nwk, 0) async def test_reconnect_multiple_disconnects(monkeypatch, caplog): + """Test reconnect with multiple disconnects.""" api = xbee_api.XBee(DEVICE_CONFIG) connect_mock = mock.AsyncMock(return_value=True) monkeypatch.setattr(uart, "connect", connect_mock) @@ -570,6 +614,7 @@ async def test_reconnect_multiple_disconnects(monkeypatch, caplog): async def test_reconnect_multiple_attempts(monkeypatch, caplog): + """Test reconnect with multiple attempts.""" api = xbee_api.XBee(DEVICE_CONFIG) connect_mock = mock.AsyncMock(return_value=True) monkeypatch.setattr(uart, "connect", connect_mock) diff --git a/tests/test_application.py b/tests/test_application.py index 065f0ce..6b4359f 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -1,3 +1,5 @@ +"""Tests for ControllerApplication.""" + import asyncio import pytest @@ -25,6 +27,7 @@ @pytest.fixture def node_info(): + """Sample NodeInfo fixture.""" return zigpy.state.NodeInfo( nwk=t.NWK(0x0000), ieee=t.EUI64.convert("00:12:4b:00:1c:a1:b8:46"), @@ -34,6 +37,7 @@ def node_info(): @pytest.fixture def network_info(node_info): + """Sample NetworkInfo fixture.""" return zigpy.state.NetworkInfo( extended_pan_id=t.ExtendedPanId.convert("bd:27:0b:38:37:95:dc:87"), pan_id=t.PanId(0x9BB0), @@ -61,6 +65,7 @@ def network_info(node_info): @pytest.fixture def app(monkeypatch): + """Sample ControllerApplication fixture.""" monkeypatch.setattr(application, "TIMEOUT_TX_STATUS", 0.1) monkeypatch.setattr(application, "TIMEOUT_REPLY", 0.1) monkeypatch.setattr(application, "TIMEOUT_REPLY_EXTENDED", 0.1) @@ -75,6 +80,7 @@ def app(monkeypatch): def test_modem_status(app): + """Test handling ModemStatus updates.""" assert 0x00 in xbee_t.ModemStatus.__members__.values() app.handle_modem_status(xbee_t.ModemStatus(0x00)) assert 0xEE not in xbee_t.ModemStatus.__members__.values() @@ -89,6 +95,7 @@ def _test_rx( cluster_id=mock.sentinel.cluster_id, data=mock.sentinel.data, ): + """Call app.handle_rx().""" app.get_device = mock.MagicMock(return_value=device) app.handle_rx( @@ -104,6 +111,7 @@ def _test_rx( def test_rx(app): + """Test message receiving.""" device = mock.MagicMock() app.handle_message = mock.MagicMock() _test_rx(app, device, 0x1234, data=b"\x01\x02\x03\x04") @@ -119,6 +127,7 @@ def test_rx(app): def test_rx_nwk_0000(app): + """Test receiving self-addressed message.""" app._handle_reply = mock.MagicMock() app.handle_message = mock.MagicMock() app.get_device = mock.MagicMock() @@ -185,7 +194,11 @@ def test_rx_unknown_device_ieee(app): @pytest.fixture def device(app): - def _device(new=False, zdo_init=False, nwk=t.uint16_t(0x1234)): + """Sample zigpee.Device fixture.""" + + nwk = t.uint16_t(0x1234) + + def _device(new=False, zdo_init=False, nwk=nwk): from zigpy.device import Device, Status as DeviceStatus ieee, _ = t.EUI64.deserialize(b"\x08\x07\x06\x05\x04\x03\x02\x01") @@ -202,6 +215,7 @@ def _device(new=False, zdo_init=False, nwk=t.uint16_t(0x1234)): def _device_join(app, dev, data): + """Simulate device join notification.""" app.handle_message = mock.MagicMock() app.handle_join = mock.MagicMock() app.create_task = mock.MagicMock() @@ -216,6 +230,7 @@ def _device_join(app, dev, data): def test_device_join_new(app, device): + """Test device join.""" dev = device() data = b"\xee" + dev.nwk.serialize() + dev.ieee.serialize() + b"\x40" @@ -223,6 +238,7 @@ def test_device_join_new(app, device): def test_device_join_inconsistent_nwk(app, device): + """Test device join inconsistent NWK.""" dev = device() data = b"\xee" + b"\x01\x02" + dev.ieee.serialize() + b"\x40" @@ -230,6 +246,7 @@ def test_device_join_inconsistent_nwk(app, device): def test_device_join_inconsistent_ieee(app, device): + """Test device join inconsistent IEEE.""" dev = device() data = b"\xee" + dev.nwk.serialize() + b"\x01\x02\x03\x04\x05\x06\x07\x08" + b"\x40" @@ -237,6 +254,7 @@ def test_device_join_inconsistent_ieee(app, device): async def test_broadcast(app): + """Test sending broadcast transmission.""" (profile, cluster, src_ep, dst_ep, grpid, radius, tsn, data) = ( 0x260, 1, @@ -274,6 +292,7 @@ async def test_broadcast(app): async def test_get_association_state(app): + """Test get association statevia API.""" ai_results = (0xFF, 0xFF, 0xFF, 0xFF, mock.sentinel.ai) app._api._at_command = mock.AsyncMock( spec=XBee._at_command, @@ -286,6 +305,8 @@ async def test_get_association_state(app): @pytest.mark.parametrize("legacy_module", (False, True)) async def test_write_network_info(app, node_info, network_info, legacy_module): + """Test writing network info to the device.""" + def _mock_queued_at(name, *args): if legacy_module and name == "CE": raise RuntimeError("Legacy module") @@ -320,6 +341,7 @@ async def _test_start_network( zs=2, legacy_module=False, ): + """Call app.start_network().""" ai_tries = 5 app.state.node_info = zigpy.state.NodeInfo() @@ -366,6 +388,7 @@ def init_api_mode_mock(): async def test_start_network(app): + """Test start network.""" await _test_start_network(app, ai_status=0x00) assert app.state.node_info.nwk == 0x0000 assert app.state.node_info.ieee == t.EUI64(range(1, 9)) @@ -393,6 +416,7 @@ async def test_start_network(app): async def test_start_network_no_api_mode(app): + """Test start network when not in API mode.""" await _test_start_network(app, ai_status=0x00, api_mode=False) assert app.state.node_info.nwk == 0x0000 assert app.state.node_info.ieee == t.EUI64(range(1, 9)) @@ -401,6 +425,7 @@ async def test_start_network_no_api_mode(app): async def test_start_network_api_mode_config_fails(app): + """Test start network when not when API config fails.""" with pytest.raises(zigpy.exceptions.ControllerException): await _test_start_network( app, ai_status=0x00, api_mode=False, api_config_succeeds=False @@ -411,6 +436,7 @@ async def test_start_network_api_mode_config_fails(app): async def test_permit(app): + """Test permit joins.""" app._api._at_command = mock.AsyncMock() time_s = 30 await app.permit_ncp(time_s) @@ -419,6 +445,7 @@ async def test_permit(app): async def test_permit_with_key(app): + """Test permit joins with join code.""" app._api._command = mock.AsyncMock(return_value=xbee_t.TXStatus.SUCCESS) app._api._at_command = mock.AsyncMock(return_value="OK") node = t.EUI64(b"\x01\x02\x03\x04\x05\x06\x07\x08") @@ -432,6 +459,7 @@ async def test_permit_with_key(app): async def test_permit_with_link_key(app): + """Test permit joins with link key.""" app._api._command = mock.AsyncMock(return_value=xbee_t.TXStatus.SUCCESS) app._api._at_command = mock.AsyncMock(return_value="OK") node = t.EUI64(b"\x01\x02\x03\x04\x05\x06\x07\x08") @@ -447,6 +475,7 @@ async def test_permit_with_link_key(app): async def _test_request( app, expect_reply=True, send_success=True, send_timeout=False, **kwargs ): + """Call app.request().""" seq = 123 nwk = 0x2345 ieee = t.EUI64(b"\x01\x02\x03\x04\x05\x06\x07\x08") @@ -478,26 +507,31 @@ def _mock_command( async def test_request_with_ieee(app): + """Test request with IEEE.""" r = await _test_request(app, use_ieee=True, send_success=True) assert r[0] == 0 async def test_request_with_reply(app): + """Test request with expecting reply.""" r = await _test_request(app, expect_reply=True, send_success=True) assert r[0] == 0 async def test_request_send_timeout(app): + """Test request with send timeout.""" with pytest.raises(zigpy.exceptions.DeliveryError): await _test_request(app, send_timeout=True) async def test_request_send_fail(app): + """Test request with send failure.""" with pytest.raises(zigpy.exceptions.DeliveryError): await _test_request(app, send_success=False) async def test_request_unknown_device(app): + """Test request with unknown device.""" dev = zigpy.device.Device( application=app, ieee=xbee_t.UNKNOWN_IEEE, nwk=xbee_t.UNKNOWN_NWK ) @@ -517,6 +551,7 @@ async def test_request_unknown_device(app): async def test_request_extended_timeout(app): + """Test request with extended timeout.""" r = await _test_request(app, True, True, extended_timeout=False) assert r[0] == xbee_t.TXStatus.SUCCESS assert app._api._command.call_count == 1 @@ -531,10 +566,12 @@ async def test_request_extended_timeout(app): async def test_force_remove(app): + """Test device force removal.""" await app.force_remove(mock.sentinel.device) async def test_shutdown(app): + """Test application shutdown.""" mack_close = mock.MagicMock() app._api.close = mack_close await app.shutdown() @@ -543,6 +580,7 @@ async def test_shutdown(app): async def test_remote_at_cmd(app, device): + """Test remote AT command.""" dev = device() app.get_device = mock.MagicMock(return_value=dev) app._api = mock.MagicMock(spec=XBee) @@ -560,15 +598,18 @@ async def test_remote_at_cmd(app, device): @pytest.fixture def ieee(): + """Sample IEEE fixture.""" return t.EUI64.deserialize(b"\x00\x01\x02\x03\x04\x05\x06\x07")[0] @pytest.fixture def nwk(): + """Sample NWK fixture.""" return t.uint16_t(0x0100) def test_rx_device_annce(app, ieee, nwk): + """Test receiving device announce.""" dst_ep = 0 cluster_id = zdo_t.ZDOCmd.Device_annce device = mock.MagicMock() @@ -599,6 +640,7 @@ def test_rx_device_annce(app, ieee, nwk): async def _test_mrequest(app, send_success=True, send_timeout=False, **kwargs): + """Call app.mrequest().""" seq = 123 group_id = 0x2345 @@ -618,21 +660,26 @@ def _mock_command( async def test_mrequest_with_reply(app): + """Test mrequest with reply.""" r = await _test_mrequest(app, send_success=True) assert r[0] == 0 async def test_mrequest_send_timeout(app): + """Test mrequest with send timeout.""" with pytest.raises(zigpy.exceptions.DeliveryError): await _test_mrequest(app, send_timeout=True) async def test_mrequest_send_fail(app): + """Test mrequest with send failure.""" with pytest.raises(zigpy.exceptions.DeliveryError): await _test_mrequest(app, send_success=False) async def test_reset_network_info(app): + """Test resetting network.""" + async def mock_at_command(cmd, *args): if cmd == "NR": return 0x00 @@ -649,6 +696,7 @@ async def mock_at_command(cmd, *args): async def test_move_network_to_channel(app): + """Test moving network to another channel.""" app._api._queued_at = mock.AsyncMock(spec=XBee._at_command) await app._move_network_to_channel(26, new_nwk_update_id=1) @@ -657,12 +705,13 @@ async def test_move_network_to_channel(app): async def test_energy_scan(app): + """Test channel energy scan.""" rssi = b"\x0A\x0F\x14\x19\x1E\x23\x28\x2D\x32\x37\x3C\x41\x46\x4B\x50\x55" app._api._at_command = mock.AsyncMock(spec=XBee._at_command, return_value=rssi) time_s = 3 count = 3 energy = await app.energy_scan( - channels=[x for x in range(11, 27)], duration_exp=time_s, count=count + channels=list(range(11, 27)), duration_exp=time_s, count=count ) assert app._api._at_command.mock_calls == [mock.call("ED", time_s)] * count assert {k: round(v, 3) for k, v in energy.items()} == { diff --git a/tests/test_types.py b/tests/test_types.py index 8e3e949..9894184 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -1,3 +1,5 @@ +"""Tests for types module.""" + import pytest import zigpy.types as t @@ -5,18 +7,21 @@ def test_bytes_serialize(): + """Test Bytes.serialize().""" data = 0x89AB.to_bytes(4, "big") result = xbee_t.Bytes(data).serialize() assert result == data def test_bytes_deserialize(): + """Test Bytes.deserialize().""" data, rest = xbee_t.Bytes.deserialize(0x89AB.to_bytes(3, "big")) assert data == b"\x00\x89\xAB" assert rest == b"" def test_atcommand(): + """Test ATCommand class.""" cmd = b"AI" data = 0x06.to_bytes(4, "big") r_cmd, r_data = xbee_t.ATCommand.deserialize(cmd + data) @@ -26,6 +31,8 @@ def test_atcommand(): def test_undefined_enum_undefined_value(): + """Test UndefinedEnum class.""" + class undEnum(t.uint8_t, xbee_t.UndefinedEnum): OK = 0 ERROR = 2 @@ -48,6 +55,8 @@ class undEnum(t.uint8_t, xbee_t.UndefinedEnum): def test_undefined_enum_undefinede(): + """Test UndefinedEnum undefined member.""" + class undEnum(t.uint8_t, xbee_t.UndefinedEnum): OK = 0 ERROR = 2 @@ -58,6 +67,7 @@ class undEnum(t.uint8_t, xbee_t.UndefinedEnum): def test_nwk(): + """Test NWK class.""" nwk = xbee_t.NWK(0x1234) assert str(nwk) == "0x1234" @@ -65,6 +75,7 @@ def test_nwk(): def test_eui64(): + """Test EUI64 class.""" extra = b"\xBE\xEF" data = b"01234567" diff --git a/tests/test_uart.py b/tests/test_uart.py index 0c9484a..6d1bc2d 100644 --- a/tests/test_uart.py +++ b/tests/test_uart.py @@ -1,3 +1,5 @@ +"""Tests for uart module.""" + import asyncio from unittest import mock @@ -14,6 +16,7 @@ @pytest.fixture def gw(): + """Gateway fixture.""" gw = uart.Gateway(mock.MagicMock()) gw._transport = mock.MagicMock() gw._transport.serial.BAUDRATES = serial_asyncio.serial.Serial.BAUDRATES @@ -21,17 +24,20 @@ def gw(): def test_baudrate(gw): + """Test setting baudrate.""" gw.baudrate gw.baudrate = 19200 assert gw._transport.serial.baudrate == 19200 def test_baudrate_fail(gw): + """Test setting unexpected baudrate.""" with pytest.raises(ValueError): gw.baudrate = 3333 async def test_connect(monkeypatch): + """Test connecting.""" api = mock.MagicMock() async def mock_conn(loop, protocol_factory, **kwargs): @@ -45,6 +51,7 @@ async def mock_conn(loop, protocol_factory, **kwargs): def test_command_mode_rsp(gw): + """Test command mode response.""" data = b"OK" gw.command_mode_rsp(data) assert gw._api.handle_command_mode_rsp.call_count == 1 @@ -52,17 +59,20 @@ def test_command_mode_rsp(gw): def test_command_mode_send(gw): + """Test command mode request.""" data = b"ATAP2\x0D" gw.command_mode_send(data) gw._transport.write.assert_called_once_with(data) def test_close(gw): + """Test closing connection.""" gw.close() assert gw._transport.close.call_count == 1 def test_data_received_chunk_frame(gw): + """Test receiving frame in parts.""" data = b"~\x00\r\x88\rID\x00\x00\x00\x00\x00\x00\x00\x00\x00\xdd" gw.frame_received = mock.MagicMock() gw.data_received(data[:3]) @@ -73,6 +83,7 @@ def test_data_received_chunk_frame(gw): def test_data_received_full_frame(gw): + """Test receiving full frame.""" data = b"~\x00\r\x88\rID\x00\x00\x00\x00\x00\x00\x00\x00\x00\xdd" gw.frame_received = mock.MagicMock() gw.data_received(data) @@ -81,6 +92,7 @@ def test_data_received_full_frame(gw): def test_data_received_incomplete_frame(gw): + """Test receiving partial frame.""" data = b"~\x00\x07\x8b\x0e\xff\xfd" gw.frame_received = mock.MagicMock() gw.data_received(data) @@ -88,6 +100,7 @@ def test_data_received_incomplete_frame(gw): def test_data_received_at_response_non_cmd_mode(gw): + """Test command mode response while not in command mode.""" data = b"OK\x0D" gw.frame_received = mock.MagicMock() gw.command_mode_rsp = mock.MagicMock() @@ -97,6 +110,7 @@ def test_data_received_at_response_non_cmd_mode(gw): def test_data_received_at_response_in_cmd_mode(gw): + """Test command mode response in command mode.""" data = b"OK\x0D" gw.frame_received = mock.MagicMock() gw.command_mode_rsp = mock.MagicMock() @@ -112,6 +126,7 @@ def test_data_received_at_response_in_cmd_mode(gw): def test_extract(gw): + """Test handling extra chaining data.""" gw._buffer = b"\x7E\x00\x02\x23\x7D\x31\xCBextra" frame = gw._extract_frame() assert frame == b"\x23\x11" @@ -119,6 +134,7 @@ def test_extract(gw): def test_extract_wrong_checksum(gw): + """Test API frame with wrong checksum and extra data.""" gw._buffer = b"\x7E\x00\x02\x23\x7D\x31\xCEextra" frame = gw._extract_frame() assert frame is None @@ -126,6 +142,7 @@ def test_extract_wrong_checksum(gw): def test_extract_checksum_none(gw): + """Test API frame with no checksum.""" data = b"\x7E\x00\x02\x23\x7D\x31" gw._buffer = data gw._checksum = lambda x: None @@ -135,6 +152,7 @@ def test_extract_checksum_none(gw): def test_extract_frame_len_none(gw): + """Test API frame with no length.""" data = b"\x7E" gw._buffer = data frame = gw._extract_frame() @@ -143,6 +161,7 @@ def test_extract_frame_len_none(gw): def test_extract_frame_no_start(gw): + """Test API frame without frame ID.""" data = b"\x00\x02\x23\x7D\x31" gw._buffer = data frame = gw._extract_frame() @@ -151,6 +170,7 @@ def test_extract_frame_no_start(gw): def test_frame_received(gw): + """Test frame is passed to api.""" data = b"frame" gw.frame_received(data) assert gw._api.frame_received.call_count == 1 @@ -158,12 +178,14 @@ def test_frame_received(gw): def test_send(gw): + """Test data send.""" gw.send(b"\x23\x11") data = b"\x7E\x00\x02\x23\x7D\x31\xCB" gw._transport.write.assert_called_once_with(data) def test_escape(gw): + """Test string escaping.""" data = b"".join( [ a.to_bytes(1, "big") + b.to_bytes(1, "big") @@ -178,6 +200,7 @@ def test_escape(gw): def test_unescape(gw): + """Test string unescaping.""" extra = b"\xaa\xbb\xcc\xff" escaped = b'}^"}]3}1D}3U' chk = b"".join( @@ -193,6 +216,7 @@ def test_unescape(gw): def test_unescape_underflow(gw): + """Test unescape with not enough data.""" escaped = b'}^"}' unescaped, rest = gw._get_unescaped(escaped, 3) assert unescaped is None @@ -200,6 +224,7 @@ def test_unescape_underflow(gw): def test_connection_lost_exc(gw): + """Test cannection lost callback is called.""" gw._connected_future = asyncio.Future() gw.connection_lost(ValueError()) @@ -212,6 +237,7 @@ def test_connection_lost_exc(gw): def test_connection_closed(gw): + """Test connection closed.""" gw._connected_future = asyncio.Future() gw.connection_lost(None) diff --git a/zigpy_xbee/__init__.py b/zigpy_xbee/__init__.py index c7c8f20..b9eb72f 100644 --- a/zigpy_xbee/__init__.py +++ b/zigpy_xbee/__init__.py @@ -1,3 +1,5 @@ +"""Init file for zigpy_xbee.""" + MAJOR_VERSION = 0 MINOR_VERSION = 18 PATCH_VERSION = "3" diff --git a/zigpy_xbee/api.py b/zigpy_xbee/api.py index 35ae71d..32c5c81 100644 --- a/zigpy_xbee/api.py +++ b/zigpy_xbee/api.py @@ -1,3 +1,5 @@ +"""XBee API implementation.""" + import asyncio import binascii import enum @@ -260,6 +262,8 @@ class ATCommandResult(enum.IntEnum): + """AT Command Result.""" + OK = 0 ERROR = 1 INVALID_COMMAND = 2 @@ -268,7 +272,10 @@ class ATCommandResult(enum.IntEnum): class XBee: + """Class implementing XBee communication protocol.""" + def __init__(self, device_config: Dict[str, Any]) -> None: + """Initialize instance.""" self._config = device_config self._uart: Optional[uart.Gateway] = None self._seq: int = 1 @@ -301,13 +308,14 @@ async def new( application: "zigpy_xbee.zigbee.application.ControllerApplication", config: Dict[str, Any], ) -> "XBee": - """Create new instance from""" + """Create new instance.""" xbee_api = cls(config) await xbee_api.connect() xbee_api.set_application(application) return xbee_api async def connect(self) -> None: + """Connect to the device.""" assert self._uart is None self._uart = await uart.connect(self._config, self) @@ -364,6 +372,7 @@ async def _reconnect_till_done(self) -> None: ) def close(self): + """Close the connection.""" if self._conn_lost_task: self._conn_lost_task.cancel() self._conn_lost_task = None @@ -373,6 +382,7 @@ def close(self): self._uart = None def _command(self, name, *args, mask_frame_id=False): + """Send API frame to the device.""" LOGGER.debug("Command %s %s", name, args) if self._uart is None: raise APIException("API is not running") @@ -387,6 +397,7 @@ def _command(self, name, *args, mask_frame_id=False): return future async def _remote_at_command(self, ieee, nwk, options, name, *args): + """Execute AT command on a different XBee module in the network.""" LOGGER.debug("Remote AT command: %s %s", name, args) data = t.serialize(args, (AT_COMMANDS[name],)) try: @@ -416,10 +427,12 @@ async def _at_partial(self, cmd_type, name, *args): _queued_at = functools.partialmethod(_at_partial, "queued_at") def _api_frame(self, name, *args): + """Build API frame.""" c = COMMAND_REQUESTS[name] return (bytes([c[0]]) + t.serialize(args, c[1])), c[2] def frame_received(self, data): + """Handle API frame from the device.""" command = self._commands_by_id[data[0]] LOGGER.debug("Frame received: %s", command) data, rest = t.deserialize(data[1:], COMMAND_RESPONSES[command][1]) @@ -429,6 +442,7 @@ def frame_received(self, data): LOGGER.error("No '%s' handler. Data: %s", command, binascii.hexlify(data)) def _handle_at_response(self, frame_id, cmd, status, value): + """Local AT command response.""" (fut,) = self._awaiting.pop(frame_id) try: status = ATCommandResult(status) @@ -528,6 +542,7 @@ def _handle_registration_status(self, frame_id, status): fut.set_result(status) def set_application(self, app): + """Set reference to ControllerApplication.""" self._app = app def handle_command_mode_rsp(self, data): @@ -543,7 +558,7 @@ def handle_command_mode_rsp(self, data): fut.set_result(data) async def command_mode_at_cmd(self, command): - """Sends AT command in command mode.""" + """Send AT command in command mode.""" self._cmd_mode_future = asyncio.Future() self._uart.command_mode_send(command.encode("ascii")) @@ -620,7 +635,7 @@ async def probe(cls, device_config: Dict[str, Any]) -> bool: return False async def _probe(self) -> None: - """Open port and try sending a command""" + """Open port and try sending a command.""" await self.connect() try: # Ensure we have escaped commands @@ -632,6 +647,7 @@ async def _probe(self) -> None: self.close() def __getattr__(self, item): + """Handle supported command requests.""" if item in COMMAND_REQUESTS: return functools.partial(self._command, item) raise AttributeError(f"Unknown command {item}") diff --git a/zigpy_xbee/config.py b/zigpy_xbee/config.py index eb7bf87..385b6a5 100644 --- a/zigpy_xbee/config.py +++ b/zigpy_xbee/config.py @@ -1,3 +1,5 @@ +"""XBee module config.""" + import voluptuous as vol from zigpy.config import ( # noqa: F401 pylint: disable=unused-import CONF_DATABASE, diff --git a/zigpy_xbee/types.py b/zigpy_xbee/types.py index 887337d..dee3894 100644 --- a/zigpy_xbee/types.py +++ b/zigpy_xbee/types.py @@ -1,36 +1,52 @@ +"""Additional types for data parsing.""" + import enum import zigpy.types as t class Bytes(bytes): + """Serializable and deserializable bytes.""" + def serialize(self): + """Serialize the class.""" return self @classmethod def deserialize(cls, data): + """Deserialize the data into the class.""" return cls(data), b"" class ATCommand(Bytes): + """XBee AT command name.""" + @classmethod def deserialize(cls, data): + """Deserialize the data into the class.""" return cls(data[:2]), data[2:] class EUI64(t.EUI64): + """EUI64 without prefix.""" + @classmethod def deserialize(cls, data): + """Deserialize the data into the class.""" r, data = super().deserialize(data) return cls(r[::-1]), data def serialize(self): + """Serialize the class.""" assert self._length == len(self) return super().serialize()[::-1] class UndefinedEnumMeta(enum.EnumMeta): + """Meta class for Enum that always has a value.""" + def __call__(cls, value=None, *args, **kwargs): + """Return the member, default, or undefined value.""" if value is None: # the 1st enum member is default return next(iter(cls)) @@ -45,18 +61,22 @@ def __call__(cls, value=None, *args, **kwargs): class UndefinedEnum(enum.Enum, metaclass=UndefinedEnumMeta): - pass + """Enum that always has a value.""" class FrameId(t.uint8_t): - pass + """API frame ID.""" class NWK(t.uint16_t_be): + """zigpy.types.NWK but big endian.""" + def __repr__(self): + """Get printable representation.""" return f"0x{self:04x}" def __str__(self): + """Get string representation.""" return f"0x{self:04x}" @@ -141,6 +161,8 @@ class DiscoveryStatus(t.uint8_t, UndefinedEnum): class TXOptions(t.bitmap8): + """TX Options for eplicit transmit frame.""" + NONE = 0x00 Disable_Retries_and_Route_Repair = 0x01 @@ -149,6 +171,8 @@ class TXOptions(t.bitmap8): class ModemStatus(t.uint8_t, UndefinedEnum): + """Modem Status.""" + HARDWARE_RESET = 0x00 WATCHDOG_TIMER_RESET = 0x01 JOINED_NETWORK = 0x02 @@ -218,6 +242,8 @@ class ModemStatus(t.uint8_t, UndefinedEnum): class RegistrationStatus(t.uint8_t, UndefinedEnum): + """Key Registration Status.""" + SUCCESS = 0x00 KEY_TOO_LONG = 0x01 TRANSIENT_KEY_TABLE_IS_FULL = 0x18 diff --git a/zigpy_xbee/uart.py b/zigpy_xbee/uart.py index e0c693a..28eed71 100644 --- a/zigpy_xbee/uart.py +++ b/zigpy_xbee/uart.py @@ -1,3 +1,5 @@ +"""Module for UART communication to the device.""" + import asyncio import logging from typing import Any, Dict @@ -10,6 +12,8 @@ class Gateway(asyncio.Protocol): + """Class implementing the UART protocol.""" + START = b"\x7E" ESCAPE = b"\x7D" XON = b"\x11" @@ -19,13 +23,14 @@ class Gateway(asyncio.Protocol): THIS_ONE = True def __init__(self, api, connected_future=None): + """Initialize instance.""" self._buffer = b"" self._connected_future = connected_future self._api = api self._in_command_mode = False def send(self, data): - """Send data, taking care of escaping and framing""" + """Send data, taking care of escaping and framing.""" LOGGER.debug("Sending: %s", data) checksum = bytes([self._checksum(data)]) frame = self.START + self._escape( @@ -63,14 +68,14 @@ def connection_lost(self, exc) -> None: self._api.connection_lost(exc) def connection_made(self, transport): - """Callback when the uart is connected""" + """Handle UART connection callback.""" LOGGER.debug("Connection made") self._transport = transport if self._connected_future: self._connected_future.set_result(True) def command_mode_rsp(self, data): - """Handles AT command mode response.""" + """Handle AT command mode response.""" data = data.decode("ascii", "ignore") LOGGER.debug("Handling AT command mode response: %s", data) self._api.handle_command_mode_rsp(data) @@ -82,7 +87,7 @@ def command_mode_send(self, data): self._transport.write(data) def data_received(self, data): - """Callback when there is data received from the uart""" + """Handle data received from the UART callback.""" self._buffer += data while self._buffer: frame = self._extract_frame() @@ -94,15 +99,16 @@ def data_received(self, data): self.command_mode_rsp(rsp) def frame_received(self, frame): - """Frame receive handler""" + """Frame receive handler.""" LOGGER.debug("Frame received: %s", frame) self._api.frame_received(frame) def close(self): + """Close the connection.""" self._transport.close() def reset_command_mode(self): - """Reset command mode and ignore \r character as command mode response.""" + r"""Reset command mode and ignore '\r' character as command mode response.""" self._in_command_mode = False def _extract_frame(self): @@ -162,6 +168,7 @@ def _checksum(self, data): async def connect(device_config: Dict[str, Any], api, loop=None) -> Gateway: + """Connect to the device.""" if loop is None: loop = asyncio.get_event_loop() diff --git a/zigpy_xbee/zigbee/__init__.py b/zigpy_xbee/zigbee/__init__.py index e69de29..55681c8 100644 --- a/zigpy_xbee/zigbee/__init__.py +++ b/zigpy_xbee/zigbee/__init__.py @@ -0,0 +1 @@ +"""XBee ControllerApplication implementation.""" diff --git a/zigpy_xbee/zigbee/application.py b/zigpy_xbee/zigbee/application.py index 296db74..d67d292 100644 --- a/zigpy_xbee/zigbee/application.py +++ b/zigpy_xbee/zigbee/application.py @@ -1,3 +1,5 @@ +"""ControllerApplication for XBee adapters.""" + from __future__ import annotations import asyncio @@ -37,12 +39,15 @@ class ControllerApplication(zigpy.application.ControllerApplication): + """Implementation of Zigpy ControllerApplication for XBee devices.""" + SCHEMA = CONFIG_SCHEMA SCHEMA_DEVICE = SCHEMA_DEVICE probe = zigpy_xbee.api.XBee.probe def __init__(self, config: dict[str, Any]): + """Initialize instance.""" super().__init__(config=zigpy.config.ZIGPY_SCHEMA(config)) self._api: zigpy_xbee.api.XBee | None = None @@ -53,6 +58,7 @@ async def disconnect(self): self._api = None async def connect(self): + """Connect to the device.""" self._api = await zigpy_xbee.api.XBee.new(self, self._config[CONF_DEVICE]) try: # Ensure we have escaped commands @@ -65,6 +71,7 @@ async def connect(self): ) async def start_network(self): + """Configure the module to work with Zigpy.""" association_state = await asyncio.wait_for( self._get_association_state(), timeout=4 ) @@ -108,6 +115,7 @@ async def start_network(self): await self.register_endpoints() async def load_network_info(self, *, load_devices=False): + """Load supported parameters of network_info and node_info from the device.""" # Load node info node_info = self.state.node_info node_info.nwk = zigpy.types.NWK(await self._api._at_command("MY")) @@ -139,9 +147,11 @@ async def load_network_info(self, *, load_devices=False): network_info.channel = await self._api._at_command("CH") async def reset_network_info(self) -> None: + """Reset Zigbee network.""" await self._api._at_command("NR", 0) async def write_network_info(self, *, network_info, node_info): + """Write supported network_info and node_info parameters to the device.""" epid, _ = zigpy.types.uint64_t.deserialize( network_info.extended_pan_id.serialize() ) @@ -174,14 +184,14 @@ async def write_network_info(self, *, network_info, node_info): async def _move_network_to_channel( self, new_channel: int, new_nwk_update_id: int ) -> None: - """Moves the coordinator to a new channel.""" + """Move the coordinator to a new channel.""" scan_bitmask = 1 << (new_channel - 11) await self._api._queued_at("SC", scan_bitmask) async def energy_scan( self, channels: zigpy.types.Channels, duration_exp: int, count: int ) -> dict[int, float]: - """Runs an energy detection scan and returns the per-channel scan results.""" + """Run an energy detection scan and returns the per-channel scan results.""" all_results = {} for _ in range(count): @@ -238,6 +248,7 @@ async def _get_association_state(self): return state async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: + """Send ZigbeePacket via the device.""" LOGGER.debug("Sending packet %r", packet) try: @@ -304,6 +315,7 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: def remote_at_command( self, nwk, cmd_name, *args, apply_changes=True, encryption=True ): + """Execute AT command on another XBee module in the network.""" LOGGER.debug("Remote AT%s command: %s", cmd_name, args) options = zigpy.types.uint8_t(0) if apply_changes: @@ -314,6 +326,7 @@ def remote_at_command( return self._api._remote_at_command(dev.ieee, nwk, options, cmd_name, *args) async def permit_ncp(self, time_s=60): + """Permit join.""" assert 0 <= time_s <= 254 await self._api._at_command("NJ", time_s) await self._api._at_command("AC") @@ -335,11 +348,13 @@ async def permit_with_key(self, node: EUI64, code: bytes, time_s=500): await self.permit_with_link_key(node, code, time_s, key_type=1) def handle_modem_status(self, status): + """Handle changed Modem Status of the device.""" LOGGER.info("Modem status update: %s (%s)", status.name, status.value) def handle_rx( self, src_ieee, src_nwk, src_ep, dst_ep, cluster_id, profile_id, rxopts, data ): + """Handle receipt of Zigbee data from the device.""" src = zigpy.types.AddrModeAddress( addr_mode=zigpy.types.AddrMode.NWK, address=src_nwk ) @@ -371,10 +386,16 @@ def handle_rx( class XBeeCoordinator(zigpy.quirks.CustomDevice): + """Zigpy Device representing Coordinator.""" + class XBeeGroup(zigpy.quirks.CustomCluster, Groups): + """XBeeGroup custom cluster.""" + cluster_id = 0x0006 class XBeeGroupResponse(zigpy.quirks.CustomCluster, Groups): + """XBeeGroupResponse custom cluster.""" + cluster_id = 0x8006 ep_attribute = "xbee_groups_response" @@ -388,6 +409,8 @@ class XBeeGroupResponse(zigpy.quirks.CustomCluster, Groups): } def __init__(self, *args, **kwargs): + """Initialize instance.""" + super().__init__(*args, **kwargs) self.node_desc = zdo_t.NodeDescriptor( logical_type=zdo_t.LogicalType.Coordinator,