diff --git a/solnlib/concurrent/concurrent_executor.py b/solnlib/concurrent/concurrent_executor.py index 1b863ce0..42324e1a 100644 --- a/solnlib/concurrent/concurrent_executor.py +++ b/solnlib/concurrent/concurrent_executor.py @@ -14,10 +14,8 @@ # limitations under the License. # -""" -Concurrent executor provides concurrent executing function either in -a thread pool or a process pool -""" +"""Concurrent executor provides concurrent executing function either in a +thread pool or a process pool.""" import solnlib.concurrent.process_pool as pp import solnlib.concurrent.thread_pool as tp @@ -71,9 +69,9 @@ def run_io_func_async(self, func, args=(), kwargs=None, callback=None): return self._io_executor.apply_async(func, args, kwargs, callback) def enqueue_io_funcs(self, funcs, block=True): - """ - run jobs in a fire and forget way, no result will be handled - over to clients + """run jobs in a fire and forget way, no result will be handled over to + clients. + :param funcs: tuple/list-like or generator like object, func shall be callable """ diff --git a/solnlib/concurrent/process_pool.py b/solnlib/concurrent/process_pool.py index c9845c42..cff12c49 100644 --- a/solnlib/concurrent/process_pool.py +++ b/solnlib/concurrent/process_pool.py @@ -14,9 +14,7 @@ # limitations under the License. # -""" -A wrapper of multiprocessing.pool -""" +"""A wrapper of multiprocessing.pool.""" import multiprocessing @@ -24,9 +22,7 @@ class ProcessPool: - """ - A simple wrapper of multiprocessing.pool - """ + """A simple wrapper of multiprocessing.pool.""" def __init__(self, size=0, maxtasksperchild=10000): if size <= 0: @@ -38,9 +34,7 @@ def __init__(self, size=0, maxtasksperchild=10000): self._stopped = False def tear_down(self): - """ - Tear down the pool - """ + """Tear down the pool.""" if self._stopped: logging.info("ProcessPool has already stopped.") diff --git a/solnlib/concurrent/thread_pool.py b/solnlib/concurrent/thread_pool.py index f71f3749..6113dfa2 100644 --- a/solnlib/concurrent/thread_pool.py +++ b/solnlib/concurrent/thread_pool.py @@ -14,9 +14,7 @@ # limitations under the License. # -""" -A simple thread pool implementation -""" +"""A simple thread pool implementation.""" import multiprocessing import queue @@ -26,11 +24,8 @@ import logging - class ThreadPool: - """ - A simple thread pool implementation - """ + """A simple thread pool implementation.""" _high_watermark = 0.2 _resize_window = 10 @@ -63,9 +58,7 @@ def __init__(self, min_size=1, max_size=128, task_queue_size=1024, daemon=True): self._started = False def start(self): - """ - Start threads in the pool - """ + """Start threads in the pool.""" with self._lock: if self._started: @@ -80,9 +73,7 @@ def start(self): logging.info("ThreadPool started.") def tear_down(self): - """ - Tear down thread pool - """ + """Tear down thread pool.""" with self._lock: if not self._started: @@ -103,9 +94,9 @@ def tear_down(self): logging.info("ThreadPool stopped.") def enqueue_funcs(self, funcs, block=True): - """ - run jobs in a fire and forget way, no result will be handled - over to clients + """run jobs in a fire and forget way, no result will be handled over to + clients. + :param funcs: tuple/list-like or generator like object, func shall be callable """ @@ -153,9 +144,7 @@ def size(self): return self._last_size def resize(self, new_size): - """ - Resize the pool size, spawn or destroy threads if necessary - """ + """Resize the pool size, spawn or destroy threads if necessary.""" if new_size <= 0: return @@ -182,9 +171,7 @@ def resize(self, new_size): logging.info("Finished ThreadPool resizing. New size=%d", new_size) def _remove_exited_threads_with_lock(self): - """ - Join the exited threads last time when resize was called - """ + """Join the exited threads last time when resize was called.""" joined_thrs = set() for thr in self._thrs: @@ -252,9 +239,8 @@ def _do_admin(self): ) def _run(self): - """ - Threads callback func, run forever to handle jobs from the job queue - """ + """Threads callback func, run forever to handle jobs from the job + queue.""" work_queue = self._work_queue count_lock = self._count_lock @@ -282,9 +268,7 @@ def _run(self): logging.debug("Done with exec job") logging.info("Thread work_queue_size=%d", work_queue.qsize()) - logging.debug( - "Worker thread %s stopped.", threading.current_thread().getName() - ) + logging.debug("Worker thread %s stopped.", threading.current_thread().getName()) class AsyncResult: @@ -315,11 +299,12 @@ def __call__(self): self._callback() def get(self, timeout=None): - """ - Return the result when it arrives. If timeout is not None and the - result does not arrive within timeout seconds then - multiprocessing.TimeoutError is raised. If the remote call raised an - exception then that exception will be reraised by get(). + """Return the result when it arrives. + + If timeout is not None and the result does not arrive within + timeout seconds then multiprocessing.TimeoutError is raised. If + the remote call raised an exception then that exception will be + reraised by get(). """ try: @@ -332,9 +317,7 @@ def get(self, timeout=None): return res def wait(self, timeout=None): - """ - Wait until the result is available or until timeout seconds pass. - """ + """Wait until the result is available or until timeout seconds pass.""" try: res = self._q.get(timeout=timeout) @@ -344,15 +327,13 @@ def wait(self, timeout=None): self._q.put(res) def ready(self): - """ - Return whether the call has completed. - """ + """Return whether the call has completed.""" return len(self._q) def successful(self): - """ - Return whether the call completed without raising an exception. + """Return whether the call completed without raising an exception. + Will raise AssertionError if the result is not ready. """ diff --git a/solnlib/modular_input/modinput.py b/solnlib/modular_input/modinput.py index dbae1dd3..a3ce60ec 100644 --- a/solnlib/modular_input/modinput.py +++ b/solnlib/modular_input/modinput.py @@ -23,8 +23,7 @@ def _parse_modinput_configs(root, outer_block, inner_block): - """ - When user splunkd spawns modinput script to do config check or run + """When user splunkd spawns modinput script to do config check or run. @@ -148,9 +147,7 @@ def get_modinput_configs_from_cli(modinput, modinput_stanza=None): def get_modinput_config_str_from_stdin(): - """ - Get modinput from stdin which is feed by splunkd - """ + """Get modinput from stdin which is feed by splunkd.""" try: return sys.stdin.read(5000) diff --git a/solnlib/rest.py b/solnlib/rest.py index 617b38f9..b551ff4b 100644 --- a/solnlib/rest.py +++ b/solnlib/rest.py @@ -36,7 +36,7 @@ def splunkd_request( ) -> Optional[requests.Response]: headers = headers if headers is not None else {} - headers["Authorization"] = "Splunk {}".format(session_key) + headers["Authorization"] = f"Splunk {session_key}" content_type = headers.get("Content-Type") if not content_type: content_type = headers.get("content-type") @@ -79,13 +79,13 @@ def splunkd_request( def code_to_msg(response: requests.Response): code_msg_tbl = { - 400: "Request error. reason={}".format(response.text), + 400: f"Request error. reason={response.text}", 401: "Authentication failure, invalid access credentials.", 402: "In-use license disables this feature.", 403: "Insufficient permission.", 404: "Requested endpoint does not exist.", - 409: "Invalid operation for this endpoint. reason={}".format(response.text), - 500: "Unspecified internal server error. reason={}".format(response.text), + 409: f"Invalid operation for this endpoint. reason={response.text}", + 500: f"Unspecified internal server error. reason={response.text}", 503: ( "Feature is disabled in the configuration file. " "reason={}".format(response.text) diff --git a/solnlib/schedule/job.py b/solnlib/schedule/job.py index de7e939a..a7ba140c 100644 --- a/solnlib/schedule/job.py +++ b/solnlib/schedule/job.py @@ -19,9 +19,7 @@ class Job: - """ - Timer wraps the callback and timestamp related stuff - """ + """Timer wraps the callback and timestamp related stuff.""" _ident = 0 _lock = threading.Lock() diff --git a/solnlib/schedule/scheduler.py b/solnlib/schedule/scheduler.py index 38a3c75b..2f2f2a85 100644 --- a/solnlib/schedule/scheduler.py +++ b/solnlib/schedule/scheduler.py @@ -23,9 +23,7 @@ class Scheduler: - """ - A simple scheduler which schedules the periodic or once event - """ + """A simple scheduler which schedules the periodic or once event.""" import sortedcontainers as sc @@ -42,9 +40,10 @@ def __init__(self): self._started = False def start(self): - """ - Start the schduler which will start the internal thread for scheduling - jobs. Please do tear_down when doing cleanup + """Start the schduler which will start the internal thread for + scheduling jobs. + + Please do tear_down when doing cleanup """ if self._started: @@ -55,10 +54,8 @@ def start(self): self._thr.start() def tear_down(self): - """ - Stop the schduler which will stop the internal thread for scheduling - jobs. - """ + """Stop the schduler which will stop the internal thread for scheduling + jobs.""" if not self._started: logging.info("Scheduler already tear down.")