-
Notifications
You must be signed in to change notification settings - Fork 96
/
scheduler.py
109 lines (95 loc) · 3.77 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# pylint: disable=consider-using-with
from __future__ import annotations
import traceback
from typing import Any, Callable, Iterable, List, TypeVar
from contextlib import AbstractContextManager
from multiprocessing import Pool as ProcessPool
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing.pool import Pool
from psutil import cpu_count
from .logs import log_warning
from .singleton import Singleton
# Generic placeholder used in this file's `List[_T]` return annotations
# (Scheduler.schedule, Scheduler.safe_starmap, Scheduler.safe_map).
_T = TypeVar("_T")
class Scheduler(Singleton, AbstractContextManager):
    """
    Worker that wraps multiprocessing and allows
    for shared context or spawning processes.

    Implemented as a singleton to guarantee there is only one set
    of thread and process pools in use throughout the library.

    Also implements the [Context Manager Protocol](https://docs.python.org/3.8/library/stdtypes.html#typecontextmanager):
    entering the context (re)opens the pools when needed and leaving it
    closes them.
    """

    __process_pool: Pool
    __thread_pool: Pool
    # True while the pools created by _open() have not been closed again.
    # Kept as a class-level default so it is readable before _open() ever ran.
    __is_open: bool = False

    def __init__(self) -> None:
        super().__init__()
        self._open()

    def __enter__(self) -> Scheduler:
        # __init__ already opened the pools; _open() only creates new ones
        # when a previous close() shut them down, so nothing is leaked here.
        self._open()
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> None:
        # Returns None, so an exception raised inside the `with` body propagates.
        self.close()

    def _open(self) -> None:
        """
        Open the process and thread pools.

        Does nothing when the pools are already open; otherwise the previous
        pools would be overwritten without being closed.
        """
        if self.__is_open:
            return

        max_processes = self.cpu_count()
        # Reserve one CPU for I/O bound tasks (only when more than two are available)
        if max_processes > 2:
            max_processes -= 1

        self.__process_pool = ProcessPool(max_processes)
        self.__thread_pool = ThreadPool(max_processes)
        self.__is_open = True

    def close(self) -> None:
        """
        Close pools.

        Safe to call repeatedly; a later `_open()` / `__enter__` creates fresh pools.
        """
        if not self.__is_open:
            return

        self.__is_open = False
        try:
            self.__process_pool.close()
        finally:
            # Close the thread pool even if closing the process pool failed.
            self.__thread_pool.close()

    @staticmethod
    def cpu_count() -> int:
        """
        Get CPU count

        :return: the number of physical cores; when psutil cannot determine it
                 (it returns None), the number of logical CPUs; and 1 as a
                 last resort, so the result is always a positive int
        """
        # NOTE: the bare name `cpu_count` below is the module-level psutil
        # import, not this static method (class scope is not visible here).
        physical = cpu_count(logical=False)
        if physical:
            return int(physical)
        logical = cpu_count(logical=True)
        return int(logical) if logical else 1

    def schedule(
        self,
        task: Callable,
        arguments: Iterable[Iterable[Any]],
        with_shared_resources: bool = False,
    ) -> List[_T]:
        """
        Schedule tasks with list of arguments

        :param task: the callable task to execute
        :param arguments: the list of lists passed to the task using starmap
        :param with_shared_resources: flag to use threads instead of processes
                                      allowing resources to be shared. note
                                      when using the threadpool, execution is bound
                                      by default to the [global interpreter lock]
                                      (https://docs.python.org/3.8/glossary.html#term-global-interpreter-lock)
        :return: the list of task results, or an empty list when the run failed
                 (see `safe_starmap`)
        """
        pool = self.__thread_pool if with_shared_resources else self.__process_pool
        return self.safe_starmap(pool, task, arguments)

    @staticmethod
    def safe_starmap(
        pool: Pool, task: Callable, arguments: Iterable[Iterable[Any]]
    ) -> List[_T]:
        """
        Safe wrapper around starmap to ensure pool is open

        :param pool: the pool to run the task on
        :param task: the callable invoked once per entry of `arguments`
        :param arguments: iterable of argument iterables, each unpacked into `task`
        :return: the results of `pool.starmap`, or an empty list when any
                 exception was raised (the failure is logged as a warning,
                 never re-raised)
        """
        try:
            return pool.starmap(task, arguments)
        except ValueError as e:
            # Raised, for instance, when the pool has already been closed;
            # a ValueError coming from the task itself ends up here as well.
            log_warning(
                f"safe_starmap({task}, {arguments}) exception ValueError({str(e)})"
            )
            return []
        except Exception:  # pylint: disable=broad-except
            log_warning(
                f"safe_starmap({task}, {arguments}) failed with \n {traceback.format_exc()}"
            )
            return []

    @staticmethod
    def safe_map(pool: Pool, task: Callable, arguments: Iterable[Any]) -> List[_T]:
        """
        Safe wrapper around map to ensure pool is open

        :param pool: the pool to run the task on
        :param task: the callable invoked with each entry of `arguments`
        :param arguments: iterable of single arguments passed to `task`
        :return: the results of `pool.map`, or an empty list when any
                 exception was raised (the failure is logged as a warning,
                 never re-raised)
        """
        try:
            return pool.map(task, arguments)
        except ValueError as e:
            # Raised, for instance, when the pool has already been closed;
            # a ValueError coming from the task itself ends up here as well.
            log_warning(f"safe_map({task}, {arguments}) exception ValueError({str(e)})")
            return []
        except Exception:  # pylint: disable=broad-except
            log_warning(
                f"safe_map({task}, {arguments}) failed with \n {traceback.format_exc()}"
            )
            return []