Skip to content

Commit

Permalink
ci: fix pre-commit failure
Browse files Browse the repository at this point in the history
  • Loading branch information
hetangmodi-crest committed Dec 2, 2024
1 parent 1b49e9a commit 3c37c9c
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 79 deletions.
12 changes: 5 additions & 7 deletions solnlib/concurrent/concurrent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
# limitations under the License.
#

"""
Concurrent executor provides concurrent executing function either in
a thread pool or a process pool
"""
"""Concurrent executor provides concurrent executing function either in a
thread pool or a process pool."""

import solnlib.concurrent.process_pool as pp
import solnlib.concurrent.thread_pool as tp
Expand Down Expand Up @@ -71,9 +69,9 @@ def run_io_func_async(self, func, args=(), kwargs=None, callback=None):
return self._io_executor.apply_async(func, args, kwargs, callback)

def enqueue_io_funcs(self, funcs, block=True):
"""
run jobs in a fire and forget way, no result will be handled
over to clients
"""run jobs in a fire and forget way, no result will be handled over to
clients.
:param funcs: tuple/list-like or generator like object, func shall be
callable
"""
Expand Down
12 changes: 3 additions & 9 deletions solnlib/concurrent/process_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,15 @@
# limitations under the License.
#

"""
A wrapper of multiprocessing.pool
"""
"""A wrapper of multiprocessing.pool."""

import multiprocessing

import logging


class ProcessPool:
"""
A simple wrapper of multiprocessing.pool
"""
"""A simple wrapper of multiprocessing.pool."""

def __init__(self, size=0, maxtasksperchild=10000):
if size <= 0:
Expand All @@ -38,9 +34,7 @@ def __init__(self, size=0, maxtasksperchild=10000):
self._stopped = False

def tear_down(self):
"""
Tear down the pool
"""
"""Tear down the pool."""

if self._stopped:
logging.info("ProcessPool has already stopped.")
Expand Down
63 changes: 22 additions & 41 deletions solnlib/concurrent/thread_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
# limitations under the License.
#

"""
A simple thread pool implementation
"""
"""A simple thread pool implementation."""

import multiprocessing
import queue
Expand All @@ -26,11 +24,8 @@
import logging



class ThreadPool:
"""
A simple thread pool implementation
"""
"""A simple thread pool implementation."""

_high_watermark = 0.2
_resize_window = 10
Expand Down Expand Up @@ -63,9 +58,7 @@ def __init__(self, min_size=1, max_size=128, task_queue_size=1024, daemon=True):
self._started = False

def start(self):
"""
Start threads in the pool
"""
"""Start threads in the pool."""

with self._lock:
if self._started:
Expand All @@ -80,9 +73,7 @@ def start(self):
logging.info("ThreadPool started.")

def tear_down(self):
"""
Tear down thread pool
"""
"""Tear down thread pool."""

with self._lock:
if not self._started:
Expand All @@ -103,9 +94,9 @@ def tear_down(self):
logging.info("ThreadPool stopped.")

def enqueue_funcs(self, funcs, block=True):
"""
run jobs in a fire and forget way, no result will be handled
over to clients
"""run jobs in a fire and forget way, no result will be handled over to
clients.
:param funcs: tuple/list-like or generator like object, func shall be
callable
"""
Expand Down Expand Up @@ -153,9 +144,7 @@ def size(self):
return self._last_size

def resize(self, new_size):
"""
Resize the pool size, spawn or destroy threads if necessary
"""
"""Resize the pool size, spawn or destroy threads if necessary."""

if new_size <= 0:
return
Expand All @@ -182,9 +171,7 @@ def resize(self, new_size):
logging.info("Finished ThreadPool resizing. New size=%d", new_size)

def _remove_exited_threads_with_lock(self):
"""
Join the exited threads last time when resize was called
"""
"""Join the exited threads last time when resize was called."""

joined_thrs = set()
for thr in self._thrs:
Expand Down Expand Up @@ -252,9 +239,8 @@ def _do_admin(self):
)

def _run(self):
"""
Threads callback func, run forever to handle jobs from the job queue
"""
"""Threads callback func, run forever to handle jobs from the job
queue."""

work_queue = self._work_queue
count_lock = self._count_lock
Expand Down Expand Up @@ -282,9 +268,7 @@ def _run(self):
logging.debug("Done with exec job")
logging.info("Thread work_queue_size=%d", work_queue.qsize())

logging.debug(
"Worker thread %s stopped.", threading.current_thread().getName()
)
logging.debug("Worker thread %s stopped.", threading.current_thread().getName())


class AsyncResult:
Expand Down Expand Up @@ -315,11 +299,12 @@ def __call__(self):
self._callback()

def get(self, timeout=None):
"""
Return the result when it arrives. If timeout is not None and the
result does not arrive within timeout seconds then
multiprocessing.TimeoutError is raised. If the remote call raised an
exception then that exception will be reraised by get().
"""Return the result when it arrives.
If timeout is not None and the result does not arrive within
timeout seconds then multiprocessing.TimeoutError is raised. If
the remote call raised an exception then that exception will be
reraised by get().
"""

try:
Expand All @@ -332,9 +317,7 @@ def get(self, timeout=None):
return res

def wait(self, timeout=None):
"""
Wait until the result is available or until timeout seconds pass.
"""
"""Wait until the result is available or until timeout seconds pass."""

try:
res = self._q.get(timeout=timeout)
Expand All @@ -344,15 +327,13 @@ def wait(self, timeout=None):
self._q.put(res)

def ready(self):
"""
Return whether the call has completed.
"""
"""Return whether the call has completed."""

return len(self._q)

def successful(self):
"""
Return whether the call completed without raising an exception.
"""Return whether the call completed without raising an exception.
Will raise AssertionError if the result is not ready.
"""

Expand Down
7 changes: 2 additions & 5 deletions solnlib/modular_input/modinput.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@


def _parse_modinput_configs(root, outer_block, inner_block):
"""
When user splunkd spawns modinput script to do config check or run
"""When user splunkd spawns modinput script to do config check or run.
<?xml version="1.0" encoding="UTF-8"?>
<input>
Expand Down Expand Up @@ -148,9 +147,7 @@ def get_modinput_configs_from_cli(modinput, modinput_stanza=None):


def get_modinput_config_str_from_stdin():
"""
Get modinput from stdin which is feed by splunkd
"""
"""Get modinput from stdin which is feed by splunkd."""

try:
return sys.stdin.read(5000)
Expand Down
8 changes: 4 additions & 4 deletions solnlib/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def splunkd_request(
) -> Optional[requests.Response]:

headers = headers if headers is not None else {}
headers["Authorization"] = "Splunk {}".format(session_key)
headers["Authorization"] = f"Splunk {session_key}"
content_type = headers.get("Content-Type")
if not content_type:
content_type = headers.get("content-type")
Expand Down Expand Up @@ -79,13 +79,13 @@ def splunkd_request(

def code_to_msg(response: requests.Response):
code_msg_tbl = {
400: "Request error. reason={}".format(response.text),
400: f"Request error. reason={response.text}",
401: "Authentication failure, invalid access credentials.",
402: "In-use license disables this feature.",
403: "Insufficient permission.",
404: "Requested endpoint does not exist.",
409: "Invalid operation for this endpoint. reason={}".format(response.text),
500: "Unspecified internal server error. reason={}".format(response.text),
409: f"Invalid operation for this endpoint. reason={response.text}",
500: f"Unspecified internal server error. reason={response.text}",
503: (
"Feature is disabled in the configuration file. "
"reason={}".format(response.text)
Expand Down
4 changes: 1 addition & 3 deletions solnlib/schedule/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@


class Job:
"""
Timer wraps the callback and timestamp related stuff
"""
"""Timer wraps the callback and timestamp related stuff."""

_ident = 0
_lock = threading.Lock()
Expand Down
17 changes: 7 additions & 10 deletions solnlib/schedule/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@


class Scheduler:
"""
A simple scheduler which schedules the periodic or once event
"""
"""A simple scheduler which schedules the periodic or once event."""

import sortedcontainers as sc

Expand All @@ -42,9 +40,10 @@ def __init__(self):
self._started = False

def start(self):
"""
Start the schduler which will start the internal thread for scheduling
jobs. Please do tear_down when doing cleanup
"""Start the schduler which will start the internal thread for
scheduling jobs.
Please do tear_down when doing cleanup
"""

if self._started:
Expand All @@ -55,10 +54,8 @@ def start(self):
self._thr.start()

def tear_down(self):
"""
Stop the schduler which will stop the internal thread for scheduling
jobs.
"""
"""Stop the schduler which will stop the internal thread for scheduling
jobs."""

if not self._started:
logging.info("Scheduler already tear down.")
Expand Down

0 comments on commit 3c37c9c

Please sign in to comment.