Skip to content

Commit

Permalink
Embedded beat must set app for thread/process. Closes celery#2594
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Apr 30, 2015
1 parent e544b4e commit a84d67c
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 11 deletions.
20 changes: 12 additions & 8 deletions celery/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,13 +535,15 @@ def scheduler(self):
class _Threaded(Thread):
"""Embedded task scheduler using threading."""

def __init__(self, *args, **kwargs):
def __init__(self, app, **kwargs):
super(_Threaded, self).__init__()
self.service = Service(*args, **kwargs)
self.app = app
self.service = Service(app, **kwargs)
self.daemon = True
self.name = 'Beat'

def run(self):
self.app.set_current()
self.service.start()

def stop(self):
Expand All @@ -555,24 +557,27 @@ def stop(self):
else:
class _Process(Process): # noqa

def __init__(self, *args, **kwargs):
def __init__(self, app, **kwargs):
super(_Process, self).__init__()
self.service = Service(*args, **kwargs)
self.app = app
self.service = Service(app, **kwargs)
self.name = 'Beat'

def run(self):
reset_signals(full=False)
platforms.close_open_fds([
sys.__stdin__, sys.__stdout__, sys.__stderr__,
] + list(iter_open_logger_fds()))
self.app.set_default()
self.app.set_current()
self.service.start(embedded_process=True)

def stop(self):
self.service.stop()
self.terminate()


def EmbeddedService(*args, **kwargs):
def EmbeddedService(app, max_interval=None, **kwargs):
"""Return embedded clock service.
:keyword thread: Run threaded instead of as a separate process.
Expand All @@ -582,6 +587,5 @@ def EmbeddedService(*args, **kwargs):
if kwargs.pop('thread', False) or _Process is None:
# Need short max interval to be able to stop thread
# in reasonable time.
kwargs.setdefault('max_interval', 1)
return _Threaded(*args, **kwargs)
return _Process(*args, **kwargs)
return _Threaded(app, max_interval=1, **kwargs)
return _Process(app, max_interval=max_interval, **kwargs)
4 changes: 2 additions & 2 deletions celery/tests/app/test_beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ def test_start_stop_process(self):

from billiard.process import Process

s = beat.EmbeddedService(app=self.app)
s = beat.EmbeddedService(self.app)
self.assertIsInstance(s, Process)
self.assertIsInstance(s.service, beat.Service)
s.service = MockService()
Expand All @@ -499,7 +499,7 @@ def terminate(self):
self.assertTrue(s._popen.terminated)

def test_start_stop_threaded(self):
s = beat.EmbeddedService(thread=True, app=self.app)
s = beat.EmbeddedService(self.app, thread=True)
from threading import Thread
self.assertIsInstance(s, Thread)
self.assertIsInstance(s.service, beat.Service)
Expand Down
2 changes: 1 addition & 1 deletion celery/worker/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def create(self, w):
from celery.beat import EmbeddedService
if w.pool_cls.__module__.endswith(('gevent', 'eventlet')):
raise ImproperlyConfigured(ERR_B_GREEN)
b = w.beat = EmbeddedService(app=w.app,
b = w.beat = EmbeddedService(w.app,
schedule_filename=w.schedule_filename,
scheduler_cls=w.scheduler_cls)
return b
Expand Down

0 comments on commit a84d67c

Please sign in to comment.