Skip to content

Commit

Permalink
Merge pull request #198 from vivarium-collective/process-commands
Browse files Browse the repository at this point in the history
Add Process Commands
  • Loading branch information
U8NWXD authored Jun 15, 2022
2 parents c602206 + c60b7a3 commit 65a3257
Show file tree
Hide file tree
Showing 14 changed files with 644 additions and 161 deletions.
1 change: 1 addition & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ disable=print-statement,
too-many-locals,
too-many-branches,
too-few-public-methods,
too-many-public-methods,
too-many-lines,
no-self-use,
fixme,
Expand Down
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## v1.3.0

* (#198) Introduce process commands to support more interactions with
parallel processes. Now all `Process` methods of a parallelized
process can be queried from the parent OS process. Users can also add
support for custom methods of their processes.

This change also simplifies the way `Engine` handles parallel
processes warns users when serializers are not being found
efficiently.

## v1.2.8

* (#186) Apply function to data from database emitter in `get_history_data_db`.
Expand Down
37 changes: 37 additions & 0 deletions doc/guides/processes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -562,3 +562,40 @@ The above pseudocode is simplified, and for all but the most simple
processes you will be better off using Vivarium's built-in simulation
capabilities. We hope though that this helps you understand how
processes are simulated and the purpose of the API we defined.

-------------------
Parallel Processing
-------------------

Process Commands
================

When a :term:`process` is run in parallel, we can't interact with it in
the normal Python way. Instead, we can only exchange messages with it
through a pipe. Vivarium structures these exchanges using :term:`process
commands`.

Vivarium provides some built-in commands, which are documented in
:py:meth:`vivarium.core.process.Process.send_command`. Also see that
method's documentation for instructions on how to add support for your
own commands.

Process commands are designed to be used asynchronously, so to retrieve
the result of running a command, you need to call
:py:meth:`vivarium.core.process.Process.get_command_result`. As a
convenience, you can also call
:py:meth:`vivarium.core.process.Process.run_command` to send a command
and get its result as a return value in one function call.

Running Processes in Parallel
=============================

In normal situations though, you shouldn't have to worry about process
commands. Instead, just pass ``'_parallel': True`` in a process's
configuration dictionary, and the Vivarium Engine will handle the
parallelization for you. Just remember that parallelization requires
that processes be serialized and deserialized at the start of the
simulation, and this serialization only preserves the process
parameters. This means that if you instantiate a process and then change
its instance variables, those changes won't be preserved when the
process gets parallelized.
1 change: 1 addition & 0 deletions doc/reference/api/vivarium.core.process.rst
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.. automodule:: vivarium.core.process
:members:
:undoc-members:
:private-members: _handle_parallel_process
:show-inheritance:
10 changes: 10 additions & 0 deletions doc/reference/glossary.rst
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,16 @@ Glossary
subclass either :py:class:`vivarium.core.process.Process`
or another process class.

Process Command
Process command
process command
Process Commands
Process commands
process commands
Instructions that let Vivarium communicate with parallel
processes in a remote-procedure-call-like fashion. See :doc:`the
processes guide </guides/processes>` for details.

Raw Data
Raw data
raw data
Expand Down
2 changes: 2 additions & 0 deletions doc/vale/styles/Vocab/All/vocab.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,5 @@ Agmon
Spangler
Skalnik
Bioinformatics
parallelization
deserialized
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from setuptools import setup


VERSION = '1.2.8'
VERSION = '1.3.0'


if __name__ == '__main__':
Expand Down
164 changes: 69 additions & 95 deletions vivarium/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
inverse_topology,
normalize_path,
)
from vivarium.library.dict_utils import apply_func_to_leaves
from vivarium.core.types import (
HierarchyPath, Topology, State, Update, Processes, Steps,
Flow, Schema)
Expand Down Expand Up @@ -119,21 +120,6 @@ def timestamp(dt: Optional[Any] = None) -> str:
dt.hour, dt.minute, dt.second)


def invoke_process(
process: Process,
interval: float,
states: State,
) -> Update:
"""Compute a process's next update.
Call the process's
:py:meth:`vivarium.core.process.Process.next_update` function with
``interval`` and ``states``.
"""

return process.next_update(interval, states)


def empty_front(t: float) -> Dict[str, Union[float, dict]]:
return {
'time': t,
Expand All @@ -154,10 +140,12 @@ def __init__(
called.
Args:
defer: An object with a ``.get()`` method whose output will
be passed to the function. For example, the object could
be an :py:class:`InvokeProcess` object whose ``.get()``
method will return the process update.
defer: An object with a ``.get_command_result()`` method
whose output will be passed to the function. For
example, the object could be an
:py:class:`vivarium.core.process.Process` object whose
``.get_command_result()`` method will return the process
update.
function: The function. For example,
:py:func:`invert_topology` to transform the returned
update.
Expand All @@ -174,7 +162,7 @@ def get(self) -> Update:
The result of calling the function.
"""
return self.f(
self.defer.get(),
self.defer.get_command_result(),
self.args)


Expand All @@ -188,44 +176,6 @@ def get(self) -> Update:
return {}


class InvokeProcess:
def __init__(
self,
process: Process,
interval: float,
states: State,
) -> None:
"""A wrapper object that computes an update.
This class holds the update of a process that is not running in
parallel. When instantiated, it immediately computes the
process's next update.
Args:
process: The process that will calculate the update.
interval: The timestep for the update.
states: The simulation state to pass to the process's
``next_update`` function.
"""
self.process = process
self.interval = interval
self.states = states
self.update = invoke_process(
self.process,
self.interval,
self.states)

def get(self) -> Update:
"""Return the computed update.
This method is analogous to the ``.get()`` method in
:py:class:`vivarium.core.process.ParallelProcess` so that
parallel and non-parallel updates can be intermixed in the
simulation engine.
"""
return self.update


class _StepGraph:
"""A dependency graph of :term:`steps`.
Expand Down Expand Up @@ -374,7 +324,6 @@ def __init__(
emit_topology: bool = True,
emit_processes: bool = False,
emit_config: bool = False,
invoke: Optional[Any] = None,
emit_step: float = 1,
display_info: bool = True,
progress_bar: bool = False,
Expand Down Expand Up @@ -409,7 +358,15 @@ def __init__(
process for that port.
store: A pre-loaded Store. This is an alternative to passing
in processes and topology dict, which can not be loaded
at the same time.
at the same time. Note that if you provide this
argument, you must ensure that all parallel processes
(i.e. :py:class:`vivarium.core.process.Process` objects
with the ``parallel`` attribute set to ``True``) are
instances of
:py:class:`vivarium.core.process.ParallelProcess`. This
constructor converts parallel processes to
``ParallelProcess`` objects automatically if you do not
provide this ``store`` argument.
initial_state: By default an empty dictionary, this is the
initial state of the simulation.
experiment_id: A unique identifier for the experiment. A
Expand Down Expand Up @@ -466,10 +423,6 @@ def __init__(
if self.display_info:
self._print_display()

# parallel settings
self.invoke = invoke or InvokeProcess
self.parallel: Dict[HierarchyPath, ParallelProcess] = {}

# get a mapping of all paths to processes
self.process_paths: Dict[HierarchyPath, Process] = {}
self._step_graph = _StepGraph()
Expand Down Expand Up @@ -572,6 +525,15 @@ def _make_store(
'load either composite, store, or '
'(processes and topology) into Engine')

self.processes = cast(
Dict[str, Any],
self._parallelize_processes(self.processes)
)
self.steps = cast(
Dict[str, Any],
self._parallelize_processes(self.steps)
)

# initialize the store
self.state: Store = generate_state(
self.processes,
Expand All @@ -592,6 +554,23 @@ def _make_store(
self.flow = self.state.get_flow() or {}
self.topology = self.state.get_topology()

def _parallelize_processes(
self, processes: Any) -> Union[dict, Process]:
'''Replace parallel processes with ParallelProcess objects.'''
if isinstance(processes, Process):
if processes.parallel and not isinstance(
processes, ParallelProcess):
processes = ParallelProcess(
processes, bool(self.profiler), self.stats_objs)
elif isinstance(processes, dict):
processes = {
key: self._parallelize_processes(value)
for key, value in processes.items()
}
else:
raise AssertionError(f'Unrecognized collection: {processes}')
return processes

def _add_step_path(
self,
step: Step,
Expand Down Expand Up @@ -682,7 +661,6 @@ def _emit_store_data(self) -> None:
def _invoke_process(
self,
process: Process,
path: HierarchyPath,
interval: float,
states: State,
) -> Any:
Expand All @@ -694,28 +672,16 @@ def _invoke_process(
Args:
process: The process.
path: The path at which the process resides. This is used to
track parallel processes in ``self.parallel``.
interval: The timestep for which to compute the update.
states: The simulation state to pass to
:py:meth:`vivarium.core.process.Process.next_update`.
Returns:
The deferred simulation update, for example a
:py:class:`vivarium.core.process.ParallelProcess` or an
:py:class:`InvokeProcess` object.
:py:class:`vivarium.core.process.ParallelProcess`.
"""
if process.parallel:
# add parallel process if it doesn't exist
if path not in self.parallel:
self.parallel[path] = ParallelProcess(
process, bool(self.profiler))
# trigger the computation of the parallel process
self.parallel[path].update(interval, states)

return self.parallel[path]
# if not parallel, perform a normal invocation
return self.invoke(process, interval, states)
process.send_command('next_update', (interval, states))
return process

def _process_update(
self,
Expand Down Expand Up @@ -748,7 +714,6 @@ def _process_update(

update = self._invoke_process(
process,
path,
interval,
states)

Expand Down Expand Up @@ -835,6 +800,20 @@ def apply_update(
flow_updates, deletions, view_expire
) = self.state.apply_update(update, state)

process_updates = [
(path, self._parallelize_processes(process))
for path, process in process_updates
]
step_updates = [
(path, self._parallelize_processes(step))
for path, step in step_updates
]
# Make sure the Store contains the parallelized processes.
for path, process in process_updates:
self.state.get_path(path).value = process
for path, step in step_updates:
self.state.get_path(path).value = step

flow_update_dict = dict(flow_updates)

if topology_updates:
Expand Down Expand Up @@ -968,16 +947,7 @@ def _check_complete(self) -> None:
f"the process at path {path} is an unapplied update"

def _remove_deleted_processes(self) -> None:
# find any parallel processes that were removed and terminate them
for terminated in self.parallel.keys() - (
self.process_paths.keys() | self._step_paths.keys()):
self.parallel[terminated].end()
stats = self.parallel[terminated].stats
if stats:
self.stats_objs.append(stats)
del self.parallel[terminated]

# remove deleted process paths from the front
'''Remove deleted processes from the front.'''
self.front = {
path: progress
for path, progress in self.front.items()
Expand Down Expand Up @@ -1107,6 +1077,12 @@ def run_for(
if force_complete and self.global_time == end_time:
force_complete = False

@staticmethod
def _end_process_if_parallel(process: Process) -> None:
if process.parallel:
assert isinstance(process, ParallelProcess)
process.end()

def end(self) -> None:
"""Terminate all processes running in parallel.
Expand All @@ -1115,10 +1091,8 @@ def end(self) -> None:
profiling stats, including stats from parallel sub-processes.
These stats are stored in ``self.stats``.
"""
for parallel in self.parallel.values():
parallel.end()
if parallel.stats:
self.stats_objs.append(parallel.stats)
apply_func_to_leaves(
self.processes, self._end_process_if_parallel)
if self.profiler:
self.profiler.disable()
total_stats = pstats.Stats(self.profiler)
Expand Down
Loading

0 comments on commit 65a3257

Please sign in to comment.