From a03e63142971eae3179a381f668339d2d8aa4fd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Evren=20Esat=20=C3=96zkan?= Date: Tue, 26 Jul 2016 18:02:58 +0300 Subject: [PATCH] added --autoreload and --paths to runworker command. --- zengine/management_commands.py | 36 +++++++++++-------- zengine/wf_daemon.py | 65 ++++++++++++++++++++++++---------- 2 files changed, 69 insertions(+), 32 deletions(-) diff --git a/zengine/management_commands.py b/zengine/management_commands.py index fca3a793..6d70fd1c 100644 --- a/zengine/management_commands.py +++ b/zengine/management_commands.py @@ -15,7 +15,6 @@ from zengine.views.crud import SelectBoxCache - class UpdatePermissions(Command): """ Gets permissions from @@ -62,11 +61,11 @@ def run(self): perm.key = code perm.save() new_perms.append(perm) - # perm, new = model.objects.get_or_create({'description': desc}, code=code, name=name) - # if new: - # new_perms.append(perm) - # else: - # existing_perms.append(perm) + # perm, new = model.objects.get_or_create({'description': desc}, code=code, name=name) + # if new: + # new_perms.append(perm) + # else: + # existing_perms.append(perm) report = "\n\n%s permission(s) were found in DB. " % len(existing_perms) if new_perms: @@ -125,7 +124,7 @@ class RunServer(Command): 'help': 'Listening address. Defaults to 127.0.0.1'}, {'name': 'port', 'default': '9001', 'help': 'Listening port. Defaults to 9001'}, {'name': 'server_type', 'default': 'tornado', 'help': 'Server type. Default: "tornado"' - 'Possible values: falcon, tornado'}, + 'Possible values: falcon, tornado'}, ] def run(self): @@ -172,6 +171,10 @@ class RunWorker(Command): # {'name': 'addr', 'default': '127.0.0.1', 'help': 'Listening address. Defaults to 127.0.0.1'}, # {'name': 'port', 'default': '9001', 'help': 'Listening port. Defaults to 9001'}, {'name': 'workers', 'default': '1', 'help': 'Number of worker process'}, + {'name': 'autoreload', 'action': 'store_true', 'help': 'Autoreload on changes'}, + {'name': 'paths', 'default': '.', + 'help': 'Directory path(s) for autoreload changes. (comma separated)'}, + ] def run(self): @@ -179,14 +182,22 @@ def run(self): Starts a development server for the zengine application """ from zengine.wf_daemon import run_workers, Worker + worker_count = int(self.manager.args.workers or 1) - if worker_count > 1: - run_workers(worker_count) + if not self.manager.args.daemonize: + print("Starting worker(s)") + + if worker_count > 1 or self.manager.args.autoreload: + run_workers(worker_count, + self.manager.args.paths.split(','), + self.manager.args.daemonize) else: worker = Worker() worker.run() + + class PrepareMQ(Command): """ Creates necessary exchanges, queues and bindings @@ -211,16 +222,13 @@ def create_user_channels(self): user=usr, read_only=True, name='Notifications', - can_manage=True, - can_leave=False + defaults=dict(can_manage=True, + can_leave=False) ) print("%s notify sub: %s" % ('created' if new else 'existing', ch.code_name)) - - def create_channel_exchanges(self): from zengine.messaging.model import Channel for ch in Channel.objects.filter(): print("(re)creation exchange: %s" % ch.code_name) ch.create_exchange() - diff --git a/zengine/wf_daemon.py b/zengine/wf_daemon.py index 5ad8efd7..98fbc8c0 100755 --- a/zengine/wf_daemon.py +++ b/zengine/wf_daemon.py @@ -42,7 +42,6 @@ def __init__(self): self.connect() signal.signal(signal.SIGTERM, self.exit) log.info("Worker starting") - print("Worker started") def exit(self, signal=None, frame=None): """ @@ -87,9 +86,9 @@ def _prepare_error_msg(self, msg): try: return \ msg + '\n\n' + \ - "INPUT DATA: %s\n\n" % pformat(self.current.input) + \ - "OUTPUT DATA: %s\n\n" % pformat(self.current.output) + \ - sys._zops_wf_state_log + "INPUT DATA: %s\n\n" % pformat(self.current.input) + \ + "OUTPUT DATA: %s\n\n" % pformat(self.current.output) + \ + sys._zops_wf_state_log except: return msg @@ -182,40 +181,70 @@ def send_output(self, output): self.client_queue.send_to_prv_exchange(self.current.user_id, output) - - -def run_workers(no_subprocess): +def run_workers(no_subprocess, watch_paths=None, is_background=False): """ subprocess handler """ import atexit, os, subprocess, signal + if watch_paths: + from watchdog.observers import Observer + # from watchdog.observers.fsevents import FSEventsObserver as Observer + # from watchdog.observers.polling import PollingObserver as Observer + from watchdog.events import FileSystemEventHandler + + def on_modified(event): + if not is_background: + print("Restarting worker due to change in %s" % event.src_path) + log.info("modified %s" % event.src_path) + try: + kill_children() + run_children() + except: + log.exception("Error while restarting worker") + + handler = FileSystemEventHandler() + handler.on_modified = on_modified # global child_pids child_pids = [] - log.info("starting %s workers" % no_subprocess) - for i in range(int(no_subprocess)): - proc = subprocess.Popen([sys.executable, __file__], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - child_pids.append(proc.pid) - log.info("Started worker with pid %s" % proc.pid) - - def kill_child(foo=None, bar=None): + + def run_children(): + global child_pids + child_pids = [] + for i in range(int(no_subprocess)): + proc = subprocess.Popen([sys.executable, __file__], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + child_pids.append(proc.pid) + log.info("Started worker with pid %s" % proc.pid) + + def kill_children(): """ kill subprocess on exit of manager (this) process """ + log.info("Stopping worker(s)") for pid in child_pids: if pid is not None: os.kill(pid, signal.SIGTERM) - atexit.register(kill_child) - signal.signal(signal.SIGTERM, kill_child) + run_children() + atexit.register(kill_children) + signal.signal(signal.SIGTERM, kill_children) + if watch_paths: + observer = Observer() + for path in watch_paths: + print("Watch for changes under %s" % path) + observer.schedule(handler, path=path, recursive=True) + observer.start() while 1: try: sleep(1) except KeyboardInterrupt: log.info("Keyboard interrupt, exiting") + if watch_paths: + observer.stop() + observer.join() sys.exit(0)