Skip to content

Commit

Permalink
feat: moved methods from splunktalib (#415)
Browse files Browse the repository at this point in the history
[ADDON-73693](https://splunk.atlassian.net/browse/ADDON-73693)

Migrated all `splunktalib` methods that were used in
`splunktaucclib` into `solnlib`
  • Loading branch information
hetangmodi-crest authored and artemrys committed Dec 2, 2024
1 parent d46cd21 commit 4953486
Show file tree
Hide file tree
Showing 8 changed files with 1,090 additions and 0 deletions.
102 changes: 102 additions & 0 deletions solnlib/concurrent/concurrent_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#
# Copyright 2024 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Concurrent executor provides concurrent executing function either in a
thread pool or a process pool."""

import solnlib.concurrent.process_pool as pp
import solnlib.concurrent.thread_pool as tp


class ConcurrentExecutor:
    """Dispatches callables to a thread pool (I/O-bound work) and, when
    configured, to a process pool (compute-bound work)."""

    def __init__(self, config):
        """
        :param config: dict like object, contains thread_min_size (int),
               thread_max_size (int), daemonize_thread (bool),
               process_size (int)
        """

        self._io_executor = tp.ThreadPool(
            config.get("thread_min_size", 0),
            config.get("thread_max_size", 0),
            config.get("task_queue_size", 1024),
            config.get("daemonize_thread", True),
        )
        # The process pool is optional: it is only created when a truthy
        # process_size is configured. The compute methods assert on this.
        self._compute_executor = None
        if config.get("process_size", 0):
            self._compute_executor = pp.ProcessPool(config.get("process_size", 0))

    def start(self):
        """Start the underlying thread pool."""
        self._io_executor.start()

    def tear_down(self):
        """Stop the thread pool and, if one was created, the process pool."""
        self._io_executor.tear_down()
        if self._compute_executor is not None:
            self._compute_executor.tear_down()

    def run_io_func_sync(self, func, args=(), kwargs=None):
        """Run ``func`` in the thread pool and block until it completes.

        :param func: callable
        :param args: free params
        :param kwargs: named params
        :return whatever the func returns
        """

        return self._io_executor.apply(func, args, kwargs)

    def run_io_func_async(self, func, args=(), kwargs=None, callback=None):
        """Run ``func`` in the thread pool without blocking.

        :param func: callable
        :param args: free params
        :param kwargs: named params
        :param callback: when func is done and without exception, call the callback
        :return whatever the func returns
        """

        return self._io_executor.apply_async(func, args, kwargs, callback)

    def enqueue_io_funcs(self, funcs, block=True):
        """Run jobs in a fire and forget way, no result will be handed over
        to clients.

        :param funcs: tuple/list-like or generator like object, func shall be
            callable
        """

        return self._io_executor.enqueue_funcs(funcs, block)

    def run_compute_func_sync(self, func, args=(), kwargs=None):
        """Run ``func`` in the process pool and block until it completes.

        :param func: callable
        :param args: free params
        :param kwargs: named params
        :return whatever the func returns
        """

        assert self._compute_executor is not None
        # kwargs default was a shared mutable {}; use None (matching the
        # run_io_* methods) and normalize to a fresh dict per call.
        return self._compute_executor.apply(func, args, kwargs or {})

    def run_compute_func_async(self, func, args=(), kwargs=None, callback=None):
        """Run ``func`` in the process pool without blocking.

        :param func: callable
        :param args: free params
        :param kwargs: named params
        :param callback: when func is done and without exception, call the callback
        :return whatever the func returns
        """

        assert self._compute_executor is not None
        return self._compute_executor.apply_async(func, args, kwargs or {}, callback)
75 changes: 75 additions & 0 deletions solnlib/concurrent/process_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#
# Copyright 2024 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A wrapper of multiprocessing.pool."""

import multiprocessing

import logging


class ProcessPool:
    """A simple wrapper of multiprocessing.pool."""

    def __init__(self, size=0, maxtasksperchild=10000):
        """
        :param size: number of worker processes; falls back to the machine's
            CPU count when non-positive.
        :param maxtasksperchild: recycle each worker process after this many
            tasks, to bound per-worker resource growth.
        """
        if size <= 0:
            size = multiprocessing.cpu_count()
        self.size = size
        self._pool = multiprocessing.Pool(
            processes=size, maxtasksperchild=maxtasksperchild
        )
        self._stopped = False

    def tear_down(self):
        """Tear down the pool. Safe to call more than once."""

        if self._stopped:
            logging.info("ProcessPool has already stopped.")
            return
        self._stopped = True

        self._pool.close()
        self._pool.join()
        logging.info("ProcessPool stopped.")

    def apply(self, func, args=(), kwargs=None):
        """
        :param func: callable
        :param args: free params
        :param kwargs: named params
        :return whatever the func returns, or None if the pool has stopped
        """

        if self._stopped:
            logging.info("ProcessPool has already stopped.")
            return None

        # kwargs default was a shared mutable {}; use None and normalize,
        # since multiprocessing expects a dict for keyword arguments.
        return self._pool.apply(func, args, kwargs or {})

    def apply_async(self, func, args=(), kwargs=None, callback=None):
        """
        :param func: callable
        :param args: free params
        :param kwargs: named params
        :param callback: when func is done without exception, call this callback
        :return whatever the func returns, or None if the pool has stopped
        """

        if self._stopped:
            logging.info("ProcessPool has already stopped.")
            return None

        return self._pool.apply_async(func, args, kwargs or {}, callback)
Loading

0 comments on commit 4953486

Please sign in to comment.