Skip to content

Commit

Permalink
Release v0.1.0 (#12)
Browse files Browse the repository at this point in the history
* Use ordered data structure to keep track of queue processes (#7)

* Fix potential sync bugs with processes being joined out of order

* A dictionary is unordered, this could cause issues when joining processes since this operation needs to be done in order

* Using a tuple is the best option here, since it is immutable and the structure is simple enough

* push rolling releases with rc branch, not develop

* Create 0.0.2rc (#8)

* Use ordered data structure to keep track of queue processes (#7)

* Fix potential sync bugs with processes being joined out of order

* A dictionary is unordered, this could cause issues when joining processes since this operation needs to be done in order

* Using a tuple is the best option here, since it is immutable and the structure is simple enough

* push rolling releases with rc branch, not develop

* bump package version to 0.0.2rc

* add mypy configuration file

* I decided to start using mypy to typecheck the library

* supress flake8 errors in init

* improve inner library organization

* Move the contents of main.py to automator.py so the naming was clearer

* Also move the example outside the src folder of the library as it is not library code

* Add support to insert data to a queue other than the initial one

* Fix bug in the validate_non_empty_args helper

* add a reset() api to clear the QueueAutomator state

* Create MultiprocessMaybe

* This multiprocessing helper uses the Maybe pattern as inspiration to chain several function calls using multiprocessing

* It wraps a QueueAutomator instance and provides a different style API which could be cleaner than having a bunch of decorators and manually defining queue names.

* This is a work in progress, and it is improving with every iteration

* Add example for MultiprocessingMaybe

* Showcase the MultiprocessingMaybe capabilities in an example

-> insert to intermediate queues
-> replace None or values matching a predicate with the default value
-> Prevent crashes for None values not executing the function when the input matches a predicate

* Add tests for QueueAutomator

* Include the first round of tests for the queue automator, this will ensure that the main functionality does not break if changed

* Update mypy configurations

* This is to exclude build and dist directories from typechecking

* Add docstrings to MultiprocessMaybe

* Update rc version

* Update Readme

* Include Multiprocessing maybe examples

* Update package version to publish
  • Loading branch information
Wason1797 authored Oct 23, 2022
1 parent 9dd623a commit 62b242c
Show file tree
Hide file tree
Showing 10 changed files with 438 additions and 36 deletions.
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,46 @@ if __name__ == '__main__':

```

### Using the MultiprocessingMaybe interface

```python
from queue_automator.maybe import MultiprocessMaybe


def do_work(item: int) -> int:
sleep(2)
result = item*2
print(f'{item} times two {result}')
return result


def do_work_2(item: int) -> int:
sleep(2)
result = item**2
print(f'{item} squared {result}')
return result


def do_work_3(item: int) -> int:
sleep(2)
result = item**3
print(f'{item} cubed {result}')
return result


if __name__ == '__main__':
result = MultiprocessMaybe() \
.insert(range(10)) \
.then(do_work) \
.insert(range(10, 20)) \
.then(do_work_2) \
.insert(range(20, 30)) \
.maybe(do_work_3, default=0)

print(result)

```

## Cautions

As with anything, this is not a silver bullet that gets rid of all problems using python multiprocessing
Expand Down
16 changes: 10 additions & 6 deletions src/queue_automator/example.py → examples/automator.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,40 @@
from queue_automator import QueueAutomator
from time import perf_counter, sleep

from queue_automator import QueueAutomator

import logging

logging.basicConfig(level=logging.DEBUG)

automator = QueueAutomator()


@automator.register_as_worker_function(output_queue_name='square_queue', process_count=2)
def do_work(item):
def do_work(item: int) -> int:
sleep(2)
result = item*2
print(f'{item} times two {result}')
return result


@automator.register_as_worker_function(input_queue_name='square_queue', output_queue_name='cube_queue', process_count=2)
def do_work_2(item):
def do_work_2(item: int) -> int:
sleep(2)
result = item**2
print(f'{item} squared {result}')
return result


@automator.register_as_worker_function(input_queue_name='cube_queue', output_queue_name='add_2_queue', process_count=2)
def do_work_3(item):
def do_work_3(item: int) -> int:
sleep(2)
result = item**3
print(f'{item} cubed {result}')
return result


@automator.register_as_worker_function(input_queue_name='add_2_queue', process_count=2)
def do_work_4(item):
def do_work_4(item: int) -> int:
sleep(2)
result = item+2
print(f'{item} + 2 {result}')
Expand All @@ -42,7 +46,7 @@ def do_work_4(item):
input_data = range(30)
automator.set_input_data(input_data)
results = automator.run()
print(results)
end = perf_counter()

print(results)
print(f'Took {end-start:0.2f}s')
44 changes: 44 additions & 0 deletions examples/maybe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from time import perf_counter, sleep

from queue_automator.maybe import MultiprocessMaybe

import logging

logging.basicConfig(level=logging.DEBUG)


def do_work(item: int) -> int:
sleep(2)
result = item*2
print(f'{item} times two {result}')
return result


def do_work_2(item: int) -> int:
sleep(2)
result = item**2
print(f'{item} squared {result}')
return result


def do_work_3(item: int) -> int:
sleep(2)
result = item**3
print(f'{item} cubed {result}')
return result


if __name__ == '__main__':
start = perf_counter()
result = MultiprocessMaybe() \
.insert(range(10)) \
.then(do_work) \
.insert(range(10, 20)) \
.then(do_work_2) \
.insert(range(20, 30)) \
.maybe(do_work_3, default=0)

end = perf_counter()

print(result)
print(f'Took {end-start:0.2f}s')
10 changes: 10 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[mypy]
exclude = ['.venv', 'build', 'dist']
disallow_untyped_defs = True
disallow_untyped_calls = True
disallow_untyped_decorators = False
no_implicit_optional = True
warn_redundant_casts = True
warn_unreachable = True
ignore_missing_imports = True
ignore_missing_imports_per_module = True
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="queue-automator",
version="0.0.2",
version="0.1.0",
author="Wason1797",
author_email="[email protected]",
description="A simple wrapper to build queue multiprocessing pipelines",
Expand Down
3 changes: 2 additions & 1 deletion src/queue_automator/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .main import QueueAutomator
# flake8: noqa
from .automator import QueueAutomator
88 changes: 60 additions & 28 deletions src/queue_automator/main.py → src/queue_automator/automator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from multiprocessing import JoinableQueue, Manager, Process, Queue
from multiprocessing.managers import SyncManager
from typing import Callable, Iterable, List, Union
from typing import Callable, Dict, Iterable, List, Sequence, Union

from .constants import QueueFlags, QueueNames

Expand Down Expand Up @@ -31,35 +31,36 @@ class QueueAutomator:
"""

def __init__(self, name: Union[str, None] = None) -> 'QueueAutomator':
self.__queue_table: dict = {
def __init__(self, name: Union[str, None] = None) -> None:
self.__queue_table: Dict[str, dict] = {
QueueNames.OUTPUT: {
'target': None,
'process_count': None,
'worker_function': None
'worker_function': None,
'data': None
},
}
self.input_data = None
self.name = name or ''

def __repr__(self) -> str:
return f'QueueAutomator[{self.name}]'

def __validate_non_empty_args(self, args: list):
def __validate_non_empty_args(self, args: Sequence) -> None:
for arg in args:
if not args:
if not arg:
raise ValueError(f'{arg} should not be empty or zero')

def __build_queue(self, name: str, target: str, process_count: int, worker_function: Callable) -> dict:
return {
name: {
'target': target,
'process_count': process_count,
'worker_function': worker_function
'worker_function': worker_function,
'data': None
}
}

def __generate_queues(self, queues: list, manager: SyncManager, name: str):
def __generate_queues(self, queues: list, manager: SyncManager, name: str) -> None:
if name == QueueNames.OUTPUT:
self.__queue_table[name]['queue'] = manager.Queue(0)
return
Expand All @@ -73,23 +74,30 @@ def __generate_queues(self, queues: list, manager: SyncManager, name: str):
raise RuntimeError(f'{name} was already created, you may be creating a circular pipeline')

next_queue = current_queue['target']
current_queue['queue'] = manager.JoinableQueue()
current_queue['queue'] = manager.JoinableQueue() # type: ignore
queues.append((name, next_queue))

return self.__generate_queues(queues, manager, next_queue)

def __enqueue_input_data(self):
input_queue = self.__queue_table[QueueNames.INPUT].get('queue')
if not input_queue:
RuntimeError('enqueue_items was called before input queue was initialized, this should not happen')
def __enqueue_data(self) -> None:

if not self.input_data:
RuntimeError('input_data is empty, no data to process')
for queue_name, queue_data in self.__queue_table.items():

for item in self.input_data:
input_queue.put(item)
queue = queue_data.get('queue')
if not queue:
RuntimeError('enqueue_data was called for a non existent queue, this should not happen')

def _process_enqueued_objects(self, in_queue: JoinableQueue, out_queue: Queue, worker_function: Callable):
data = queue_data.get('data', [])
if not data:
if queue_name == QueueNames.INPUT:
RuntimeError('data for input queue is empty, nothing to process')
continue

logger.debug(f'Inserting {len(data)} items in queue {queue_name}')
for item in data:
queue.put(item) # type: ignore

def _process_enqueued_objects(self, in_queue: JoinableQueue, out_queue: Queue, worker_function: Callable) -> None:

while True:
input_object = in_queue.get()
Expand All @@ -116,32 +124,46 @@ def __spawn_processes(self, in_queue_name: str, out_queue_name: str) -> List[Pro

return process_list

def __join_processes(self, process_list: list):
def __join_processes(self, process_list: list) -> None:
for process in process_list:
process.join()

def __signal_queue_exit(self, queue: JoinableQueue, num_processes: int):
def __signal_queue_exit(self, queue: JoinableQueue, num_processes: int) -> None:
for _ in range(num_processes):
queue.put(QueueFlags.EXIT)

def __recover_from_queue(self, queue: Queue, manager=False) -> list:
def __recover_from_queue(self, queue: Queue, manager: bool = False) -> list:
results = []
while not queue.empty():
results.append(queue.get())
if manager:
queue.task_done()
return results

def set_input_data(self, input_data: Iterable):
def set_data_for_queue(self, data: Iterable, queue: str) -> None:

logger.debug(f'Setting data for queue {queue}')

if queue == QueueNames.OUTPUT:
raise RuntimeError('trying to set data directly to the output queue')

if queue not in self.__queue_table:
raise RuntimeError('trying to set data to an unexistent queue')

self.__queue_table[queue]['data'] = data

def set_input_data(self, input_data: Iterable) -> None:
"""
This function is used to set the data to be processed in the pipeline
This function is used to set the data to be processed at the start of the pipeline
Args:
input_data (Iterable)
"""
self.input_data = input_data
self.set_data_for_queue(input_data, QueueNames.INPUT)

def register_as_worker_function(self, input_queue_name: str = QueueNames.INPUT, output_queue_name: str = QueueNames.OUTPUT, process_count: int = 1) -> Callable:
def register_as_worker_function(self, input_queue_name: str = QueueNames.INPUT,
output_queue_name: str = QueueNames.OUTPUT,
process_count: int = 1) -> Callable:
"""
Decorator to register your functions to process data as part of a multiprocessing queue pipeline
Expand Down Expand Up @@ -188,13 +210,13 @@ def run(self) -> list:
"""

manager = Manager()
queues = []
queues: List[tuple] = []

self.__generate_queues(queues, manager, QueueNames.INPUT)

process_per_queue = tuple((input_queue, self.__spawn_processes(input_queue, output_queue)) for input_queue, output_queue in queues)

self.__enqueue_input_data()
self.__enqueue_data()

for queue_name, procesess in process_per_queue:
current_queue = self.__queue_table[queue_name]
Expand All @@ -203,3 +225,13 @@ def run(self) -> list:
self.__join_processes(procesess)

return self.__recover_from_queue(self.__queue_table[QueueNames.OUTPUT]['queue'], True)

def reset(self) -> None:
self.__queue_table = {
QueueNames.OUTPUT: {
'target': None,
'process_count': None,
'worker_function': None,
'data': None
}
}
Loading

0 comments on commit 62b242c

Please sign in to comment.