Skip to content

Commit

Permalink
[UTest] Fix #223: Fix unittest-parallel script.
Browse files Browse the repository at this point in the history
This commit fixes several issues with the script at once:

- Fix #224.
- Fix #225.
- Fix #226.
  • Loading branch information
ImmanuelHaffner committed Apr 19, 2024
1 parent 6fee5fb commit df3ef76
Showing 1 changed file with 93 additions and 47 deletions.
140 changes: 93 additions & 47 deletions utils/unittest-parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,27 @@
import subprocess
import sys
import time
import time
import xml.etree.ElementTree as ET
import itertools
from enum import Enum
from tqdm import tqdm

class ErrorType(Enum):
NO_ERROR = 1
class ReturnCode(Enum):
SUCCESS = 0
FAILURE = 1
TIMEOUT = 2
UNEXPECTED_RETURN = 3
UNKNOWN = 3

def classify_returncode(returncode: int) -> ReturnCode:
match returncode:
case 0:
return ReturnCode.SUCCESS
case 1:
return ReturnCode.FAILURE
case 124 | 137:
return ReturnCode.TIMEOUT
case _:
return ReturnCode.UNKNOWN

# data needed for junit output
class JunitData:
Expand All @@ -24,11 +36,14 @@ class JunitData:
execution_time = 0.0
test_cases = []

def append_data(self, stdout: str, stderr: str, error_type: ErrorType, test_name: str):
def append_data(self, stdout: str, stderr: str, returncode: int, test_name: str):
return_class = classify_returncode(returncode)

# If a process times out we do not want to parse the result. It is just a failure.
if error_type != ErrorType.NO_ERROR:
self.failures += 1
if return_class not in [ReturnCode.SUCCESS, ReturnCode.FAILURE]:
self.errors += 1
return

assert stdout is not None
tree = ET.fromstring(stdout)
test_suite = tree.find('testsuite')
Expand Down Expand Up @@ -62,25 +77,37 @@ class TestData:
total_assertions = 0
passed_assertions = 0
failed_assertions = 0

total_test_cases = 0
passed_test_cases = 0
failed_test_cases = 0
timeouts = 0

execution_time = 0.0

error_msgs = []
timeouted_tests = []
is_error = False
unknown_test_failures = []
has_test_failure = False

def append_data(self, stdout: str, stderr: str, returncode: int, test_name: str):
return_class = classify_returncode(returncode)

def append_data(self, stdout: str, stderr: str, error_type: ErrorType, test_name: str):
# If a process times out we do not want to parse the result. It is just a failure.
if error_type == ErrorType.TIMEOUT:
self.timeouts += 1
if return_class == ReturnCode.TIMEOUT:
self.total_test_cases += 1
self.failed_test_cases += 1
self.is_error = True
self.has_test_failure = True
self.timeouted_tests.append(test_name)
return

# If a process fails enexpectedly, we do not want to parse the result
if return_class == ReturnCode.UNKNOWN:
self.total_test_cases += 1
self.failed_test_cases += 1
self.has_test_failure = True
self.unknown_test_failures.append((test_name, returncode))
return

current_failed_tests = 0
assert stdout is not None
assert stderr is not None
Expand Down Expand Up @@ -117,35 +144,37 @@ def append_data(self, stdout: str, stderr: str, error_type: ErrorType, test_name
if current_failed_tests > 0:
self.error_msgs.append(stdout)
self.error_msgs.append(stderr)
self.is_error = True
self.has_test_failure = True
elif stderr != '':
self.error_msgs.append(stderr)

def dump(self, _):
for msg in self.error_msgs:
print(msg)

if (self.is_error):
if self.has_test_failure:
digits_total = max(len(str(self.total_assertions)), len(str(self.total_test_cases)))
digits_passed = max(len(str(self.passed_assertions)), len(str(self.passed_test_cases)))
digits_failed = max(len(str(self.failed_assertions)), len(str(self.failed_test_cases)))
print(f'\u001b[31;1m===============================================================================\u001b[39;0m')
print(f'test cases: {self.total_test_cases:>{digits_total}} | \u001b[32;1m{self.passed_test_cases:>{digits_passed}} passed\u001b[39;0m | \u001b[31;1m{self.failed_test_cases:>{digits_failed}} failed\u001b[39;0m')
print(f'assertions: {self.total_assertions:>{digits_total}} | \u001b[32;1m{self.passed_assertions:>{digits_passed}} passed\u001b[39;0m | \u001b[31;1m{self.failed_assertions:>{digits_failed}} failed\u001b[39;0m\n')
if self.timeouts > 0:
print(f'\u001b[31;1mTimeouts: {self.timeouts}\u001b[39;0m\n')
print('Tests that timed out:')
for timeout_test in self.timeouted_tests:
print(timeout_test)

print(f'Execution time: {self.execution_time}s\n')
print(f'assertions: {self.total_assertions:>{digits_total}} | \u001b[32;1m{self.passed_assertions:>{digits_passed}} passed\u001b[39;0m | \u001b[31;1m{self.failed_assertions:>{digits_failed}} failed\u001b[39;0m')

if len(self.timeouted_tests) > 0:
print(f'\n\u001b[31;1mTimeouts: {len(self.timeouted_tests)}\u001b[39;0m')
print('\n'.join(map(lambda test: f' {test}', self.timeouted_tests)))

if len(self.unknown_test_failures) > 0:
print(f'\n\u001b[31;1mUnknown test failures: {len(self.unknown_test_failures)}\u001b[39;0m')
print('\n'.join(itertools.starmap(lambda test, ret: f' {test} (return code {ret})', self.unknown_test_failures)))
else:
print(f'\u001b[32;1m===============================================================================\u001b[39;0m')
print(f'\u001b[32;1mAll tests passed\u001b[39;0m ({self.passed_assertions} assertions in {self.passed_test_cases} test cases)\n')
print(f'Execution time: {self.execution_time}s\n')

print(f'\nExecution time: {self.execution_time}s')

def is_failure(self):
if (self.failed_assertions > 0 or self.failed_test_cases > 0):
if self.failed_assertions > 0 or self.failed_test_cases > 0:
return True
else:
return False
Expand Down Expand Up @@ -174,17 +203,37 @@ def handle_returncode(stdout: str, stderr: str, returncode: int, test_name: str)
match returncode:
# no error
case 0 | 1:
data.append_data(stdout, stderr, ErrorType.NO_ERROR, test_name)
data.append_data(stdout, stderr, ReturnCode.NONE, test_name)

# timout with SIGTERM or SIGKILL
case 124 | 137:
data.append_data(stdout, stderr, ErrorType.TIMEOUT, test_name)
data.append_data(stdout, stderr, ReturnCode.TIMEOUT, test_name)

# unexpected return code
case _:
data.append_data(stdout, stderr, ErrorType.UNEXPECTED_RETURN, test_name)
data.append_data(stdout, stderr, ReturnCode.UNEXPECTED_RETURN, test_name)

running_processes: map(int, tuple(subprocess.Popen, str)) = {}

def wait_for_child() -> int:
pid, status = os.wait() # wait for *any* child to exit

assert pid in running_processes, 'os.wait() returned unexpected child PID'

process, p_test_name = running_processes[pid]
assert process.pid == pid, 'PID mismatch'

returncode = os.waitstatus_to_exitcode(status) # get return status from process; don't use Popen.returncode
assert returncode is not None

progress_bar.update(1)
del running_processes[pid]
stdout, stderr = process.communicate()

data.append_data(stdout, stderr, returncode, test_name)

return returncode

running_processes = {}
# execute tests in parallel until we have max_processes running
for test_name in test_names:
test_name = test_name.replace(',', '\\,')
Expand All @@ -195,29 +244,26 @@ def handle_returncode(stdout: str, stderr: str, returncode: int, test_name: str)
# if we have max_processes number of processes running, wait for a process to finish and remove the process from
# the running processes dictionary
if len(running_processes) >= max_processes:
pid, status = os.wait() # wait for *any* child to exit
assert pid in running_processes, 'os.wait() returned unexpected child PID'
process, p_test_name = running_processes[pid]
assert process.pid == pid, 'PID mismatch'
returncode = os.waitstatus_to_exitcode(status) # get return status from process; don't use Popen.returncode
assert returncode is not None
progress_bar.update(1)
del running_processes[pid]
stdout, stderr = process.communicate()
handle_returncode(stdout, stderr, returncode, p_test_name)
returncode = wait_for_child()
if args.stop_fail and returncode != 0: # stop on first failure
failed = True
break

# wait for the remaining running processes to finish
for pid, (process, p_test_name) in running_processes.items():
stdout, stderr = process.communicate() # wait for process to terminate; consume stdout/stderr to avoid deadlock
progress_bar.update(1)
handle_returncode(stdout, stderr, process.returncode, p_test_name)
if process.returncode != 0 and args.stop_fail:
while len(running_processes) != 0:
returncode = wait_for_child()
if args.stop_fail and returncode != 0: # stop on first failure
failed = True
break

# for pid, (process, p_test_name) in running_processes.items():
# stdout, stderr = process.communicate() # wait for process to terminate; consume stdout/stderr to avoid deadlock
# progress_bar.update(1)
# handle_returncode(stdout, stderr, process.returncode, p_test_name)
# if process.returncode != 0 and args.stop_fail:
# failed = True
# break

if failed:
# send SIGTERM to all processes
for process, test_name in running_processes.values():
Expand All @@ -236,7 +282,7 @@ def handle_returncode(stdout: str, stderr: str, returncode: int, test_name: str)
execution_time = time_end - time_start
data.execution_time = execution_time

return data
return data, failed


if __name__ == '__main__':
Expand Down Expand Up @@ -268,10 +314,10 @@ def handle_returncode(stdout: str, stderr: str, returncode: int, test_name: str)
output = subprocess.run(list_tests_command, stdout=subprocess.PIPE)
test_names = output.stdout.decode().strip().split('\n')

data = run_tests(args, test_names, args.binary_path, is_interactive)
data, failed = run_tests(args, test_names, args.binary_path, is_interactive)

data.dump(args.out)
if data.is_failure():
if failed or data.is_failure():
exit(1)
else:
exit(0)
Expand Down

0 comments on commit df3ef76

Please sign in to comment.