diff --git a/celery/beat.py b/celery/beat.py index 8bb023b9109..21d1316c64b 100644 --- a/celery/beat.py +++ b/celery/beat.py @@ -535,13 +535,15 @@ def scheduler(self): class _Threaded(Thread): """Embedded task scheduler using threading.""" - def __init__(self, *args, **kwargs): + def __init__(self, app, **kwargs): super(_Threaded, self).__init__() - self.service = Service(*args, **kwargs) + self.app = app + self.service = Service(app, **kwargs) self.daemon = True self.name = 'Beat' def run(self): + self.app.set_current() self.service.start() def stop(self): @@ -555,9 +557,10 @@ def stop(self): else: class _Process(Process): # noqa - def __init__(self, *args, **kwargs): + def __init__(self, app, **kwargs): super(_Process, self).__init__() - self.service = Service(*args, **kwargs) + self.app = app + self.service = Service(app, **kwargs) self.name = 'Beat' def run(self): @@ -565,6 +568,8 @@ def run(self): platforms.close_open_fds([ sys.__stdin__, sys.__stdout__, sys.__stderr__, ] + list(iter_open_logger_fds())) + self.app.set_default() + self.app.set_current() self.service.start(embedded_process=True) def stop(self): @@ -572,7 +577,7 @@ def stop(self): self.terminate() -def EmbeddedService(*args, **kwargs): +def EmbeddedService(app, max_interval=None, **kwargs): """Return embedded clock service. :keyword thread: Run threaded instead of as a separate process. @@ -582,6 +587,5 @@ def EmbeddedService(*args, **kwargs): if kwargs.pop('thread', False) or _Process is None: # Need short max interval to be able to stop thread # in reasonable time. - kwargs.setdefault('max_interval', 1) - return _Threaded(*args, **kwargs) - return _Process(*args, **kwargs) + return _Threaded(app, max_interval=1, **kwargs) + return _Process(app, max_interval=max_interval, **kwargs) diff --git a/celery/tests/app/test_beat.py b/celery/tests/app/test_beat.py index 362fbf9b4db..40b8c85897a 100644 --- a/celery/tests/app/test_beat.py +++ b/celery/tests/app/test_beat.py @@ -478,7 +478,7 @@ def test_start_stop_process(self): from billiard.process import Process - s = beat.EmbeddedService(app=self.app) + s = beat.EmbeddedService(self.app) self.assertIsInstance(s, Process) self.assertIsInstance(s.service, beat.Service) s.service = MockService() @@ -499,7 +499,7 @@ def terminate(self): self.assertTrue(s._popen.terminated) def test_start_stop_threaded(self): - s = beat.EmbeddedService(thread=True, app=self.app) + s = beat.EmbeddedService(self.app, thread=True) from threading import Thread self.assertIsInstance(s, Thread) self.assertIsInstance(s.service, beat.Service) diff --git a/celery/worker/components.py b/celery/worker/components.py index d23a3b6b847..bb02f4e9ed3 100644 --- a/celery/worker/components.py +++ b/celery/worker/components.py @@ -203,7 +203,7 @@ def create(self, w): from celery.beat import EmbeddedService if w.pool_cls.__module__.endswith(('gevent', 'eventlet')): raise ImproperlyConfigured(ERR_B_GREEN) - b = w.beat = EmbeddedService(app=w.app, + b = w.beat = EmbeddedService(w.app, schedule_filename=w.schedule_filename, scheduler_cls=w.scheduler_cls) return b