Skip to content

Commit

Permalink
Adding support for HL78xx custom commands
Browse files Browse the repository at this point in the history
- Adding a new set of commands to support HL78xx LPWA modules.
- Adding a new CLI option to handle the baud rate (default 115200) and update all relevant methods
-  refactor methods name from 'em' to 'module'
- adding and updating the test per new changes.
- update the version to 0.1.2
  • Loading branch information
elkanamol committed Aug 27, 2024
1 parent c02e078 commit b7a1843
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 38 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from setuptools import setup, find_packages
import os

version = "0.1.1"
version = "0.1.2"
# User-friendly description from README.md
current_directory = os.path.dirname(os.path.abspath(__file__))
try:
Expand Down
4 changes: 2 additions & 2 deletions sierra_status/src/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ def main() -> None:
optional.add_argument("-m", "--model", help="Model of the device to add to filename (e.g., EM9191 or EM7455)", default="")
optional.add_argument("-v", "--verbose", help="Enable verbose output", action="store_true")
optional.add_argument("-s", "--search", help="Search for network using AT!COPS=?", action="store_true")

optional.add_argument("-b", "--baudrate", help="Baudrate to use for serial communication (default: 115200)", default=115200, type=int)
args = parser.parse_args()

log_level = logging.DEBUG if args.verbose else logging.INFO
logging.basicConfig(level=log_level)
usb_handle.start_process(args.port, args.model.lower(), log_level, args.search)
usb_handle.start_process(args.port, args.model.lower(), log_level, args.search, args.baudrate)

if __name__ == "__main__":
main()
Expand Down
59 changes: 48 additions & 11 deletions sierra_status/src/usb_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,41 @@
"AT+COPS?"
]

AT_COMMANDS_HL78 =[
"ATI",
"AT+CMEE=1",
"AT+KSRAT?",
"AT+KBNDCFG?",
"AT+CIMI",
"AT+CPIN?",
"AT+CCID?",
"AT+CGSN",
"AT+HWREV",
"AT+CGDCONT?",
"AT+KCARRIERCFG?",
"AT+CEDRXS?",
"AT+CPSMS?",
"AT+KSIMDET?",
"AT+KSIMSEL?",
"AT+CREG?",
"AT+CEREG?",
"AT+KUSBCOMP?",
"AT&V",
"AT+IPR?",
"AT+CSQ",
"AT+KSLEEP?",
"AT+KNWSCANCFG?",
"AT+KTEMPMON?",
"AT+KCERTSTORE?",
"AT+KTCPCFG?",
"AT+KUDPCFG?",
"AT+KIPOPT?",
"AT+WDSC?",
"AT+WDSG",
"AT+NVBU=2",
"AT+COPS?"
]

AT_COMMAND_COPS = "AT+COPS=?"

def animate_spinner() -> None:
Expand All @@ -49,11 +84,11 @@ def animate_spinner() -> None:
sys.stdout.flush()
time.sleep(0.05)

def send_at_command(port: str, command: str, timeout: float = 60) -> str:
def send_at_command(port: str, command: str, timeout: float = 60, baudrate: int = 115200) -> str:
result = ""
start_time = time.time()
try:
with serial.Serial(port, 115200, timeout=0.5) as console:
with serial.Serial(port, baudrate, timeout=0.5) as console:
logging.debug(f"Sending command: {command}")
console.write(f"{command}\r\n".encode("utf-8"))
while time.time() - start_time < timeout:
Expand All @@ -69,27 +104,29 @@ def send_at_command(port: str, command: str, timeout: float = 60) -> str:
sys.stdout.flush()
return "\n".join(line.strip() for line in result.splitlines() if line.strip())

def get_em_status(port: str, search: int) -> str:
def get_module_status(port: str, search: int, model: str, baudrate: int = 115200) -> str:
"""
Retrieves the status of an EM9xxx module using AT commands.
Retrieves the status of an module using AT commands.
"""
result = ""
try:
result = "\n\n".join(send_at_command(port, command).strip() for command in AT_COMMANDS)
commands = AT_COMMANDS_HL78 if model.lower() == "hl78xx" else AT_COMMANDS
result = "\n\n".join(send_at_command(port, command, baudrate=baudrate).strip() for command in commands)
if search:
result += "\n\n" + get_em_cops(port)
except Exception as e:
logging.error(f"Error getting EM9 status: {e}")
logging.error(f"Error getting module status: {e}")
return result

def get_em_cops(port: str) -> str:
def get_em_cops(port: str, baudrate: int = 115200) -> str:
"""
Retrieves the status of an EM9xxx module using AT commands.
"""
result = ""
try:

logging.info(f"Sending command: {AT_COMMAND_COPS},wait for finishing")
result = "".join(send_at_command(port, AT_COMMAND_COPS, 120).strip())
result = "".join(send_at_command(port, AT_COMMAND_COPS, 120, baudrate).strip())
except Exception as e:
logging.error(f"Error getting EM9 status: {e}")
return result
Expand All @@ -106,13 +143,13 @@ def creat_status_file(result: str, model: str) -> None:
except Exception as e:
logging.error(f"Error creating status file: {e}")

def start_process(port: str, model: str, log_level: int, search: int) -> None:
def start_process(port: str, model: str, log_level: int, search: int, baudrate: int = 115200) -> None:
"""
Main function to retrieve the status of an EM9xxx module using AT commands.
"""
logging.basicConfig(level=log_level)
logging.info(f"Starting process for port {port}")
result = get_em_status(port, search)
logging.info(f"Starting process for port {port} with model {model} and baudrate {baudrate}")
result = get_module_status(port, search, model, baudrate)
if result:
creat_status_file(result, model)
else:
Expand Down
66 changes: 42 additions & 24 deletions tests/test_usb_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import unittest
from unittest.mock import mock_open, patch, MagicMock

from sierra_status.src.usb_handle import AT_COMMAND_COPS, AT_COMMANDS, animate_spinner, creat_status_file, get_em_cops, get_em_status, send_at_command, start_process
from sierra_status.src.usb_handle import AT_COMMAND_COPS, AT_COMMANDS, AT_COMMANDS_HL78, animate_spinner, creat_status_file, get_em_cops, get_module_status, send_at_command, start_process

class TestATCommands(unittest.TestCase):

Expand Down Expand Up @@ -30,6 +30,17 @@ def test_at_commands_uppercase(self):
for command in AT_COMMANDS:
self.assertEqual(command, command.upper())

def test_at_commands_hl78_list_not_empty(self):
self.assertTrue(len(AT_COMMANDS_HL78) > 0)

def test_at_commands_hl78_are_strings(self):
for command in AT_COMMANDS_HL78:
self.assertIsInstance(command, str)

def test_at_commands_hl78_start_with_at(self):
for command in AT_COMMANDS_HL78:
self.assertTrue(command.startswith("AT"))

class TestUSBHandle(unittest.TestCase):

def setUp(self):
Expand All @@ -39,7 +50,6 @@ def setUp(self):

@patch('sierra_status.src.usb_handle.serial.Serial')
def test_send_at_command_success(self, mock_serial):
# mock_serial.return_value.read_until.return_value = self.mock_result.encode('utf-8')
mock_serial.return_value.read.return_value = b'OK\r\n'
result = send_at_command(self.mock_port, self.mock_command)
self.assertEqual(result, "")
Expand All @@ -51,18 +61,18 @@ def test_send_at_command_exception(self, mock_serial):
self.assertEqual(result, "")

@patch('sierra_status.src.usb_handle.send_at_command')
def test_get_em_status_without_search(self, mock_send_at_command):
def test_get_module_status_without_search(self, mock_send_at_command):
mock_send_at_command.return_value = "Test Result"
result = get_em_status(self.mock_port, 0)
result = get_module_status(self.mock_port, 0, "EM9xxx")
self.assertIn("Test Result", result)
self.assertNotIn(AT_COMMAND_COPS, result)

@patch('sierra_status.src.usb_handle.send_at_command')
@patch('sierra_status.src.usb_handle.get_em_cops')
def test_get_em_status_with_search(self, mock_get_em_cops, mock_send_at_command):
def test_get_module_status_with_search(self, mock_get_em_cops, mock_send_at_command):
mock_send_at_command.return_value = "Test Result"
mock_get_em_cops.return_value = "COPS Result"
result = get_em_status(self.mock_port, 1)
result = get_module_status(self.mock_port, 1, "EM9xxx")
self.assertIn("Test Result", result)
self.assertIn("COPS Result", result)

Expand All @@ -71,7 +81,7 @@ def test_get_em_cops(self, mock_send_at_command):
mock_send_at_command.return_value = "COPS Test Result"
result = get_em_cops(self.mock_port)
self.assertEqual(result, "COPS Test Result")
mock_send_at_command.assert_called_with(self.mock_port, AT_COMMAND_COPS, 120)
mock_send_at_command.assert_called_with(self.mock_port, AT_COMMAND_COPS, 120, 115200)

@patch('builtins.open', new_callable=mock_open)
@patch('sierra_status.src.usb_handle.time.strftime')
Expand All @@ -81,20 +91,21 @@ def test_creat_status_file(self, mock_strftime, mock_file):
mock_file.assert_called_with("status_TestModel_20230101_120000.txt", "w")
mock_file().write.assert_called_with("Test Status")

@patch('sierra_status.src.usb_handle.get_em_status')
@patch('sierra_status.src.usb_handle.get_module_status')
@patch('sierra_status.src.usb_handle.creat_status_file')
def test_start_process_with_result(self, mock_creat_status_file, mock_get_em_status):
mock_get_em_status.return_value = "Test Status"
def test_start_process_with_result(self, mock_creat_status_file, mock_get_module_status):
mock_get_module_status.return_value = "Test Status"
start_process(self.mock_port, "TestModel", logging.INFO, 0)
mock_creat_status_file.assert_called_with("Test Status", "TestModel")

@patch('sierra_status.src.usb_handle.get_em_status')
@patch('sierra_status.src.usb_handle.get_module_status')
@patch('sierra_status.src.usb_handle.creat_status_file')
def test_start_process_without_result(self, mock_creat_status_file, mock_get_em_status):
mock_get_em_status.return_value = ""
def test_start_process_without_result(self, mock_creat_status_file, mock_get_module_status):
mock_get_module_status.return_value = ""
start_process(self.mock_port, "TestModel", logging.INFO, 0)
mock_creat_status_file.assert_not_called()


class TestAnimateSpinner(unittest.TestCase):

@patch('sys.stdout')
Expand All @@ -121,20 +132,27 @@ def test_send_at_command_error_response(self, mock_serial):
result = send_at_command("COM1", "AT+TEST")
self.assertEqual(result, "")

class TestGetEMStatus(unittest.TestCase):
class TestGetModuleStatus(unittest.TestCase):

@patch('sierra_status.src.usb_handle.send_at_command')
def test_get_em_status_exception(self, mock_send_at_command):
def test_get_module_status_exception(self, mock_send_at_command):
mock_send_at_command.side_effect = Exception("Test exception")
result = get_em_status("COM1", 0)
result = get_module_status("COM1", 0, "EM9xxx")
self.assertEqual(result, "")

@patch('sierra_status.src.usb_handle.send_at_command')
def test_get_em_status_all_commands(self, mock_send_at_command):
def test_get_module_status_all_commands(self, mock_send_at_command):
mock_send_at_command.return_value = "OK"
result = get_em_status("COM1", 0)
result = get_module_status("COM1", 0, "EM9xxx")
self.assertEqual(result.count("OK"), len(AT_COMMANDS))

@patch('sierra_status.src.usb_handle.send_at_command')
def test_get_module_status_hl78xx(self, mock_send_at_command):
mock_send_at_command.return_value = "OK"
result = get_module_status("COM1", 0, "HL78xx")
self.assertEqual(result.count("OK"), len(AT_COMMANDS_HL78))


class TestCreatStatusFile(unittest.TestCase):

@patch('builtins.open', new_callable=mock_open)
Expand All @@ -148,19 +166,19 @@ def test_creat_status_file_exception(self, mock_strftime, mock_file):

class TestStartProcess(unittest.TestCase):

@patch('sierra_status.src.usb_handle.get_em_status')
@patch('sierra_status.src.usb_handle.get_module_status')
@patch('sierra_status.src.usb_handle.creat_status_file')
@patch('sierra_status.src.usb_handle.logging.basicConfig')
def test_start_process_log_level(self, mock_basicConfig, mock_creat_status_file, mock_get_em_status):
mock_get_em_status.return_value = "Test Status"
def test_start_process_log_level(self, mock_basicConfig, mock_creat_status_file, mock_get_module_status):
mock_get_module_status.return_value = "Test Status"
start_process("COM1", "TestModel", logging.DEBUG, 0)
mock_basicConfig.assert_called_with(level=logging.DEBUG)

@patch('sierra_status.src.usb_handle.get_em_status')
@patch('sierra_status.src.usb_handle.get_module_status')
@patch('sierra_status.src.usb_handle.creat_status_file')
@patch('sierra_status.src.usb_handle.logging.error')
def test_start_process_no_result(self, mock_logging_error, mock_creat_status_file, mock_get_em_status):
mock_get_em_status.return_value = ""
def test_start_process_no_result(self, mock_logging_error, mock_creat_status_file, mock_get_module_status):
mock_get_module_status.return_value = ""
start_process("COM1", "TestModel", logging.INFO, 0)
mock_logging_error.assert_called_with("No result received from the module.")
mock_creat_status_file.assert_not_called()
Expand Down

0 comments on commit b7a1843

Please sign in to comment.