Skip to content

Commit

Permalink
added --autoreload and --paths to runworker command.
Browse files Browse the repository at this point in the history
  • Loading branch information
evrenesat committed Jul 26, 2016
1 parent 3db6f66 commit a03e631
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 32 deletions.
36 changes: 22 additions & 14 deletions zengine/management_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from zengine.views.crud import SelectBoxCache



class UpdatePermissions(Command):
"""
Gets permissions from
Expand Down Expand Up @@ -62,11 +61,11 @@ def run(self):
perm.key = code
perm.save()
new_perms.append(perm)
# perm, new = model.objects.get_or_create({'description': desc}, code=code, name=name)
# if new:
# new_perms.append(perm)
# else:
# existing_perms.append(perm)
# perm, new = model.objects.get_or_create({'description': desc}, code=code, name=name)
# if new:
# new_perms.append(perm)
# else:
# existing_perms.append(perm)

report = "\n\n%s permission(s) were found in DB. " % len(existing_perms)
if new_perms:
Expand Down Expand Up @@ -125,7 +124,7 @@ class RunServer(Command):
'help': 'Listening address. Defaults to 127.0.0.1'},
{'name': 'port', 'default': '9001', 'help': 'Listening port. Defaults to 9001'},
{'name': 'server_type', 'default': 'tornado', 'help': 'Server type. Default: "tornado"'
'Possible values: falcon, tornado'},
'Possible values: falcon, tornado'},
]

def run(self):
Expand Down Expand Up @@ -172,21 +171,33 @@ class RunWorker(Command):
# {'name': 'addr', 'default': '127.0.0.1', 'help': 'Listening address. Defaults to 127.0.0.1'},
# {'name': 'port', 'default': '9001', 'help': 'Listening port. Defaults to 9001'},
{'name': 'workers', 'default': '1', 'help': 'Number of worker process'},
{'name': 'autoreload', 'action': 'store_true', 'help': 'Autoreload on changes'},
{'name': 'paths', 'default': '.',
'help': 'Directory path(s) for autoreload changes. (comma separated)'},

]

def run(self):
"""
Starts a development server for the zengine application
"""
from zengine.wf_daemon import run_workers, Worker

worker_count = int(self.manager.args.workers or 1)
if worker_count > 1:
run_workers(worker_count)
if not self.manager.args.daemonize:
print("Starting worker(s)")

if worker_count > 1 or self.manager.args.autoreload:
run_workers(worker_count,
self.manager.args.paths.split(','),
self.manager.args.daemonize)
else:
worker = Worker()
worker.run()




class PrepareMQ(Command):
"""
Creates necessary exchanges, queues and bindings
Expand All @@ -211,16 +222,13 @@ def create_user_channels(self):
user=usr,
read_only=True,
name='Notifications',
can_manage=True,
can_leave=False
defaults=dict(can_manage=True,
can_leave=False)
)
print("%s notify sub: %s" % ('created' if new else 'existing', ch.code_name))



def create_channel_exchanges(self):
from zengine.messaging.model import Channel
for ch in Channel.objects.filter():
print("(re)creation exchange: %s" % ch.code_name)
ch.create_exchange()

65 changes: 47 additions & 18 deletions zengine/wf_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ def __init__(self):
self.connect()
signal.signal(signal.SIGTERM, self.exit)
log.info("Worker starting")
print("Worker started")

def exit(self, signal=None, frame=None):
"""
Expand Down Expand Up @@ -87,9 +86,9 @@ def _prepare_error_msg(self, msg):
try:
return \
msg + '\n\n' + \
"INPUT DATA: %s\n\n" % pformat(self.current.input) + \
"OUTPUT DATA: %s\n\n" % pformat(self.current.output) + \
sys._zops_wf_state_log
"INPUT DATA: %s\n\n" % pformat(self.current.input) + \
"OUTPUT DATA: %s\n\n" % pformat(self.current.output) + \
sys._zops_wf_state_log
except:
return msg

Expand Down Expand Up @@ -182,40 +181,70 @@ def send_output(self, output):
self.client_queue.send_to_prv_exchange(self.current.user_id, output)




def run_workers(no_subprocess):
def run_workers(no_subprocess, watch_paths=None, is_background=False):
"""
subprocess handler
"""
import atexit, os, subprocess, signal
if watch_paths:
from watchdog.observers import Observer
# from watchdog.observers.fsevents import FSEventsObserver as Observer
# from watchdog.observers.polling import PollingObserver as Observer
from watchdog.events import FileSystemEventHandler

def on_modified(event):
if not is_background:
print("Restarting worker due to change in %s" % event.src_path)
log.info("modified %s" % event.src_path)
try:
kill_children()
run_children()
except:
log.exception("Error while restarting worker")

handler = FileSystemEventHandler()
handler.on_modified = on_modified

# global child_pids
child_pids = []

log.info("starting %s workers" % no_subprocess)
for i in range(int(no_subprocess)):
proc = subprocess.Popen([sys.executable, __file__],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
child_pids.append(proc.pid)
log.info("Started worker with pid %s" % proc.pid)

def kill_child(foo=None, bar=None):

def run_children():
global child_pids
child_pids = []
for i in range(int(no_subprocess)):
proc = subprocess.Popen([sys.executable, __file__],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
child_pids.append(proc.pid)
log.info("Started worker with pid %s" % proc.pid)

def kill_children():
"""
kill subprocess on exit of manager (this) process
"""
log.info("Stopping worker(s)")
for pid in child_pids:
if pid is not None:
os.kill(pid, signal.SIGTERM)

atexit.register(kill_child)
signal.signal(signal.SIGTERM, kill_child)
run_children()
atexit.register(kill_children)
signal.signal(signal.SIGTERM, kill_children)
if watch_paths:
observer = Observer()
for path in watch_paths:
print("Watch for changes under %s" % path)
observer.schedule(handler, path=path, recursive=True)
observer.start()
while 1:
try:
sleep(1)
except KeyboardInterrupt:
log.info("Keyboard interrupt, exiting")
if watch_paths:
observer.stop()
observer.join()
sys.exit(0)


Expand Down

0 comments on commit a03e631

Please sign in to comment.