Skip to content

Commit

Permalink
Merge branch 'develop' into feat/alerts_client
Browse files Browse the repository at this point in the history
# Conflicts:
#	poetry.lock
  • Loading branch information
kkedziak-splunk committed Dec 4, 2024
2 parents 53613e6 + a0d0e03 commit 294ca3f
Show file tree
Hide file tree
Showing 11 changed files with 1,094 additions and 4 deletions.
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

[tool.poetry]
name = "solnlib"
version = "5.5.0-beta.1"
version = "6.1.0-beta.1"
description = "The Splunk Software Development Kit for Splunk Solutions"
authors = ["Splunk <[email protected]>"]
license = "Apache-2.0"
Expand All @@ -42,7 +42,7 @@ classifiers = [
python = ">=3.7,<3.14"
sortedcontainers = ">=2"
defusedxml = ">=0.7"
splunk-sdk = ">=1.6"
splunk-sdk = ">=2.0.2"

[tool.poetry.group.dev.dependencies]
pytest = ">=7"
Expand Down
2 changes: 1 addition & 1 deletion solnlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@
"utils",
]

__version__ = "5.5.0-beta.1"
__version__ = "6.1.0-beta.1"
102 changes: 102 additions & 0 deletions solnlib/concurrent/concurrent_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#
# Copyright 2024 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Concurrent executor provides concurrent executing function either in a
thread pool or a process pool."""

import solnlib.concurrent.process_pool as pp
import solnlib.concurrent.thread_pool as tp


class ConcurrentExecutor:
def __init__(self, config):
"""
:param config: dict like object, contains thread_min_size (int),
thread_max_size (int), daemonize_thread (bool),
process_size (int)
"""

self._io_executor = tp.ThreadPool(
config.get("thread_min_size", 0),
config.get("thread_max_size", 0),
config.get("task_queue_size", 1024),
config.get("daemonize_thread", True),
)
self._compute_executor = None
if config.get("process_size", 0):
self._compute_executor = pp.ProcessPool(config.get("process_size", 0))

def start(self):
self._io_executor.start()

def tear_down(self):
self._io_executor.tear_down()
if self._compute_executor is not None:
self._compute_executor.tear_down()

def run_io_func_sync(self, func, args=(), kwargs=None):
"""
:param func: callable
:param args: free params
:param kwargs: named params
:return whatever the func returns
"""

return self._io_executor.apply(func, args, kwargs)

def run_io_func_async(self, func, args=(), kwargs=None, callback=None):
"""
:param func: callable
:param args: free params
:param kwargs: named params
:calllback: when func is done and without exception, call the callback
:return whatever the func returns
"""

return self._io_executor.apply_async(func, args, kwargs, callback)

def enqueue_io_funcs(self, funcs, block=True):
"""run jobs in a fire and forget way, no result will be handled over to
clients.
:param funcs: tuple/list-like or generator like object, func shall be
callable
"""

return self._io_executor.enqueue_funcs(funcs, block)

def run_compute_func_sync(self, func, args=(), kwargs={}):
"""
:param func: callable
:param args: free params
:param kwargs: named params
:return whatever the func returns
"""

assert self._compute_executor is not None
return self._compute_executor.apply(func, args, kwargs)

def run_compute_func_async(self, func, args=(), kwargs={}, callback=None):
"""
:param func: callable
:param args: free params
:param kwargs: named params
:calllback: when func is done and without exception, call the callback
:return whatever the func returns
"""

assert self._compute_executor is not None
return self._compute_executor.apply_async(func, args, kwargs, callback)
75 changes: 75 additions & 0 deletions solnlib/concurrent/process_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#
# Copyright 2024 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A wrapper of multiprocessing.pool."""

import multiprocessing

import logging


class ProcessPool:
"""A simple wrapper of multiprocessing.pool."""

def __init__(self, size=0, maxtasksperchild=10000):
if size <= 0:
size = multiprocessing.cpu_count()
self.size = size
self._pool = multiprocessing.Pool(
processes=size, maxtasksperchild=maxtasksperchild
)
self._stopped = False

def tear_down(self):
"""Tear down the pool."""

if self._stopped:
logging.info("ProcessPool has already stopped.")
return
self._stopped = True

self._pool.close()
self._pool.join()
logging.info("ProcessPool stopped.")

def apply(self, func, args=(), kwargs={}):
"""
:param func: callable
:param args: free params
:param kwargs: named params
:return whatever the func returns
"""

if self._stopped:
logging.info("ProcessPool has already stopped.")
return None

return self._pool.apply(func, args, kwargs)

def apply_async(self, func, args=(), kwargs={}, callback=None):
"""
:param func: callable
:param args: free params
:param kwargs: named params
:callback: when func is done without exception, call this callack
:return whatever the func returns
"""

if self._stopped:
logging.info("ProcessPool has already stopped.")
return None

return self._pool.apply_async(func, args, kwargs, callback)
Loading

0 comments on commit 294ca3f

Please sign in to comment.