Skip to content

Commit

Permalink
Merge pull request #1343 from bdring/dymk/refactor-run-fixtures
Browse files Browse the repository at this point in the history
Refactor run_fixture tool, clean up ops
  • Loading branch information
dymk authored Oct 1, 2024
2 parents c04d38b + e2bd934 commit de03f8d
Show file tree
Hide file tree
Showing 8 changed files with 387 additions and 244 deletions.
3 changes: 1 addition & 2 deletions fixture_tests/fixtures/alarms.nc
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
-> $Alarm/Send=10
<- ok
<- [MSG:INFO: ALARM: Spindle Control]
# end in an unlocked state so other fixtures can run
-> $X
<- ALARM:10
-> $X
<- [MSG:INFO: Caution: Unlocked]
<- ok
2 changes: 2 additions & 0 deletions fixture_tests/fixtures/flow_control_alarm.nc
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@
<- ok
<- ok
<- [MSG:INFO: PRINT, success]
<- ok
<- ok
3 changes: 2 additions & 1 deletion fixture_tests/fixtures/idle_status.nc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
<~ [MSG:INFO: Caution: Unlocked]
<- ok
-> ??
<- <Idle|MPos:0.000,0.000,0.000|FS:0,0|WCO:0.000,0.000,0.000>
-> ??
<| <Idle|MPos:0.000,0.000,0.000|FS:0,0>
<| <Idle|MPos:0.000,0.000,0.000|FS:0,0|Ov:100,100,100>
<| <Idle|MPos:0.000,0.000,0.000|FS:0,0|WCO:0.000,0.000,0.000>
265 changes: 24 additions & 241 deletions fixture_tests/run_fixture
Original file line number Diff line number Diff line change
Expand Up @@ -5,274 +5,57 @@ from termcolor import colored
import argparse
import os
import serial
from xmodem import XMODEM
import re
import fnmatch
from tool import op_entries
from tool.controller import Controller

parser = argparse.ArgumentParser()
parser.add_argument("device")
parser.add_argument("fixture_file")
parser.add_argument("-b", "--baudrate", type=int, default=115200)
args = parser.parse_args()

OPS = [
# send command to controller
"->",
# send file to controller
"=>",
# expect from controller
"<-",
# expect from controller, but optional
"<~",
# consume lines until line is found
"<...",
# expect one of
"<|",
]

fixture_files = []
fixture_paths = []

# check if fixture_file is a directory
if os.path.isdir(args.fixture_file):
for file in os.listdir(args.fixture_file):
if file.endswith(".nc"):
fixture_files.append(os.path.join(args.fixture_file, file))
fixture_paths.append(os.path.join(args.fixture_file, file))
else:
fixture_files.append(args.fixture_file)


class OpEntry:
def __init__(self, op, data, lineno):
self.op = op
self.data = data
self.lineno = lineno
self.glob_match = False

def __str__(self):
return f"OpEntry({self.op}, {str(self.data)}, {self.lineno})"

def __repr__(self):
return str(self)

fixture_paths.append(args.fixture_file)

def parse_fixture_lines(fixture_file):
# op_entries is a list of tuples:
# (op, match, lineno)

# Read the fixture file
with open(fixture_file, "r") as f:
op_entries = []
for lineno, line in enumerate(f.read().splitlines()):
if line.startswith("#"):
# skip comment lines
continue

for op in OPS:
if line.startswith(op + " "):
line = line[len(op) + 1 :]
if line.startswith("* "):
line = line[2:]
glob_match = True
else:
glob_match = False

if op == "<|":
if len(op_entries) > 0 and op_entries[-1].op == "<|":
# append to previous group of matches
op_entries[-1].data.append(line)
else:
# new group of matches
op_entry = OpEntry(op, [line], lineno + 1)
op_entries.append(op_entry)
elif op == "=>":
# make the local path relative to the fixture file
line = line.split(" ")
local_file = line[0]
remote_file = line[1]
local_file = os.path.join(
os.path.dirname(fixture_file), local_file
)
if not os.path.exists(local_file):
raise ValueError(
f"Fixture {fixture_file} references file that does not exist: {local_file}"
)
if not remote_file.startswith("/"):
raise ValueError(
f"Remote file path must be absolute: {remote_file}"
)
op_entries.append(
OpEntry(op, (local_file, remote_file), lineno + 1)
)

# expect a message that the file was received
op_entries.append(
OpEntry("<-", "[MSG:Files changed]", lineno + 1)
)

else:
op_entry = OpEntry(op, line, lineno + 1)
op_entry.glob_match = glob_match
op_entries.append(op_entry)
break
else:
raise ValueError(
f"Invalid line {lineno} in fixture file {fixture_file}: {line}"
)

return op_entries


def run_fixture(fixture_file):
fixture_lines = parse_fixture_lines(fixture_file)
controller = serial.Serial(args.device, args.baudrate, timeout=1)

# last line read from the controller
line = None

def ensure_line():
nonlocal line
if line is None:
line = controller.readline().decode("utf-8").strip()
def run_fixture(fixture_path, controller):
op_entries_parsed = op_entries.parse_file(fixture_path)

try:
for op_entry in fixture_lines:
op = op_entry.op
op_data = op_entry.data
lineno = op_entry.lineno
if op == "->":
# send the fixture line to the controller
for op_entry in op_entries_parsed:
if not op_entry.execute(controller):
print(
colored(f"{op} ", "dark_grey")
+ colored(op_data, "green", attrs=["dark"])
colored(f"--- Fixture ", "red")
+ colored(fixture_path, "red", attrs=["bold"])
+ colored(" failed ---", "red")
)
controller.write(op_data.encode("utf-8") + b"\n")
elif op == "<-" or op == "<~" or op == "<|":
is_optional = op == "<~"
ensure_line()
if op == "<|": # match any one of
if line in op_data:
print(
colored(f"{op} ", "dark_grey")
+ colored(line, "green", attrs=["dark", "bold"])
)
line = None
else:
print(f"Test failed at line {colored(str(lineno), 'red')}")
print(f"Expected one of:")
for fline in op_data:
print(f" `{colored(fline, 'red')}'")
print(f"Actual: `{colored(line, 'red')}'")
exit(1)
elif line == op_data: # exact match
print(
colored(f"{op} ", "dark_grey")
+ colored(line, "green", attrs=["dark", "bold"])
)
line = None
else: # match failed
if is_optional: # but that's okay if it's an optional line
print(
colored(f"{op} Did not get optional line ", "dark_grey")
+ colored(op_data, "dark_grey", attrs=["bold"])
)
# do not clear line, so we can try to match it again on
# the next op
else:
print(f"Test failed at line {colored(str(lineno), 'red')}")
print(f"Expected: `{colored(op_data, 'red')}'")
print(f"Actual: `{colored(line, 'red')}'")
exit(1)
elif op == "=>":
local_file, remote_file = op_data
with open(local_file, "rb") as file_stream:

def getc(size, timeout=1):
return controller.read(size) or None

def putc(data, timeout=1):
return controller.write(data) or None

print(f"Sending {local_file} to {remote_file}")
controller.write(f"$XModem/Receive={remote_file}\n".encode("utf-8"))
while True:
# wait for the 'C' character to start the transfer
controller.timeout = 2
c = controller.read(1)
if c == b"C":
break
if c == b"":
raise TimeoutError(
f"XModem start timeout at line {lineno} in fixture file {fixture_file}"
)
controller.timeout = 1
xmodem = XMODEM(getc, putc)
xmodem.send(file_stream)
rx_ack_line = controller.readline().decode("utf-8").strip()
print(
colored(f"{op} ", "dark_grey")
+ colored(rx_ack_line, "green", attrs=["dark", "bold"])
)
matcher = re.match(
r"\[MSG:INFO: Received (\d+) bytes to file ([\w\/\.]+)\]",
rx_ack_line,
)
if matcher is None:
raise ValueError(
f"Transfer failed (ack line): {rx_ack_line} at line {lineno} in fixture file {fixture_file}"
)
num_tx_bytes = int(matcher.group(1))
name_tx_file = matcher.group(2)
if name_tx_file != remote_file:
print(f"Expected: {remote_file}")
print(f"Actual: {name_tx_file}")
raise ValueError(
f"Transfer failed (filename mismatch): {rx_ack_line} at line {lineno} in fixture file {fixture_file}"
)
print(
colored(f"{op} ", "dark_grey")
+ colored(local_file, "green", attrs=["bold"])
+ colored(" => ", "dark_grey")
+ colored(remote_file, "green", attrs=["bold"])
+ colored(f" ({num_tx_bytes} bytes)", "green")
)
elif op == "<...":
while True:
ensure_line()
print(
colored(
f"{op} " + ("(*) " if op_entry.glob_match else ""),
"dark_grey",
)
+ colored(line, "green", attrs=["dark", "bold"])
)

matched = False
if op_entry.glob_match:
matched = fnmatch.fnmatch(line, op_data)
else:
matched = line == op_data
line = None

if matched:
break

else:
raise ValueError(f"Invalid operation {op}")
exit(1)

except KeyboardInterrupt:
print("Interrupt")

except TimeoutError as e:
print("Timeout waiting for response, line: " + e.args[0])
finally:
controller.close()

print(
colored(f"--- Fixture ", "green")
+ colored(fixture_file, "green", attrs=["bold"])
+ colored(" passed ---", "green")
+ colored(fixture_path, "green", attrs=["bold"])
+ colored(" passed ---", "green"),
end="\n\n",
)
print()


for fixture_file in fixture_files:
run_fixture(fixture_file)
if __name__ == "__main__":
controller = Controller(args.device, args.baudrate, timeout=1)
for fixture_path in fixture_paths:
controller.send_soft_reset()
run_fixture(fixture_path, controller)
# clear the buffer so rest of the fixtures can run
controller.drain()
Empty file added fixture_tests/tool/__init__.py
Empty file.
51 changes: 51 additions & 0 deletions fixture_tests/tool/controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import serial
from termcolor import colored


class Controller:
def __init__(self, device, baudrate, timeout):
self._debug = False
self._serial = serial.Serial(device, baudrate, timeout=timeout)
self._current_line = None

def send_soft_reset(self):
self._serial.write(b"\x18")
self._serial.flush()
self.clear_line()
# wait for startup message
while not self.current_line().startswith("Grbl 3.8"):
self.clear_line()
self.clear_line()

def current_line(self):
if self._current_line is None:
self._current_line = self._serial.readline().decode("utf-8").strip()
# print(colored("[c] <- " + self._current_line, "light_blue"))
return self._current_line

def clear_line(self):
self._current_line = None

def next_line(self):
self.clear_line()
return self.current_line()

def send_line(self, line):
# print(colored("[c] -> " + line, "light_blue"))
self._serial.write(line.encode("utf-8") + b"\n")

def getc(self, size):
return self._serial.read(size) or None

def putc(self, data):
return self._serial.write(data) or None

def drain(self, wait_for=0.1):
timeout = self._serial.timeout
self._serial.timeout = wait_for
while self._serial.read(1):
pass
self._serial.timeout = timeout

def close(self):
self._serial.close()
Loading

0 comments on commit de03f8d

Please sign in to comment.