Skip to content

Commit

Permalink
Merge pull request #29 from exalearn/renaming
Browse files Browse the repository at this point in the history
Renaming to match with SC paper
  • Loading branch information
WardLT authored Jul 6, 2021
2 parents 33d9bd7 + 6072835 commit 8859772
Show file tree
Hide file tree
Showing 28 changed files with 373 additions and 742 deletions.
5 changes: 0 additions & 5 deletions colmena/method_server/__init__.py

This file was deleted.

6 changes: 3 additions & 3 deletions colmena/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class Result(BaseModel):

# Performance tracking
time_created: float = Field(None, description="Time this value object was created")
time_input_received: float = Field(None, description="Time the inputs was received by the method server")
time_input_received: float = Field(None, description="Time the inputs was received by the task server")
time_compute_started: float = Field(None, description="Time workflow process began executing a task")
time_result_sent: float = Field(None, description="Time message was sent from the server")
time_result_received: float = Field(None, description="Time value was received by client")
Expand Down Expand Up @@ -130,15 +130,15 @@ def mark_result_received(self):
self.time_result_received = datetime.now().timestamp()

def mark_input_received(self):
"""Mark that a method server has received a value"""
"""Mark that a task server has received a value"""
self.time_input_received = datetime.now().timestamp()

def mark_compute_started(self):
"""Mark that the compute for a method has started"""
self.time_compute_started = datetime.now().timestamp()

def mark_result_sent(self):
"""Mark when a result is sent from the method server"""
"""Mark when a result is sent from the task server"""
self.time_result_sent = datetime.now().timestamp()

def set_result(self, result: Any, runtime: float = None):
Expand Down
20 changes: 10 additions & 10 deletions colmena/redis/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def make_queue_pairs(hostname: str, port: int = 6379, name='method',
value_server_threshold: Optional[int] = None,
value_server_hostname: Optional[str] = None,
value_server_port: Optional[int] = None)\
-> Tuple['ClientQueues', 'MethodServerQueues']:
-> Tuple['ClientQueues', 'TaskServerQueues']:
"""Make a pair of queues for a server and client
Args:
Expand All @@ -48,17 +48,17 @@ def make_queue_pairs(hostname: str, port: int = 6379, name='method',
the redis server for the task queues will be used.
value_server_port (int): See `value_server_hostname`
Returns:
(ClientQueues, MethodServerQueues): Pair of communicators set to use the correct channels
(ClientQueues, TaskServerQueues): Pair of communicators set to use the correct channels
"""

return (ClientQueues(hostname, port, name, serialization_method, keep_inputs,
topics, value_server_threshold, value_server_hostname,
value_server_port),
MethodServerQueues(hostname, port, name, topics=topics, clean_slate=clean_slate))
TaskServerQueues(hostname, port, name, topics=topics, clean_slate=clean_slate))


class RedisQueue:
"""A basic redis queue for communications used by the method server
"""A basic redis queue for communications used by the task server
A queue is defined by its prefix and a "topic" designation.
The full list of available topics is defined when creating the queue,
Expand Down Expand Up @@ -179,7 +179,7 @@ def put(self, input_data: str, topic: str = 'default'):
queue = f'{self.prefix}_{topic}'
assert queue in self._all_queues, f'Unrecognized topic: {topic}'

# Send it to the method server
# Send it to the task server
try:
self.redis_client.rpush(queue, input_data)
except redis.exceptions.ConnectionError:
Expand All @@ -204,7 +204,7 @@ def is_connected(self):


class ClientQueues:
"""Provides communication of method requests and results with the method server
"""Provides communication of method requests and results with the task server
This queue wraps communication with the underlying Redis queue and also handles communicating
requests using the :class:`Result` messaging format.
Expand Down Expand Up @@ -310,7 +310,7 @@ def send_inputs(self, *input_args: Any, method: str = None,
value_server_threshold=self.value_server_threshold
)

# Push the serialized value to the method server
# Push the serialized value to the task server
result.time_serialize_inputs = result.serialize()
self.outbound.put(result.json(exclude_unset=True), topic=topic)
logger.info(f'Client sent a {method} task with topic {topic}')
Expand Down Expand Up @@ -345,12 +345,12 @@ def get_result(self, timeout: Optional[int] = None, topic: Optional[str] = None)
return result_obj

def send_kill_signal(self):
"""Send the kill signal to the method server"""
"""Send the kill signal to the task server"""
self.outbound.put("null")


class MethodServerQueues:
"""Communication wrapper for the method server
class TaskServerQueues:
"""Communication wrapper for the task server
Handles receiving tasks
"""
Expand Down
12 changes: 6 additions & 6 deletions colmena/redis/tests/test_queue.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from colmena.models import SerializationMethod
from colmena.redis.queue import RedisQueue, ClientQueues, MethodServerQueues, make_queue_pairs
from colmena.redis.queue import RedisQueue, ClientQueues, TaskServerQueues, make_queue_pairs
import pickle as pkl
import pytest

Expand Down Expand Up @@ -61,16 +61,16 @@ def test_flush(queue):
def test_client_method_pair():
"""Make sure method client/server can talk and back and forth"""
client = ClientQueues('localhost')
server = MethodServerQueues('localhost')
server = TaskServerQueues('localhost')

# Ensure client and server are talking to the same queue
assert client.outbound.prefix == server.inbound.prefix
assert client.inbound.prefix == server.outbound.prefix

# Push inputs to method server and make sure it is received
# Push inputs to task server and make sure it is received
client.send_inputs(1)
topic, task = server.get_task()
task.deserialize() # Method server does not deserialize automatically
task.deserialize() # task server does not deserialize automatically
assert topic == 'default'
assert task.args == (1,)
assert task.time_input_received is not None
Expand All @@ -94,7 +94,7 @@ def test_methods():
"""Test sending a method name"""
client, server = make_queue_pairs('localhost')

# Push inputs to method server and make sure it is received
# Push inputs to task server and make sure it is received
client.send_inputs(1, method='test')
_, task = server.get_task()
task.deserialize()
Expand Down Expand Up @@ -149,7 +149,7 @@ def test_filtering():
"""Test filtering tasks by topic"""
client, server = make_queue_pairs('localhost', clean_slate=True, topics=['priority'])

# Simulate a result being sent through the method server
# Simulate a result being sent through the task server
client.send_inputs("hello", topic="priority")
topic, task = server.get_task()
task.deserialize()
Expand Down
5 changes: 5 additions & 0 deletions colmena/task_server/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Implementations of the task server"""

from colmena.task_server.parsl import ParslTaskServer

__all__ = ['ParslTaskServer']
27 changes: 14 additions & 13 deletions colmena/method_server/base.py → colmena/task_server/base.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,37 @@
"""Base class for the Method Server"""
"""Base class for the Task Server"""

from abc import ABCMeta, abstractmethod
from multiprocessing import Process
from typing import Optional
import logging

from colmena.exceptions import KillSignalException, TimeoutException
from colmena.redis.queue import MethodServerQueues
from colmena.redis.queue import TaskServerQueues

logger = logging.getLogger(__name__)


class BaseMethodServer(Process, metaclass=ABCMeta):
"""Abstract class that executes requests across distributed resources.
class BaseTaskServer(Process, metaclass=ABCMeta):
"""Abstract class for the Colmena Task Server, which manages the execution
of different asks
Clients submit requests to the server by pushing them to a Redis queue,
and then receives results from a second queue.
Clients submit task requests to the server by pushing them to a Redis queue,
and then receive results from a second queue.
Different implementations vary in how the queue is processed.
Start the method server by first instantiating it and then calling :meth:`start`
Start the task server by first instantiating it and then calling :meth:`start`
to launch the server in a separate process.
The method server is shutdown by pushing a ``None`` to the inputs queue,
signaling that no new tests will be incoming. The remaining tasks will
The task server can be stopped by pushing a ``None`` to the task queue,
signaling that no new tasks will be incoming. The remaining tasks will
continue to be pushed to the output queue.
"""

def __init__(self, queues: MethodServerQueues, timeout: Optional[int] = None):
def __init__(self, queues: TaskServerQueues, timeout: Optional[int] = None):
"""
Args:
queues (MethodServerQueues): Queues for the method server
queues (TaskServerQueues): Queues for the task server
timeout (int): Timeout, if desired
"""
super().__init__()
Expand All @@ -56,14 +57,14 @@ def listen_and_launch(self):

@abstractmethod
def _cleanup(self):
"""Close out any resources needed by the method server"""
"""Close out any resources needed by the task server"""
pass

def run(self) -> None:
"""Launch the thread and start running tasks
Blocks until the inputs queue is closed and all tasks have completed"""
logger.info(f"Started method server {self.__class__.__name__} on {self.ident}")
logger.info(f"Started task server {self.__class__.__name__} on {self.ident}")

# Loop until queue has closed
self.listen_and_launch()
Expand Down
36 changes: 19 additions & 17 deletions colmena/method_server/parsl.py → colmena/task_server/parsl.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Parsl method server and related utilities"""
"""Parsl task server and related utilities"""
import os
import logging
import platform
Expand All @@ -17,8 +17,8 @@
from parsl.dataflow.futures import AppFuture

from colmena.models import Result
from colmena.method_server.base import BaseMethodServer
from colmena.redis.queue import MethodServerQueues
from colmena.task_server.base import BaseTaskServer
from colmena.redis.queue import TaskServerQueues
from colmena.proxy import resolve_proxies_async

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -81,7 +81,7 @@ def run_and_record_timing(func: Callable, result: Result) -> Result:


@python_app(executors=['_output_workers'])
def output_result(queues: MethodServerQueues, topic: str, result_obj):
def output_result(queues: TaskServerQueues, topic: str, result_obj: Result):
"""Submit the function result to the Redis queue
Args:
Expand Down Expand Up @@ -135,7 +135,7 @@ def run(self) -> None:
logger.warning(f'Task {task} with an exception: {exc}')

# Pull out the result objects
queues: MethodServerQueues = task.task_def['args'][0]
queues: TaskServerQueues = task.task_def['args'][0]
topic: str = task.task_def['args'][1]
method_task = task.task_def['depends'][0]
result_obj: Result = method_task.task_def['args'][0]
Expand All @@ -150,44 +150,46 @@ def run(self) -> None:
futures = not_done


class ParslMethodServer(BaseMethodServer):
"""Method server based on Parsl
class ParslTaskServer(BaseTaskServer):
"""Task server based on Parsl
Create a Parsl method server by first creating a resource configuration following
Create a Parsl task server by first creating a resource configuration following
the recommendations in `the Parsl documentation
<https://parsl.readthedocs.io/en/stable/userguide/configuring.html>`_.
Then instantiate a method server with a list of functions,
Then instantiate a task server with a list of Python functions,
configurations defining on which Parsl executors each function can run,
and the Parsl resource configuration.
The executor(s) for each function can be defined with a combination
of per method specifications
.. code-block:: python
ParslMethodServer([(f, {'executors': ['a']})], queues, config)
ParslTaskServer([(f, {'executors': ['a']})], queues, config)
and also using a default executor
.. code-block:: python
ParslMethodServer([f], queues, config, default_executors=['a'])
ParslTaskServer([f], queues, config, default_executors=['a'])
Further configuration options for each method can be defined
in the list of methods.
**Technical Details**
The method server stores each of the supplied methods as Parsl "PythonApp" classes.
The task server stores each of the supplied methods as Parsl "PythonApp" classes.
Tasks are launched using these PythonApps after being received on the queue.
The Future provided when requesting the method invocation is then passed
to second PythonApp that pushes the result of the function to the output
queue after it completes.
That second, "output_result," function runs on threads of the same
process as this method server.
process as this task server.
There is also a separate thread that monitors for Futures that yield an error
before the "output_result" function and sends back the error messages.
"""

def __init__(self, methods: List[Union[Callable, Tuple[Callable, Dict]]],
queues: MethodServerQueues,
queues: TaskServerQueues,
config: Config,
timeout: Optional[int] = None,
default_executors: Union[str, List[str]] = 'all',
Expand All @@ -200,7 +202,7 @@ def __init__(self, methods: List[Union[Callable, Tuple[Callable, Dict]]],
is a function and the second is a dictionary of the arguments being used to create
the Parsl ParslApp see `Parsl documentation
<https://parsl.readthedocs.io/en/stable/stubs/parsl.app.app.python_app.html#parsl.app.app.python_app>`_.
queues (MethodServerQueues): Queues for the method server
queues (TaskServerQueues): Queues for the task server
config: Parsl configuration
timeout (int): Timeout, if desired
default_executors: Executor or list of executors to use by default.
Expand All @@ -225,7 +227,7 @@ def __init__(self, methods: List[Union[Callable, Tuple[Callable, Dict]]],
self.methods_ = {}
for method in methods:
# Get the options or use the defaults
if isinstance(method, tuple):
if isinstance(method, (tuple, list)):
if len(method) != 2:
raise ValueError('Method description should a tuple of length 2')
function, options = method
Expand Down Expand Up @@ -291,7 +293,7 @@ def submit_application(self, method_name: str, result: Result) -> AppFuture:
return self.methods_[method_name](result)

def _cleanup(self):
"""Close out any resources needed by the method server"""
"""Close out any resources needed by the task server"""
# Wait until all tasks have finished
dfk = parsl.dfk()
dfk.wait_for_current_tasks()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"""Tests for the Parsl implementation of the method server"""
"""Tests for the Parsl implementation of the task server"""
from typing import Tuple
from parsl.config import Config
from parsl import ThreadPoolExecutor

from colmena.exceptions import KillSignalException, TimeoutException
from colmena.redis.queue import ClientQueues, make_queue_pairs

from colmena.method_server.parsl import ParslMethodServer
from colmena.task_server.parsl import ParslTaskServer
from pytest import fixture, raises, mark


Expand All @@ -25,11 +25,11 @@ def config():
)


# Make a simple method server
# Make a simple task server
@fixture(autouse=True)
def server_and_queue(config) -> Tuple[ParslMethodServer, ClientQueues]:
def server_and_queue(config) -> Tuple[ParslTaskServer, ClientQueues]:
client_q, server_q = make_queue_pairs('localhost', clean_slate=True)
server = ParslMethodServer([f], server_q, config)
server = ParslTaskServer([f], server_q, config)
yield server, client_q
if server.is_alive():
server.terminate()
Expand Down
4 changes: 2 additions & 2 deletions colmena/thinker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class BaseThinker(Thread):
Each agent communicates with others via `queues <https://docs.python.org/3/library/queue.html>`_
or other `threading objects <https://docs.python.org/3/library/threading.html#>`_ and
the Colmena method server via the :class:`ClientQueues`.
the Colmena task server via the :class:`ClientQueues`.
The only communication method available by default is a class attribute named ``done``
that is used to signal that the program should terminate.
Expand All @@ -203,7 +203,7 @@ def __init__(self, queue: ClientQueues, resource_counter: Optional[ResourceCount
daemon: bool = True, **kwargs):
"""
Args:
queue: Queue wrapper used to communicate with method server
queue: Queue wrapper used to communicate with task server
resource_counter: Utility to used track resource utilization
daemon: Whether to launch this as a daemon thread
**kwargs: Options passed to :class:`Thread`
Expand Down
Loading

0 comments on commit 8859772

Please sign in to comment.