forked from bobpoekert/tornado-threadpool
-
Notifications
You must be signed in to change notification settings - Fork 0
/
thread_pool.py
109 lines (84 loc) · 2.54 KB
/
thread_pool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import threading, Queue, traceback, warnings
from tornado.ioloop import IOLoop
from functools import wraps, partial
thread_locals = threading.local()
class ThreadPool(object):
def __init__(self, max_qsize=10, timeout=1):
self.threads = set([])
self.queue = Queue.Queue()
self.timeout = timeout
self.max_qsize = max_qsize
def run(self, fn):
self.queue.put(fn)
if len(self.threads) == 0 or self.queue.qsize() > self.max_qsize:
self.spawn_thread()
def spawn_thread(self):
thread = None
def get_thread():
return thread
thread = threading.Thread(target=self.work, args=(get_thread,))
thread.setDaemon(True)
self.threads.add(thread)
thread.start()
def on_error(self):
traceback.print_exc()
def work(self, get_thread):
thread = get_thread()
del get_thread
thread_locals.thread_pool = True
while 1:
try:
job = self.queue.get(True, self.timeout)
try:
job()
except:
self.on_error()
except Queue.Empty:
break
self.threads.remove(thread)
thread_pool = ThreadPool()
def get_ioloop():
"Monkeypatch me to use a custom ioloop for @in_ioloop"
return IOLoop.instance()
def flag_ioloop():
thread_locals.ioloop = True
get_ioloop().add_callback(flag_ioloop)
def in_ioloop(fn):
@wraps(fn)
def res(*args, **kwargs):
try:
if thread_locals.ioloop:
fn(*args, **kwargs)
return
except AttributeError:
pass
get_ioloop().add_callback(partial(fn, *args, **kwargs))
return res
def in_thread_pool(fn):
@wraps(fn)
def res(*args, **kwargs):
try:
if thread_locals.thread_pool:
fn(*args, **kwargs)
return
except AttributeError:
pass
thread_pool.run(partial(fn, *args, **kwargs))
return res
def blocking_warning(fn):
warning_string = 'Blocking call to %s not in thread pool' % fn.__name__
warnings.warn(warning_string, RuntimeWarning)
traceback.print_last()
def blocking(fn):
@wraps(fn)
def res(*args, **kwargs):
while 1:
try:
if thread_locals.thread_pool:
break
except AttributeError:
pass
blocking_warning(fn)
break
return fn(*args, **kwargs)
return res