Skip to content

Commit

Permalink
Pool: select api is not the same as poll (Issue celery#2430)
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Apr 29, 2015
1 parent 6592ff6 commit 3dd7121
Showing 1 changed file with 37 additions and 26 deletions.
63 changes: 37 additions & 26 deletions celery/concurrency/asynpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,43 @@ def _get_job_writer(job):
return writer() # is a weakref


if hasattr(select, 'poll', None):
def _select_imp(readers=None, writers=None, err=None, timeout=0,
poll=select.poll, POLLIN=select.POLLIN,
POLLOUT=select.POLLOUT, POLLERR=select.POLLERR):
poller = poll()
register = poller.register

if readers:
[register(fd, POLLIN) for fd in readers]
if writers:
[register(fd, POLLOUT) for fd in writers]
if err:
[register(fd, POLLERR) for fd in err]

R, W = set(), set()
timeout = 0 if timeout and timeout < 0 else round(timeout * 1e3)
events = poller.poll(timeout)
for fd, event in events:
if not isinstance(fd, Integral):
fd = fd.fileno()
if event & POLLIN:
R.add(fd)
if event & POLLOUT:
W.add(fd)
if event & POLLERR:
R.add(fd)
return R, W, 0
else:
def _select_imp(readers=None, writers=None, err=None, timeout=0):
r, w, e = select.select(readers, writers, err, timeout)
if e:
r = list(set(r) | set(e))
return r, w, 0


def _select(readers=None, writers=None, err=None, timeout=0,
poll=getattr(select, 'poll', select.select),
POLLIN=select.POLLIN, POLLOUT=select.POLLOUT,
POLLERR=select.POLLERR):
_select_imp=_select_imp):
"""Simple wrapper to :class:`~select.select`, using :`~select.poll`
as the implementation.
Expand All @@ -136,30 +169,8 @@ def _select(readers=None, writers=None, err=None, timeout=0,
readers = set() if readers is None else readers
writers = set() if writers is None else writers
err = set() if err is None else err
poller = poll()
register = poller.register

if readers:
[register(fd, POLLIN) for fd in readers]
if writers:
[register(fd, POLLOUT) for fd in writers]
if err:
[register(fd, POLLERR) for fd in err]

R, W = set(), set()
timeout = 0 if timeout and timeout < 0 else round(timeout * 1e3)
try:
events = poller.poll(timeout)
for fd, event in events:
if not isinstance(fd, Integral):
fd = fd.fileno()
if event & POLLIN:
R.add(fd)
if event & POLLOUT:
W.add(fd)
if event & POLLERR:
R.add(fd)
return R, W, 0
return _select_imp(readers, writers, err, timeout)
except (select.error, socket.error) as exc:
if exc.errno == errno.EINTR:
return set(), set(), 1
Expand Down

0 comments on commit 3dd7121

Please sign in to comment.