Skip to content

Commit

Permalink
Merge branch 'master' of github.com:celery/celery
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Apr 13, 2015
2 parents af3e046 + 053858c commit 83ef2fb
Show file tree
Hide file tree
Showing 19 changed files with 450 additions and 67 deletions.
3 changes: 3 additions & 0 deletions CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,6 @@ Bert Vanderbauwhede, 2014/12/18
John Anderson, 2014/12/27
Luke Burden, 2015/01/24
Mickaël Penhard, 2015/02/15
Mark Parncutt, 2015/02/16
Samuel Jaillet, 2015/03/24
Ilya Georgievsky, 2015/03/31
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ development easier, and sometimes they add important hooks like closing
database connections at ``fork``.

.. _`Django`: http://djangoproject.com/
.. _`Pylons`: http://pylonshq.com/
.. _`Pylons`: http://www.pylonsproject.org/
.. _`Flask`: http://flask.pocoo.org/
.. _`web2py`: http://web2py.com/
.. _`Bottle`: http://bottlepy.org/
Expand Down
2 changes: 1 addition & 1 deletion celery/app/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def add_around(self, attr, around):

def __call__(self, *args, **kwargs):
_task_stack.push(self)
self.push_request()
self.push_request(args=args, kwargs=kwargs)
try:
# add self if this is a bound task
if self.__self__ is not None:
Expand Down
8 changes: 5 additions & 3 deletions celery/backends/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def consume(self, task_id, timeout=None, no_ack=True, on_interval=None):
def _many_bindings(self, ids):
return [self._create_binding(task_id) for task_id in ids]

def get_many(self, task_ids, timeout=None, no_ack=True,
def get_many(self, task_ids, timeout=None, no_ack=True, on_message=None,
now=monotonic, getfields=itemgetter('status', 'task_id'),
READY_STATES=states.READY_STATES,
PROPAGATE_STATES=states.PROPAGATE_STATES, **kwargs):
Expand All @@ -254,15 +254,17 @@ def get_many(self, task_ids, timeout=None, no_ack=True,
push_cache = self._cache.__setitem__
decode_result = self.meta_from_decoded

def on_message(message):
def _on_message(message):
body = decode_result(message.decode())
if on_message is not None:
on_message(body)
state, uid = getfields(body)
if state in READY_STATES:
push_result(body) \
if uid in task_ids else push_cache(uid, body)

bindings = self._many_bindings(task_ids)
with self.Consumer(channel, bindings, on_message=on_message,
with self.Consumer(channel, bindings, on_message=_on_message,
accept=self.accept, no_ack=no_ack):
wait = conn.drain_events
popleft = results.popleft
Expand Down
79 changes: 54 additions & 25 deletions celery/backends/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, **kw):

class MongoBackend(BaseBackend):

mongo_host = None
host = 'localhost'
port = 27017
user = None
Expand Down Expand Up @@ -75,15 +76,42 @@ def __init__(self, app=None, url=None, **kwargs):
'You need to install the pymongo library to use the '
'MongoDB backend.')

self.url = url

# default options
self.options.setdefault('max_pool_size', self.max_pool_size)
self.options.setdefault('auto_start_request', False)

# update conf with mongo uri data, only if uri was given
if self.url:
uri_data = pymongo.uri_parser.parse_uri(self.url)
# build the hosts list to create a mongo connection
make_host_str = lambda x: "{0}:{1}".format(x[0], x[1])
hostslist = map(make_host_str, uri_data['nodelist'])
self.user = uri_data['username']
self.password = uri_data['password']
self.mongo_host = hostslist
if uri_data['database']:
# if no database is provided in the uri, use default
self.database_name = uri_data['database']

self.options.update(uri_data['options'])

# update conf with specific settings
config = self.app.conf.get('CELERY_MONGODB_BACKEND_SETTINGS')
if config is not None:
if not isinstance(config, dict):
raise ImproperlyConfigured(
'MongoDB backend settings should be grouped in a dict')
config = dict(config) # do not modify original

if 'host' in config or 'port' in config:
# these should take over uri conf
self.mongo_host = None

self.host = config.pop('host', self.host)
self.port = int(config.pop('port', self.port))
self.port = config.pop('port', self.port)
self.mongo_host = config.pop('mongo_host', self.mongo_host)
self.user = config.pop('user', self.user)
self.password = config.pop('password', self.password)
self.database_name = config.pop('database', self.database_name)
Expand All @@ -94,37 +122,38 @@ def __init__(self, app=None, url=None, **kwargs):
'groupmeta_collection', self.groupmeta_collection,
)

self.options = dict(config, **config.pop('options', None) or {})

# Set option defaults
self.options.setdefault('max_pool_size', self.max_pool_size)
self.options.setdefault('auto_start_request', False)

self.url = url
if self.url:
# Specifying backend as an URL
self.host = self.url
self.options.update(config.pop('options', {}))
self.options.update(config)

def _get_connection(self):
"""Connect to the MongoDB server."""
if self._connection is None:
from pymongo import MongoClient

# The first pymongo.Connection() argument (host) can be
# a list of ['host:port'] elements or a mongodb connection
# URI. If this is the case, don't use self.port
# but let pymongo get the port(s) from the URI instead.
# This enables the use of replica sets and sharding.
# See pymongo.Connection() for more info.
url = self.host
if isinstance(url, string_t) \
and not url.startswith('mongodb://'):
url = 'mongodb://{0}:{1}'.format(url, self.port)
if url == 'mongodb://':
url = url + 'localhost'
host = self.mongo_host
if not host:
# The first pymongo.Connection() argument (host) can be
# a list of ['host:port'] elements or a mongodb connection
# URI. If this is the case, don't use self.port
# but let pymongo get the port(s) from the URI instead.
# This enables the use of replica sets and sharding.
# See pymongo.Connection() for more info.
host = self.host
if isinstance(host, string_t) \
and not host.startswith('mongodb://'):
host = 'mongodb://{0}:{1}'.format(host, self.port)

if host == 'mongodb://':
host += 'localhost'

# don't change self.options
conf = dict(self.options)
conf['host'] = host

if detect_environment() != 'default':
self.options['use_greenlets'] = True
self._connection = MongoClient(host=url, **self.options)
conf['use_greenlets'] = True

self._connection = MongoClient(**conf)

return self._connection

Expand Down
28 changes: 14 additions & 14 deletions celery/contrib/batches.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,20 +197,20 @@ def Strategy(self, task, app, consumer):
flush_buffer = self._do_flush

def task_message_handler(message, body, ack, reject, callbacks, **kw):
if body is None: 31513 ? S 125:09 /usr/bin/python -m celery worker --without-heartbeat -c 50 --pool=eventlet -n celery6@ns326150.ip-37-187-158.eu --app=mai
body, headers, decoded, utc = ( n -Q rss --without-gossip --logfile=/home/logs/rss.log --pidfile=celery6.pid
message.body, message.headers, False, True, 31528 ? R 128:34 /usr/bin/python -m celery worker --without-heartbeat -c 50 --pool=eventlet -n celery7@ns326150.ip-37-187-158.eu --app=mai
) n -Q rss --without-gossip --logfile=/home/logs/rss.log --pidfile=celery7.pid
if not body_can_be_buffer: 31543 ? S 124:32 /usr/bin/python -m celery worker --without-heartbeat -c 50 --pool=eventlet -n celery8@ns326150.ip-37-187-158.eu --app=mai
body = bytes(body) if isinstance(body, buffer_t) else body n -Q rss --without-gossip --logfile=/home/logs/rss.log --pidfile=celery8.pid
else: 26150 ? S 0:50 /usr/bin/python -m celery worker --without-heartbeat -c 2 --pool=eventlet -n engines@ns326150.ip-37-187-158.eu --app=main
body, headers, decoded, utc = proto1_to_proto2(message, body) -Q engines --without-gossip --logfile=/home/logs/engines.log --pidfile=/home/logs/pid-engines.pid
22409 ? S 0:00 /usr/bin/python -m celery worker --without-heartbeat -c 1 -n elasticsearch_bulk_actions@ns326150.ip-37-187-158.eu --app=m
request = Req( ain -Q elasticsearch_bulk_actions --without-gossip --logfile=/home/logs/elasticsearch_bulk_actions.log --pidfile=elasticsearch_bulk_actions.pid
message, 22459 ? S 0:00 \_ /usr/bin/python -m celery worker --without-heartbeat -c 1 -n elasticsearch_bulk_actions@ns326150.ip-37-187-158.eu --a
on_ack=ack, on_reject=reject, app=app, hostname=hostname, pp=main -Q elasticsearch_bulk_actions --without-gossip --logfile=/home/logs/elasticsearch_bulk_actions.log --pidfile=elasticsearch_bulk_actions.pid
eventer=eventer, task=task, connection_errors=connection_errors, 22419 ? S 0:00 /usr/bin/python -m celery worker --without-heartbeat -c 1 -n celery@ns326150.ip-37-187-158.eu --app=main -Q elasticsearch
body=body, headers=headers, decoded=decoded, utc=utc, _bulk_actions --without-gossip --logfile=/home/logs/elasticsearch_bulk_actions.log --pidfile=celery.pid
if body is None:
body, headers, decoded, utc = (
message.body, message.headers, False, True,
)
if not body_can_be_buffer:
body = bytes(body) if isinstance(body, buffer_t) else body
else:
body, headers, decoded, utc = proto1_to_proto2(message, body)

request = Req(
message,
on_ack=ack, on_reject=reject, app=app, hostname=hostname,
eventer=eventer, task=task, connection_errors=connection_errors,
body=body, headers=headers, decoded=decoded, utc=utc,
)
put_buffer(request)

Expand Down
9 changes: 8 additions & 1 deletion celery/fixups/django.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,14 @@ def validate_models(self):
except ImportError:
from django.core.management.base import BaseCommand
cmd = BaseCommand()
cmd.stdout, cmd.stderr = sys.stdout, sys.stderr
try:
# since django 1.5
from django.core.management.base import OutputWrapper
cmd.stdout = OutputWrapper(sys.stdout)
cmd.stderr = OutputWrapper(sys.stderr)
except ImportError:
cmd.stdout, cmd.stderr = sys.stdout, sys.stderr

cmd.check()
else:
num_errors = get_validation_errors(s, None)
Expand Down
19 changes: 13 additions & 6 deletions celery/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ def iterate(self, timeout=None, propagate=True, interval=0.5):
raise TimeoutError('The operation timed out')

def get(self, timeout=None, propagate=True, interval=0.5,
callback=None, no_ack=True):
callback=None, no_ack=True, on_message=None):
"""See :meth:`join`
This is here for API compatibility with :class:`AsyncResult`,
Expand All @@ -577,10 +577,10 @@ def get(self, timeout=None, propagate=True, interval=0.5,
"""
return (self.join_native if self.supports_native_join else self.join)(
timeout=timeout, propagate=propagate,
interval=interval, callback=callback, no_ack=no_ack)
interval=interval, callback=callback, no_ack=no_ack, on_message=on_message)

def join(self, timeout=None, propagate=True, interval=0.5,
callback=None, no_ack=True):
callback=None, no_ack=True, on_message=None):
"""Gathers the results of all tasks as a list in order.
.. note::
Expand Down Expand Up @@ -632,6 +632,9 @@ def join(self, timeout=None, propagate=True, interval=0.5,
time_start = monotonic()
remaining = None

if on_message is not None:
raise Exception('Your backend not suppored on_message callback')

results = []
for result in self.results:
remaining = None
Expand All @@ -649,7 +652,8 @@ def join(self, timeout=None, propagate=True, interval=0.5,
results.append(value)
return results

def iter_native(self, timeout=None, interval=0.5, no_ack=True):
def iter_native(self, timeout=None, interval=0.5, no_ack=True,
on_message=None):
"""Backend optimized version of :meth:`iterate`.
.. versionadded:: 2.2
Expand All @@ -667,10 +671,12 @@ def iter_native(self, timeout=None, interval=0.5, no_ack=True):
return self.backend.get_many(
set(r.id for r in results),
timeout=timeout, interval=interval, no_ack=no_ack,
on_message=on_message,
)

def join_native(self, timeout=None, propagate=True,
interval=0.5, callback=None, no_ack=True):
interval=0.5, callback=None, no_ack=True,
on_message=None):
"""Backend optimized version of :meth:`join`.
.. versionadded:: 2.2
Expand All @@ -687,7 +693,8 @@ def join_native(self, timeout=None, propagate=True,
result.id: i for i, result in enumerate(self.results)
}
acc = None if callback else [None for _ in range(len(self))]
for task_id, meta in self.iter_native(timeout, interval, no_ack):
for task_id, meta in self.iter_native(timeout, interval, no_ack,
on_message):
value = meta['result']
if propagate and meta['status'] in states.PROPAGATE_STATES:
raise value
Expand Down
Loading

0 comments on commit 83ef2fb

Please sign in to comment.