-
Notifications
You must be signed in to change notification settings - Fork 3
/
sessionqueue.py
51 lines (40 loc) · 1.23 KB
/
sessionqueue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import collections
import heapq
import threading
class Lock:
    """Mutual-exclusion lock that can safely take part in ``<`` comparisons.

    The object wraps a ``threading.Lock`` and re-exports its ``acquire`` and
    ``release`` methods.  Because ``__lt__`` is defined (and always answers
    ``False``), instances may appear inside tuples that get ordered by
    ``heapq`` or ``sorted`` without raising ``TypeError`` when the leading
    tuple elements tie.
    """

    def __init__(self):
        """Create the underlying, initially unlocked, ``threading.Lock``."""
        inner = threading.Lock()
        self._lock = inner
        # Bind the primitive's own bound methods as instance attributes so
        # calls go straight to the underlying lock with its full signature.
        self.acquire, self.release = inner.acquire, inner.release

    def __lt__(self, _):
        """Return ``False``: a lock never sorts before any other object."""
        return False
class SessionQueue(object):
    """Thread-safe queue that serves blocked consumers in priority order.

    Items put while no consumer is blocked are stored and returned in FIFO
    order by later ``get`` calls.  When consumers are blocked in ``get``,
    each ``put`` wakes exactly one of them: the one with the smallest
    ``priority`` value, and among equal priorities the one that started
    waiting first.
    """

    def __init__(self):
        """Create an empty queue with no blocked consumers."""
        # Items that arrived while nobody was waiting.
        self._nowait_queue = collections.deque()
        # Items destined for woken consumers; put() appends one item per
        # consumer it wakes.
        self._wait_queue = collections.deque()
        # Min-heap of (priority, ticket, waiter_lock) for blocked consumers.
        self._waiters = []
        # Monotonic counter used as a tie-breaker so that equal priorities
        # are served first-come-first-served and locks are never compared.
        self._next_ticket = 0
        # Guards the heap, the ticket counter and the enqueue decisions.
        self._lock = threading.Lock()

    def put(self, item):
        """Add *item*, waking the best-ranked blocked consumer if any.

        Args:
            item: The object to enqueue.
        """
        with self._lock:
            if self._waiters:
                # Publish the item before releasing the waiter so it is
                # guaranteed to be there when the consumer wakes up.
                self._wait_queue.append(item)
                self._notify()
            else:
                self._nowait_queue.append(item)

    def get(self, priority):
        """Return the next item, blocking until one is available.

        Args:
            priority: Rank used while blocked; smaller values are woken
                first.  Must be orderable against the priorities of the
                consumers that are currently waiting.

        Returns:
            An item previously passed to ``put``.

        Raises:
            TypeError: If *priority* cannot be compared with the priorities
                already waiting.  The queue is left unchanged and unlocked.
        """
        with self._lock:
            if self._nowait_queue:
                return self._nowait_queue.popleft()
            waiter = self._register_waiter(priority)
        # Block outside the queue lock so producers can run; put() releases
        # this lock after appending our item to _wait_queue.
        waiter.acquire()
        return self._wait_queue.popleft()

    def _notify(self):
        """Wake the best-ranked waiter.  Caller must hold ``self._lock``."""
        _, _, waiter = heapq.heappop(self._waiters)
        waiter.release()

    def _register_waiter(self, priority):
        """Record a new blocked consumer and return the lock it must wait on.

        Caller must hold ``self._lock``.  The returned lock is already
        acquired, so the consumer's next ``acquire()`` blocks until
        ``_notify`` releases it.
        """
        waiter = threading.Lock()
        waiter.acquire()
        entry = (priority, self._next_ticket, waiter)
        self._next_ticket += 1
        try:
            heapq.heappush(self._waiters, entry)
        except Exception:
            # heappush appends before it compares, so a priority that cannot
            # be ordered would otherwise leave behind a waiter that no thread
            # is blocked on.  Drop it and restore the heap invariant.
            self._waiters[:] = [e for e in self._waiters if e is not entry]
            heapq.heapify(self._waiters)
            raise
        return waiter