Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use an Asynchronous Event Loop in SbyJob (rebased) #108

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 114 additions & 81 deletions sbysrc/sby_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
if os.name == "posix":
import resource, fcntl
import subprocess
import asyncio
from functools import partial
from shutil import copyfile, rmtree
from select import select
from time import time, localtime, sleep
Expand Down Expand Up @@ -79,17 +81,12 @@ def __init__(self, job, info, deps, cmdline, logfile=None, logstderr=True, silen
self.job.tasks_pending.append(self)

for dep in self.deps:
dep.register_dep(self)
if not dep.finished:
dep.notify.append(self)

self.output_callback = None
self.exit_callback = None

def register_dep(self, next_task):
if self.finished:
next_task.poll()
else:
self.notify.append(next_task)

def log(self, line):
if line is not None and (self.noprintregex is None or not self.noprintregex.match(line)):
if self.logfile is not None:
Expand Down Expand Up @@ -117,17 +114,39 @@ def terminate(self, timeout=False):
if self.running:
if not self.silent:
self.job.log("{}: terminating process".format(self.info))
if os.name == "posix":
if os.name != "posix":
# self.p.terminate does not actually terminate underlying
# processes on Windows, so send ctrl+break to the process
# group we created. This for some reason does
# not cause the associated future (self.fut) to complete
# until it is awaited on one last time.

os.kill(self.p.pid, signal.CTRL_BREAK_EVENT)
else:
try:
os.killpg(self.p.pid, signal.SIGTERM)
except PermissionError:
pass
self.p.terminate()
self.p.terminate()
self.job.tasks_running.remove(self)
self.job.tasks_retired.append(self)
all_tasks_running.remove(self)
self.terminated = True

def poll(self):
async def output(self):
while True:
outs = await self.p.stdout.readline()
await asyncio.sleep(0) # https://bugs.python.org/issue24532
outs = outs.decode("utf-8")
if len(outs) == 0: break
if outs[-1] != '\n':
self.linebuffer += outs
break
outs = (self.linebuffer + outs).strip()
self.linebuffer = ""
self.handle_output(outs)

async def maybe_spawn(self):
if self.finished or self.terminated:
return

Expand All @@ -138,68 +157,54 @@ def poll(self):

if not self.silent:
self.job.log("{}: starting process \"{}\"".format(self.info, self.cmdline))

if os.name == "posix":
def preexec_fn():
signal.signal(signal.SIGINT, signal.SIG_IGN)
os.setpgrp()

self.p = subprocess.Popen(["/usr/bin/env", "bash", "-c", self.cmdline], stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
stderr=(subprocess.STDOUT if self.logstderr else None), preexec_fn=preexec_fn)

fl = fcntl.fcntl(self.p.stdout, fcntl.F_GETFL)
fcntl.fcntl(self.p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)

subp_kwargs = { "preexec_fn" : preexec_fn }
else:
self.p = subprocess.Popen(self.cmdline, shell=True, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
stderr=(subprocess.STDOUT if self.logstderr else None))

subp_kwargs = { "creationflags" : subprocess.CREATE_NEW_PROCESS_GROUP }
self.p = await asyncio.create_subprocess_shell(self.cmdline, stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=(asyncio.subprocess.STDOUT if self.logstderr else None),
**subp_kwargs)
self.job.tasks_pending.remove(self)
self.job.tasks_running.append(self)
all_tasks_running.append(self)
self.running = True
return
asyncio.ensure_future(self.output())
self.fut = asyncio.ensure_future(self.p.wait())

async def shutdown_and_notify(self):
if not self.silent:
self.job.log("{}: finished (returncode={})".format(self.info, self.p.returncode))
self.job.tasks_running.remove(self)
self.job.tasks_retired.append(self)
self.running = False

while True:
outs = self.p.stdout.readline().decode("utf-8")
if len(outs) == 0: break
if outs[-1] != '\n':
self.linebuffer += outs
break
outs = (self.linebuffer + outs).strip()
self.linebuffer = ""
self.handle_output(outs)
self.handle_exit(self.p.returncode)

if self.p.poll() is not None:
if self.p.returncode == 127:
self.job.status = "ERROR"
if not self.silent:
self.job.log("{}: finished (returncode={})".format(self.info, self.p.returncode))
self.job.tasks_running.remove(self)
all_tasks_running.remove(self)
self.running = False

if self.p.returncode == 127:
self.job.status = "ERROR"
if not self.silent:
self.job.log("{}: COMMAND NOT FOUND. ERROR.".format(self.info))
self.terminated = True
self.job.terminate()
return

self.handle_exit(self.p.returncode)

if self.checkretcode and self.p.returncode != 0:
self.job.status = "ERROR"
if not self.silent:
self.job.log("{}: job failed. ERROR.".format(self.info))
self.terminated = True
self.job.terminate()
return

self.finished = True
for next_task in self.notify:
next_task.poll()
self.job.log("{}: COMMAND NOT FOUND. ERROR.".format(self.info))
self.terminated = True
self.job.terminate()
return

if self.checkretcode and self.p.returncode != 0:
self.job.status = "ERROR"
if not self.silent:
self.job.log("{}: job failed. ERROR.".format(self.info))
self.terminated = True
self.job.terminate()
return

self.finished = True
for next_task in self.notify:
await next_task.maybe_spawn()
return

class SbyAbort(BaseException):
pass
Expand Down Expand Up @@ -233,6 +238,7 @@ def __init__(self, sbyconfig, workdir, early_logs, reusedir):

self.tasks_running = []
self.tasks_pending = []
self.tasks_retired = []

self.start_clock_time = time()

Expand All @@ -253,35 +259,59 @@ def __init__(self, sbyconfig, workdir, early_logs, reusedir):
print(line, file=f)

def taskloop(self):
if os.name != "posix":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
loop = asyncio.get_event_loop()
poll_fut = asyncio.ensure_future(self.task_poller())
loop.run_until_complete(poll_fut)

async def timekeeper(self):
total_clock_time = int(time() - self.start_clock_time)

try:
while total_clock_time <= self.opt_timeout:
await asyncio.sleep(1)
total_clock_time = int(time() - self.start_clock_time)
except asyncio.CancelledError:
pass

def timeout(self, fut):
self.log("Reached TIMEOUT ({} seconds). Terminating all tasks.".format(self.opt_timeout))
self.status = "TIMEOUT"
self.terminate(timeout=True)

async def task_poller(self):
if self.opt_timeout is not None:
timer_fut = asyncio.ensure_future(self.timekeeper())
done_cb = partial(SbyJob.timeout, self)
timer_fut.add_done_callback(done_cb)

for task in self.tasks_pending:
task.poll()
await task.maybe_spawn()

while len(self.tasks_running):
fds = []
task_futs = []
for task in self.tasks_running:
if task.running:
fds.append(task.p.stdout)

if os.name == "posix":
try:
select(fds, [], [], 1.0) == ([], [], [])
except InterruptedError:
pass
else:
sleep(0.1)
task_futs.append(task.fut)
(done, pending) = await asyncio.wait(task_futs, return_when=asyncio.FIRST_COMPLETED)

for task in self.tasks_running:
task.poll()
if task.fut in done:
await task.shutdown_and_notify()

for task in self.tasks_pending:
task.poll()
if self.opt_timeout is not None:
timer_fut.remove_done_callback(done_cb)
timer_fut.cancel()

if self.opt_timeout is not None:
total_clock_time = int(time() - self.start_clock_time)
if total_clock_time > self.opt_timeout:
self.log("Reached TIMEOUT ({} seconds). Terminating all tasks.".format(self.opt_timeout))
self.status = "TIMEOUT"
self.terminate(timeout=True)
# Required on Windows. I am unsure why, but subprocesses that were
# terminated will not have their futures complete until awaited on
# one last time.
if os.name != "posix":
for t in self.tasks_retired:
if not t.fut.done():
await t.fut

def log(self, logmessage):
tm = localtime()
Expand Down Expand Up @@ -401,6 +431,7 @@ def make_model(self, model_name):
print("opt -fast", file=f)
print("abc", file=f)
print("opt_clean", file=f)
print("dffunmap", file=f)
print("stat", file=f)
if "_stbv" in model_name:
print("write_smt2 -stbv -wires design_{}.smt2".format(model_name), file=f)
Expand Down Expand Up @@ -430,6 +461,7 @@ def make_model(self, model_name):
else:
print("opt -fast", file=f)
print("delete -output", file=f)
print("dffunmap", file=f)
print("stat", file=f)
print("write_btor {}-i design_{m}.info design_{m}.btor".format("-c " if self.opt_mode == "cover" else "", m=model_name), file=f)

Expand All @@ -450,6 +482,7 @@ def make_model(self, model_name):
print("opt -full", file=f)
print("techmap", file=f)
print("opt -fast", file=f)
print("dffunmap", file=f)
print("abc -g AND -fast", file=f)
print("opt_clean", file=f)
print("stat", file=f)
Expand Down Expand Up @@ -634,19 +667,19 @@ def run(self, setupmode):

if self.opt_mode == "bmc":
import sby_mode_bmc
sby_mode_bmc.run(self)
sby_mode_bmc.init(self)

elif self.opt_mode == "prove":
import sby_mode_prove
sby_mode_prove.run(self)
sby_mode_prove.init(self)

elif self.opt_mode == "live":
import sby_mode_live
sby_mode_live.run(self)
sby_mode_live.init(self)

elif self.opt_mode == "cover":
import sby_mode_cover
sby_mode_cover.run(self)
sby_mode_cover.init(self)

else:
assert False
Expand Down
2 changes: 1 addition & 1 deletion sbysrc/sby_engine_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import re, os, getopt
from sby_core import SbyTask

def run(mode, job, engine_idx, engine):
def init(mode, job, engine_idx, engine):
abc_opts, abc_command = getopt.getopt(engine[1:], "", [])

if len(abc_command) == 0:
Expand Down
2 changes: 1 addition & 1 deletion sbysrc/sby_engine_aiger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import re, os, getopt
from sby_core import SbyTask

def run(mode, job, engine_idx, engine):
def init(mode, job, engine_idx, engine):
opts, solver_args = getopt.getopt(engine[1:], "", [])

if len(solver_args) == 0:
Expand Down
4 changes: 2 additions & 2 deletions sbysrc/sby_engine_btor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from types import SimpleNamespace
from sby_core import SbyTask

def run(mode, job, engine_idx, engine):
def init(mode, job, engine_idx, engine):
random_seed = None

opts, solver_args = getopt.getopt(engine[1:], "", ["seed="])
Expand Down Expand Up @@ -190,7 +190,7 @@ def output_callback(line):

def exit_callback(retcode):
if solver_args[0] == "pono":
assert retcode in [1, 2]
assert retcode in [0, 1, 255] # UNKNOWN = -1, FALSE = 0, TRUE = 1, ERROR = 2
else:
assert retcode == 0
if common_state.expected_cex != 0:
Expand Down
6 changes: 3 additions & 3 deletions sbysrc/sby_engine_smtbmc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import re, os, getopt
from sby_core import SbyTask

def run(mode, job, engine_idx, engine):
def init(mode, job, engine_idx, engine):
smtbmc_opts = []
nomem_opt = False
presat_opt = True
Expand Down Expand Up @@ -102,9 +102,9 @@ def run(mode, job, engine_idx, engine):

if mode == "prove":
if not induction_only:
run("prove_basecase", job, engine_idx, engine)
init("prove_basecase", job, engine_idx, engine)
if not basecase_only:
run("prove_induction", job, engine_idx, engine)
init("prove_induction", job, engine_idx, engine)
return

taskname = "engine_{}".format(engine_idx)
Expand Down
8 changes: 4 additions & 4 deletions sbysrc/sby_mode_bmc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import re, os, getopt
from sby_core import SbyTask

def run(job):
def init(job):
job.handle_int_option("depth", 20)
job.handle_int_option("append", 0)
job.handle_str_option("aigsmt", "yices")
Expand All @@ -33,15 +33,15 @@ def run(job):

if engine[0] == "smtbmc":
import sby_engine_smtbmc
sby_engine_smtbmc.run("bmc", job, engine_idx, engine)
sby_engine_smtbmc.init("bmc", job, engine_idx, engine)

elif engine[0] == "abc":
import sby_engine_abc
sby_engine_abc.run("bmc", job, engine_idx, engine)
sby_engine_abc.init("bmc", job, engine_idx, engine)

elif engine[0] == "btor":
import sby_engine_btor
sby_engine_btor.run("bmc", job, engine_idx, engine)
sby_engine_btor.init("bmc", job, engine_idx, engine)

else:
job.error("Invalid engine '{}' for bmc mode.".format(engine[0]))
Loading