From b7a18433818b83dbb5404ca639f964e1e1749b07 Mon Sep 17 00:00:00 2001 From: Elkana Molson Date: Tue, 27 Aug 2024 17:46:51 +0300 Subject: [PATCH] Adding support for HL78xx custom commands - Adding a new set of commands to support HL78xx LPWA modules. - Adding a new CLI option to handle the baud rate (default 115200) and update all relevant methods - refactor methods name from 'em' to 'module' - adding and updating the test per new changes. - update the version to 0.1.2 --- setup.py | 2 +- sierra_status/src/cli.py | 4 +- sierra_status/src/usb_handle.py | 59 +++++++++++++++++++++++------ tests/test_usb_handle.py | 66 +++++++++++++++++++++------------ 4 files changed, 93 insertions(+), 38 deletions(-) diff --git a/setup.py b/setup.py index 497c383..b5196fc 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ from setuptools import setup, find_packages import os -version = "0.1.1" +version = "0.1.2" # User-friendly description from README.md current_directory = os.path.dirname(os.path.abspath(__file__)) try: diff --git a/sierra_status/src/cli.py b/sierra_status/src/cli.py index c4d5962..66fe4fe 100644 --- a/sierra_status/src/cli.py +++ b/sierra_status/src/cli.py @@ -23,12 +23,12 @@ def main() -> None: optional.add_argument("-m", "--model", help="Model of the device to add to filename (e.g., EM9191 or EM7455)", default="") optional.add_argument("-v", "--verbose", help="Enable verbose output", action="store_true") optional.add_argument("-s", "--search", help="Search for network using AT!COPS=?", action="store_true") - + optional.add_argument("-b", "--baudrate", help="Baudrate to use for serial communication (default: 115200)", default=115200, type=int) args = parser.parse_args() log_level = logging.DEBUG if args.verbose else logging.INFO logging.basicConfig(level=log_level) - usb_handle.start_process(args.port, args.model.lower(), log_level, args.search) + usb_handle.start_process(args.port, args.model.lower(), log_level, args.search, args.baudrate) if __name__ == "__main__": main() diff --git a/sierra_status/src/usb_handle.py b/sierra_status/src/usb_handle.py index 827ec27..b2e1243 100644 --- a/sierra_status/src/usb_handle.py +++ b/sierra_status/src/usb_handle.py @@ -37,6 +37,41 @@ "AT+COPS?" ] +AT_COMMANDS_HL78 =[ + "ATI", + "AT+CMEE=1", + "AT+KSRAT?", + "AT+KBNDCFG?", + "AT+CIMI", + "AT+CPIN?", + "AT+CCID?", + "AT+CGSN", + "AT+HWREV", + "AT+CGDCONT?", + "AT+KCARRIERCFG?", + "AT+CEDRXS?", + "AT+CPSMS?", + "AT+KSIMDET?", + "AT+KSIMSEL?", + "AT+CREG?", + "AT+CEREG?", + "AT+KUSBCOMP?", + "AT&V", + "AT+IPR?", + "AT+CSQ", + "AT+KSLEEP?", + "AT+KNWSCANCFG?", + "AT+KTEMPMON?", + "AT+KCERTSTORE?", + "AT+KTCPCFG?", + "AT+KUDPCFG?", + "AT+KIPOPT?", + "AT+WDSC?", + "AT+WDSG", + "AT+NVBU=2", + "AT+COPS?" +] + AT_COMMAND_COPS = "AT+COPS=?" def animate_spinner() -> None: @@ -49,11 +84,11 @@ def animate_spinner() -> None: sys.stdout.flush() time.sleep(0.05) -def send_at_command(port: str, command: str, timeout: float = 60) -> str: +def send_at_command(port: str, command: str, timeout: float = 60, baudrate: int = 115200) -> str: result = "" start_time = time.time() try: - with serial.Serial(port, 115200, timeout=0.5) as console: + with serial.Serial(port, baudrate, timeout=0.5) as console: logging.debug(f"Sending command: {command}") console.write(f"{command}\r\n".encode("utf-8")) while time.time() - start_time < timeout: @@ -69,27 +104,29 @@ def send_at_command(port: str, command: str, timeout: float = 60) -> str: sys.stdout.flush() return "\n".join(line.strip() for line in result.splitlines() if line.strip()) -def get_em_status(port: str, search: int) -> str: +def get_module_status(port: str, search: int, model: str, baudrate: int = 115200) -> str: """ - Retrieves the status of an EM9xxx module using AT commands. + Retrieves the status of an module using AT commands. """ result = "" try: - result = "\n\n".join(send_at_command(port, command).strip() for command in AT_COMMANDS) + commands = AT_COMMANDS_HL78 if model.lower() == "hl78xx" else AT_COMMANDS + result = "\n\n".join(send_at_command(port, command, baudrate=baudrate).strip() for command in commands) if search: result += "\n\n" + get_em_cops(port) except Exception as e: - logging.error(f"Error getting EM9 status: {e}") + logging.error(f"Error getting module status: {e}") return result -def get_em_cops(port: str) -> str: +def get_em_cops(port: str, baudrate: int = 115200) -> str: """ Retrieves the status of an EM9xxx module using AT commands. """ result = "" try: + logging.info(f"Sending command: {AT_COMMAND_COPS},wait for finishing") - result = "".join(send_at_command(port, AT_COMMAND_COPS, 120).strip()) + result = "".join(send_at_command(port, AT_COMMAND_COPS, 120, baudrate).strip()) except Exception as e: logging.error(f"Error getting EM9 status: {e}") return result @@ -106,13 +143,13 @@ def creat_status_file(result: str, model: str) -> None: except Exception as e: logging.error(f"Error creating status file: {e}") -def start_process(port: str, model: str, log_level: int, search: int) -> None: +def start_process(port: str, model: str, log_level: int, search: int, baudrate: int = 115200) -> None: """ Main function to retrieve the status of an EM9xxx module using AT commands. """ logging.basicConfig(level=log_level) - logging.info(f"Starting process for port {port}") - result = get_em_status(port, search) + logging.info(f"Starting process for port {port} with model {model} and baudrate {baudrate}") + result = get_module_status(port, search, model, baudrate) if result: creat_status_file(result, model) else: diff --git a/tests/test_usb_handle.py b/tests/test_usb_handle.py index c521f03..0dc3095 100644 --- a/tests/test_usb_handle.py +++ b/tests/test_usb_handle.py @@ -2,7 +2,7 @@ import unittest from unittest.mock import mock_open, patch, MagicMock -from sierra_status.src.usb_handle import AT_COMMAND_COPS, AT_COMMANDS, animate_spinner, creat_status_file, get_em_cops, get_em_status, send_at_command, start_process +from sierra_status.src.usb_handle import AT_COMMAND_COPS, AT_COMMANDS, AT_COMMANDS_HL78, animate_spinner, creat_status_file, get_em_cops, get_module_status, send_at_command, start_process class TestATCommands(unittest.TestCase): @@ -30,6 +30,17 @@ def test_at_commands_uppercase(self): for command in AT_COMMANDS: self.assertEqual(command, command.upper()) + def test_at_commands_hl78_list_not_empty(self): + self.assertTrue(len(AT_COMMANDS_HL78) > 0) + + def test_at_commands_hl78_are_strings(self): + for command in AT_COMMANDS_HL78: + self.assertIsInstance(command, str) + + def test_at_commands_hl78_start_with_at(self): + for command in AT_COMMANDS_HL78: + self.assertTrue(command.startswith("AT")) + class TestUSBHandle(unittest.TestCase): def setUp(self): @@ -39,7 +50,6 @@ def setUp(self): @patch('sierra_status.src.usb_handle.serial.Serial') def test_send_at_command_success(self, mock_serial): - # mock_serial.return_value.read_until.return_value = self.mock_result.encode('utf-8') mock_serial.return_value.read.return_value = b'OK\r\n' result = send_at_command(self.mock_port, self.mock_command) self.assertEqual(result, "") @@ -51,18 +61,18 @@ def test_send_at_command_exception(self, mock_serial): self.assertEqual(result, "") @patch('sierra_status.src.usb_handle.send_at_command') - def test_get_em_status_without_search(self, mock_send_at_command): + def test_get_module_status_without_search(self, mock_send_at_command): mock_send_at_command.return_value = "Test Result" - result = get_em_status(self.mock_port, 0) + result = get_module_status(self.mock_port, 0, "EM9xxx") self.assertIn("Test Result", result) self.assertNotIn(AT_COMMAND_COPS, result) @patch('sierra_status.src.usb_handle.send_at_command') @patch('sierra_status.src.usb_handle.get_em_cops') - def test_get_em_status_with_search(self, mock_get_em_cops, mock_send_at_command): + def test_get_module_status_with_search(self, mock_get_em_cops, mock_send_at_command): mock_send_at_command.return_value = "Test Result" mock_get_em_cops.return_value = "COPS Result" - result = get_em_status(self.mock_port, 1) + result = get_module_status(self.mock_port, 1, "EM9xxx") self.assertIn("Test Result", result) self.assertIn("COPS Result", result) @@ -71,7 +81,7 @@ def test_get_em_cops(self, mock_send_at_command): mock_send_at_command.return_value = "COPS Test Result" result = get_em_cops(self.mock_port) self.assertEqual(result, "COPS Test Result") - mock_send_at_command.assert_called_with(self.mock_port, AT_COMMAND_COPS, 120) + mock_send_at_command.assert_called_with(self.mock_port, AT_COMMAND_COPS, 120, 115200) @patch('builtins.open', new_callable=mock_open) @patch('sierra_status.src.usb_handle.time.strftime') @@ -81,20 +91,21 @@ def test_creat_status_file(self, mock_strftime, mock_file): mock_file.assert_called_with("status_TestModel_20230101_120000.txt", "w") mock_file().write.assert_called_with("Test Status") - @patch('sierra_status.src.usb_handle.get_em_status') + @patch('sierra_status.src.usb_handle.get_module_status') @patch('sierra_status.src.usb_handle.creat_status_file') - def test_start_process_with_result(self, mock_creat_status_file, mock_get_em_status): - mock_get_em_status.return_value = "Test Status" + def test_start_process_with_result(self, mock_creat_status_file, mock_get_module_status): + mock_get_module_status.return_value = "Test Status" start_process(self.mock_port, "TestModel", logging.INFO, 0) mock_creat_status_file.assert_called_with("Test Status", "TestModel") - @patch('sierra_status.src.usb_handle.get_em_status') + @patch('sierra_status.src.usb_handle.get_module_status') @patch('sierra_status.src.usb_handle.creat_status_file') - def test_start_process_without_result(self, mock_creat_status_file, mock_get_em_status): - mock_get_em_status.return_value = "" + def test_start_process_without_result(self, mock_creat_status_file, mock_get_module_status): + mock_get_module_status.return_value = "" start_process(self.mock_port, "TestModel", logging.INFO, 0) mock_creat_status_file.assert_not_called() + class TestAnimateSpinner(unittest.TestCase): @patch('sys.stdout') @@ -121,20 +132,27 @@ def test_send_at_command_error_response(self, mock_serial): result = send_at_command("COM1", "AT+TEST") self.assertEqual(result, "") -class TestGetEMStatus(unittest.TestCase): +class TestGetModuleStatus(unittest.TestCase): @patch('sierra_status.src.usb_handle.send_at_command') - def test_get_em_status_exception(self, mock_send_at_command): + def test_get_module_status_exception(self, mock_send_at_command): mock_send_at_command.side_effect = Exception("Test exception") - result = get_em_status("COM1", 0) + result = get_module_status("COM1", 0, "EM9xxx") self.assertEqual(result, "") @patch('sierra_status.src.usb_handle.send_at_command') - def test_get_em_status_all_commands(self, mock_send_at_command): + def test_get_module_status_all_commands(self, mock_send_at_command): mock_send_at_command.return_value = "OK" - result = get_em_status("COM1", 0) + result = get_module_status("COM1", 0, "EM9xxx") self.assertEqual(result.count("OK"), len(AT_COMMANDS)) + @patch('sierra_status.src.usb_handle.send_at_command') + def test_get_module_status_hl78xx(self, mock_send_at_command): + mock_send_at_command.return_value = "OK" + result = get_module_status("COM1", 0, "HL78xx") + self.assertEqual(result.count("OK"), len(AT_COMMANDS_HL78)) + + class TestCreatStatusFile(unittest.TestCase): @patch('builtins.open', new_callable=mock_open) @@ -148,19 +166,19 @@ def test_creat_status_file_exception(self, mock_strftime, mock_file): class TestStartProcess(unittest.TestCase): - @patch('sierra_status.src.usb_handle.get_em_status') + @patch('sierra_status.src.usb_handle.get_module_status') @patch('sierra_status.src.usb_handle.creat_status_file') @patch('sierra_status.src.usb_handle.logging.basicConfig') - def test_start_process_log_level(self, mock_basicConfig, mock_creat_status_file, mock_get_em_status): - mock_get_em_status.return_value = "Test Status" + def test_start_process_log_level(self, mock_basicConfig, mock_creat_status_file, mock_get_module_status): + mock_get_module_status.return_value = "Test Status" start_process("COM1", "TestModel", logging.DEBUG, 0) mock_basicConfig.assert_called_with(level=logging.DEBUG) - @patch('sierra_status.src.usb_handle.get_em_status') + @patch('sierra_status.src.usb_handle.get_module_status') @patch('sierra_status.src.usb_handle.creat_status_file') @patch('sierra_status.src.usb_handle.logging.error') - def test_start_process_no_result(self, mock_logging_error, mock_creat_status_file, mock_get_em_status): - mock_get_em_status.return_value = "" + def test_start_process_no_result(self, mock_logging_error, mock_creat_status_file, mock_get_module_status): + mock_get_module_status.return_value = "" start_process("COM1", "TestModel", logging.INFO, 0) mock_logging_error.assert_called_with("No result received from the module.") mock_creat_status_file.assert_not_called()