Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: moved methods from splunktalib #415

Merged
merged 3 commits into from
Dec 2, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
ci: fix pre-commit failure
hetangmodi-crest committed Dec 2, 2024
commit 3c37c9c6226e83c995db99497d3703c236f4d694
12 changes: 5 additions & 7 deletions solnlib/concurrent/concurrent_executor.py
Original file line number Diff line number Diff line change
@@ -14,10 +14,8 @@
# limitations under the License.
#

"""
Concurrent executor provides concurrent executing function either in
a thread pool or a process pool
"""
"""Concurrent executor provides concurrent executing function either in a
thread pool or a process pool."""

import solnlib.concurrent.process_pool as pp
import solnlib.concurrent.thread_pool as tp
@@ -71,9 +69,9 @@ def run_io_func_async(self, func, args=(), kwargs=None, callback=None):
return self._io_executor.apply_async(func, args, kwargs, callback)

def enqueue_io_funcs(self, funcs, block=True):
"""
run jobs in a fire and forget way, no result will be handled
over to clients
"""run jobs in a fire and forget way, no result will be handled over to
clients.

:param funcs: tuple/list-like or generator like object, func shall be
callable
"""
12 changes: 3 additions & 9 deletions solnlib/concurrent/process_pool.py
Original file line number Diff line number Diff line change
@@ -14,19 +14,15 @@
# limitations under the License.
#

"""
A wrapper of multiprocessing.pool
"""
"""A wrapper of multiprocessing.pool."""

import multiprocessing

import logging


class ProcessPool:
"""
A simple wrapper of multiprocessing.pool
"""
"""A simple wrapper of multiprocessing.pool."""

def __init__(self, size=0, maxtasksperchild=10000):
if size <= 0:
@@ -38,9 +34,7 @@ def __init__(self, size=0, maxtasksperchild=10000):
self._stopped = False

def tear_down(self):
"""
Tear down the pool
"""
"""Tear down the pool."""

if self._stopped:
logging.info("ProcessPool has already stopped.")
63 changes: 22 additions & 41 deletions solnlib/concurrent/thread_pool.py
Original file line number Diff line number Diff line change
@@ -14,9 +14,7 @@
# limitations under the License.
#

"""
A simple thread pool implementation
"""
"""A simple thread pool implementation."""

import multiprocessing
import queue
@@ -26,11 +24,8 @@
import logging



class ThreadPool:
"""
A simple thread pool implementation
"""
"""A simple thread pool implementation."""

_high_watermark = 0.2
_resize_window = 10
@@ -63,9 +58,7 @@ def __init__(self, min_size=1, max_size=128, task_queue_size=1024, daemon=True):
self._started = False

def start(self):
"""
Start threads in the pool
"""
"""Start threads in the pool."""

with self._lock:
if self._started:
@@ -80,9 +73,7 @@ def start(self):
logging.info("ThreadPool started.")

def tear_down(self):
"""
Tear down thread pool
"""
"""Tear down thread pool."""

with self._lock:
if not self._started:
@@ -103,9 +94,9 @@ def tear_down(self):
logging.info("ThreadPool stopped.")

def enqueue_funcs(self, funcs, block=True):
"""
run jobs in a fire and forget way, no result will be handled
over to clients
"""run jobs in a fire and forget way, no result will be handled over to
clients.

:param funcs: tuple/list-like or generator like object, func shall be
callable
"""
@@ -153,9 +144,7 @@ def size(self):
return self._last_size

def resize(self, new_size):
"""
Resize the pool size, spawn or destroy threads if necessary
"""
"""Resize the pool size, spawn or destroy threads if necessary."""

if new_size <= 0:
return
@@ -182,9 +171,7 @@ def resize(self, new_size):
logging.info("Finished ThreadPool resizing. New size=%d", new_size)

def _remove_exited_threads_with_lock(self):
"""
Join the exited threads last time when resize was called
"""
"""Join the exited threads last time when resize was called."""

joined_thrs = set()
for thr in self._thrs:
@@ -252,9 +239,8 @@ def _do_admin(self):
)

def _run(self):
"""
Threads callback func, run forever to handle jobs from the job queue
"""
"""Threads callback func, run forever to handle jobs from the job
queue."""

work_queue = self._work_queue
count_lock = self._count_lock
@@ -282,9 +268,7 @@ def _run(self):
logging.debug("Done with exec job")
logging.info("Thread work_queue_size=%d", work_queue.qsize())

logging.debug(
"Worker thread %s stopped.", threading.current_thread().getName()
)
logging.debug("Worker thread %s stopped.", threading.current_thread().getName())


class AsyncResult:
@@ -315,11 +299,12 @@ def __call__(self):
self._callback()

def get(self, timeout=None):
"""
Return the result when it arrives. If timeout is not None and the
result does not arrive within timeout seconds then
multiprocessing.TimeoutError is raised. If the remote call raised an
exception then that exception will be reraised by get().
"""Return the result when it arrives.

If timeout is not None and the result does not arrive within
timeout seconds then multiprocessing.TimeoutError is raised. If
the remote call raised an exception then that exception will be
reraised by get().
"""

try:
@@ -332,9 +317,7 @@ def get(self, timeout=None):
return res

def wait(self, timeout=None):
"""
Wait until the result is available or until timeout seconds pass.
"""
"""Wait until the result is available or until timeout seconds pass."""

try:
res = self._q.get(timeout=timeout)
@@ -344,15 +327,13 @@ def wait(self, timeout=None):
self._q.put(res)

def ready(self):
"""
Return whether the call has completed.
"""
"""Return whether the call has completed."""

return len(self._q)

def successful(self):
"""
Return whether the call completed without raising an exception.
"""Return whether the call completed without raising an exception.

Will raise AssertionError if the result is not ready.
"""

7 changes: 2 additions & 5 deletions solnlib/modular_input/modinput.py
Original file line number Diff line number Diff line change
@@ -23,8 +23,7 @@


def _parse_modinput_configs(root, outer_block, inner_block):
"""
When user splunkd spawns modinput script to do config check or run
"""When user splunkd spawns modinput script to do config check or run.

<?xml version="1.0" encoding="UTF-8"?>
<input>
@@ -148,9 +147,7 @@ def get_modinput_configs_from_cli(modinput, modinput_stanza=None):


def get_modinput_config_str_from_stdin():
"""
Get modinput from stdin which is feed by splunkd
"""
"""Get modinput from stdin which is feed by splunkd."""

try:
return sys.stdin.read(5000)
8 changes: 4 additions & 4 deletions solnlib/rest.py
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ def splunkd_request(
) -> Optional[requests.Response]:

headers = headers if headers is not None else {}
headers["Authorization"] = "Splunk {}".format(session_key)
headers["Authorization"] = f"Splunk {session_key}"
content_type = headers.get("Content-Type")
if not content_type:
content_type = headers.get("content-type")
@@ -79,13 +79,13 @@ def splunkd_request(

def code_to_msg(response: requests.Response):
code_msg_tbl = {
400: "Request error. reason={}".format(response.text),
400: f"Request error. reason={response.text}",
401: "Authentication failure, invalid access credentials.",
402: "In-use license disables this feature.",
403: "Insufficient permission.",
404: "Requested endpoint does not exist.",
409: "Invalid operation for this endpoint. reason={}".format(response.text),
500: "Unspecified internal server error. reason={}".format(response.text),
409: f"Invalid operation for this endpoint. reason={response.text}",
500: f"Unspecified internal server error. reason={response.text}",
503: (
"Feature is disabled in the configuration file. "
"reason={}".format(response.text)
4 changes: 1 addition & 3 deletions solnlib/schedule/job.py
Original file line number Diff line number Diff line change
@@ -19,9 +19,7 @@


class Job:
"""
Timer wraps the callback and timestamp related stuff
"""
"""Timer wraps the callback and timestamp related stuff."""

_ident = 0
_lock = threading.Lock()
17 changes: 7 additions & 10 deletions solnlib/schedule/scheduler.py
Original file line number Diff line number Diff line change
@@ -23,9 +23,7 @@


class Scheduler:
"""
A simple scheduler which schedules the periodic or once event
"""
"""A simple scheduler which schedules the periodic or once event."""

import sortedcontainers as sc

@@ -42,9 +40,10 @@ def __init__(self):
self._started = False

def start(self):
"""
Start the schduler which will start the internal thread for scheduling
jobs. Please do tear_down when doing cleanup
"""Start the schduler which will start the internal thread for
scheduling jobs.

Please do tear_down when doing cleanup
"""

if self._started:
@@ -55,10 +54,8 @@ def start(self):
self._thr.start()

def tear_down(self):
"""
Stop the schduler which will stop the internal thread for scheduling
jobs.
"""
"""Stop the schduler which will stop the internal thread for scheduling
jobs."""

if not self._started:
logging.info("Scheduler already tear down.")