-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
7916d87
commit 1afb591
Showing
2 changed files
with
194 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
"""Module for running com0com commands directly using the com0com CLI.""" | ||
|
||
|
||
from pathlib import Path | ||
import re | ||
import subprocess | ||
from typing import Dict, List | ||
import winreg | ||
|
||
from com0com import Com0comBase, Com0comException, PortPair | ||
|
||
|
||
# https://microsoft.github.io/windows-docs-rs/doc/windows/Win32/Devices/SerialCommunication/constant.COMDB_MAX_PORTS_ARBITRATED.html | ||
MAX_PORTS = 4092 | ||
|
||
|
||
def get_com0com_install_directory(): | ||
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, | ||
R'SOFTWARE\WOW6432Node\com0com') as key: | ||
return Path(winreg.QueryValueEx(key, 'Install_Dir')[0]) | ||
|
||
|
||
class Com0comRunner(Com0comBase): | ||
"""Class for running Com0com commands.""" | ||
|
||
def __init__(self): | ||
self._com0com_dir = get_com0com_install_directory() | ||
|
||
def run(self, args: List[str]): | ||
"""Run com0com with given `args` and return stdout. | ||
:param args: list of arguments to pass to com0com | ||
""" | ||
|
||
args = [str(self._com0com_dir / 'setupc.exe'), '--silent'] + args | ||
try: | ||
# com0com generally prints errors to stdout, so we redirect stderr | ||
# there so we don't have to handle both streams. | ||
res = subprocess.run(args, cwd=self._com0com_dir, | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.STDOUT, | ||
text=True, check=True) | ||
except subprocess.CalledProcessError as e: | ||
raise Com0comException(f'com0com command failed: {e.stdout}') | ||
return res.stdout | ||
|
||
def install_pair(self, a_params: Dict[str, str], | ||
b_params: Dict[str, str]) -> PortPair: | ||
|
||
def _make_param_str(params: Dict[str, str]): | ||
"""Create string of paramaters for com0com from dict.""" | ||
if not params: | ||
return '-' | ||
else: | ||
return ','.join(f'{k}={v}' for k, v in params.items()) | ||
|
||
res = self.run(['install', _make_param_str(a_params), | ||
_make_param_str(b_params)]) | ||
|
||
# Loop over each output line to find the port ID for A and B ports. | ||
lines = [line.strip() for line in res.splitlines()] | ||
a = b = None | ||
for line in lines: | ||
# Group 1 (outer group) is the port name. | ||
# Group 2 (inner group) is 'A' or 'B'. | ||
match = re.match(r'^(CNC([AB])[0-9]+) ', line) | ||
if match: | ||
if match.group(2) == 'A': | ||
a = match.group(1) | ||
else: | ||
b = match.group(1) | ||
if a is not None and b is not None: | ||
break | ||
else: | ||
raise Com0comException('did not get port name for each port') | ||
return PortPair(a, b) | ||
|
||
def remove_pair(self, port_pair: PortPair) -> None: | ||
pass | ||
|
||
def disable_all(self) -> None: | ||
pass | ||
|
||
def enable_all(self) -> None: | ||
pass | ||
|
||
def change_params(self, port: str, params: Dict[str, str]) -> None: | ||
pass | ||
|
||
def list_ports(self) -> Dict[str, Dict[str, str]]: | ||
pass | ||
|
||
def busynames(self, pattern: str) -> List[str]: | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
from pathlib import Path | ||
from subprocess import CalledProcessError, CompletedProcess | ||
import unittest | ||
from unittest.mock import patch, MagicMock, Mock | ||
import winreg | ||
|
||
from com0com import Com0comException | ||
from com0com.runner import ( | ||
Com0comRunner, get_com0com_install_directory, MAX_PORTS | ||
) | ||
|
||
|
||
# Re-usable patch for get_com0com_install_directory. `new` is used so it | ||
# doesn't create a parameter when used as a decorator. | ||
get_com0com_install_directory_patch = patch( | ||
'com0com.runner.get_com0com_install_directory', | ||
new=Mock(return_value=Path(R'C:\com0com')) | ||
) | ||
|
||
|
||
def patch_run(stdout: str): | ||
"""Patch `run()`, returning the specified stdout.""" | ||
|
||
return patch.object(Com0comRunner, 'run', return_value=stdout) | ||
|
||
|
||
class GetCom0comInstallDirectoryTestCase(unittest.TestCase): | ||
@patch('winreg.QueryValueEx', return_value=[R'C:\com0com']) | ||
@patch('winreg.OpenKey') | ||
def test_ok(self, openkey_mock: Mock, queryvalueex_mock: Mock): | ||
# Dummy return value/context manager for registry handle object. | ||
key = MagicMock() | ||
key.__enter__.return_value = key | ||
openkey_mock.return_value = key | ||
|
||
res = get_com0com_install_directory() | ||
openkey_mock.assert_called_once_with( | ||
winreg.HKEY_LOCAL_MACHINE, | ||
R'SOFTWARE\WOW6432Node\com0com' | ||
) | ||
queryvalueex_mock.assert_called_once_with(key, 'Install_Dir') | ||
self.assertEqual(res, Path(R'C:\com0com')) | ||
|
||
|
||
@get_com0com_install_directory_patch | ||
@patch('subprocess.run') | ||
class RunTestCase(unittest.TestCase): | ||
def test_ok(self, run_mock: Mock): | ||
args = ['foo', 'bar'] | ||
run_mock.return_value = CompletedProcess(args, 0, 'hello') | ||
|
||
runner = Com0comRunner() | ||
res = runner.run(args) | ||
|
||
run_mock.assert_called_once() | ||
self.assertEqual(run_mock.call_args[0][0][-2:], args) | ||
self.assertEqual(res, 'hello') | ||
|
||
def test_error(self, run_mock: Mock): | ||
args = [] | ||
run_mock.side_effect = CalledProcessError(1, args, 'error') | ||
|
||
runner = Com0comRunner() | ||
# Check that the right exeption is called, and contains the stdout. | ||
with self.assertRaisesRegex(Com0comException, 'error'): | ||
runner.run(args) | ||
|
||
|
||
@get_com0com_install_directory_patch | ||
@patch_run(' CNCA1 PortName=-\n CNCB1 PortName=-\nrubbish\n\n') | ||
class InstallPairTestCase(unittest.TestCase): | ||
def test_no_params(self, run_mock: Mock): | ||
pair = Com0comRunner().install_pair({}, {}) | ||
run_mock.assert_called_once_with(['install', '-', '-']) | ||
self.assertEqual(pair.a, 'CNCA1') | ||
self.assertEqual(pair.b, 'CNCB1') | ||
|
||
def test_port_a_1_param(self, run_mock: Mock): | ||
pair = Com0comRunner().install_pair({'foo': 'bar'}, {}) | ||
run_mock.assert_called_once_with(['install', 'foo=bar', '-']) | ||
self.assertEqual(pair.a, 'CNCA1') | ||
self.assertEqual(pair.b, 'CNCB1') | ||
|
||
def test_port_b_2_params(self, run_mock: Mock): | ||
pair = Com0comRunner().install_pair({}, {'foo': 'bar', 'faz': 'baz'}) | ||
run_mock.assert_called_once_with(['install', '-', 'foo=bar,faz=baz']) | ||
self.assertEqual(pair.a, 'CNCA1') | ||
self.assertEqual(pair.b, 'CNCB1') | ||
|
||
def test_both_port_params(self, run_mock: Mock): | ||
pair = Com0comRunner().install_pair({'aa': 'bb', 'cc': 'dd'}, | ||
{'ee': 'ff'}) | ||
run_mock.assert_called_once_with(['install', 'aa=bb,cc=dd', 'ee=ff']) | ||
self.assertEqual(pair.a, 'CNCA1') | ||
self.assertEqual(pair.b, 'CNCB1') | ||
|
||
def test_one_name_not_returned(self, run_mock: Mock): | ||
run_mock.return_value = ' CNCA1 PortName=-\nrubbish\n\n' | ||
with self.assertRaises(Com0comException): | ||
Com0comRunner().install_pair({}, {}) |