Add priority queue functionality to HTEX #3575

Closed · wants to merge 17 commits
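This PR adds a queue_threshold option to HighThroughputExecutor and a second, sorted task queue in the interchange: tasks whose resource specification carries a priority below the threshold are dispatched ahead of the FIFO queue, lowest value first. A hypothetical usage sketch follows; queue_threshold and the 'priority' resource-spec field are proposed in this diff and may change before merge, while the surrounding executor/app API is standard Parsl:

```python
import parsl
from parsl.config import Config
from parsl.executors import HighThroughputExecutor

# queue_threshold is the knob proposed in this PR: -1 (the default) disables
# the priority queue; otherwise tasks with priority < queue_threshold are
# routed to the interchange's priority queue.
config = Config(executors=[
    HighThroughputExecutor(
        label="htex_priority",
        queue_threshold=10,
    ),
])
parsl.load(config)

@parsl.python_app
def square(x, parsl_resource_specification={}):
    return x * x

# Lower 'priority' values sort to the dispatch end of the interchange's
# SortedList (key=-priority, popped from index -1), so they reach managers first.
futures = [square(i, parsl_resource_specification={'priority': i % 5}) for i in range(10)]
print([f.result() for f in futures])
parsl.dfk().cleanup()
```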
23 changes: 19 additions & 4 deletions parsl/executors/high_throughput/executor.py
@@ -17,7 +17,7 @@
 from parsl.addresses import get_all_addresses
 from parsl.app.errors import RemoteExceptionWrapper
 from parsl.data_provider.staging import Staging
-from parsl.executors.errors import BadMessage, ScalingFailed
+from parsl.executors.errors import BadMessage, ExecutorError, ScalingFailed
 from parsl.executors.high_throughput import zmq_pipes
 from parsl.executors.high_throughput.errors import CommandClientTimeoutError
 from parsl.executors.high_throughput.manager_selector import (
@@ -259,6 +259,7 @@ def __init__(self,
                  prefetch_capacity: int = 0,
                  heartbeat_threshold: int = 120,
                  heartbeat_period: int = 30,
+                 queue_threshold: Optional[int] = -1,
                  drain_period: Optional[int] = None,
                  poll_period: int = 10,
                  address_probe_timeout: Optional[int] = None,
@@ -322,6 +323,7 @@ def __init__(self,
         self.interchange_port_range = interchange_port_range
         self.heartbeat_threshold = heartbeat_threshold
         self.heartbeat_period = heartbeat_period
+        self.queue_threshold = queue_threshold
         self.drain_period = drain_period
         self.poll_period = poll_period
         self.run_dir = '.'
@@ -551,6 +553,7 @@ def _start_local_interchange_process(self) -> None:
             "logging_level": logging.DEBUG if self.worker_debug else logging.INFO,
             "cert_dir": self.cert_dir,
             "manager_selector": self.manager_selector,
+            "queue_threshold": self.queue_threshold,
             "run_id": self.run_id,
         }

@@ -659,8 +662,20 @@ def submit(self, func, resource_specification, *args, **kwargs):
         Returns:
               Future
         """
 
-        validate_resource_spec(resource_specification, self.enable_mpi_mode)
+        if self.enable_mpi_mode:
+            validate_resource_spec(resource_specification, self.enable_mpi_mode)
+        else:
+            if resource_specification and isinstance(resource_specification, dict):
Review comment (Collaborator):
this all belongs in validate_resource_spec()

try implementing #3519 as a separate PR first to get the structure in place in that function for "mpi vs non-mpi" resource validation (with, I guess, no resource spec allowed for non-mpi mode?) and you can get a parsl issue fixed as a nice side effect
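A sketch of the refactor suggested here, with the mpi/non-mpi branching moved inside validate_resource_spec(); the signature and the use of ValueError are illustrative assumptions, not the PR's code:

```python
from typing import Dict

ACCEPTABLE_NON_MPI_FIELDS = {'running_time_min', 'priority'}  # mirrors this diff

def validate_resource_spec(resource_specification: Dict, is_mpi_enabled: bool) -> None:
    if not resource_specification:
        return
    if is_mpi_enabled:
        # the existing MPI validation (num_nodes, ranks_per_node, ...) stays here
        return
    unknown = set(resource_specification) - ACCEPTABLE_NON_MPI_FIELDS
    if unknown:
        # the real code would raise a parsl executor error, not ValueError
        raise ValueError("Task resource specification only accepts: "
                         + ", ".join(sorted(ACCEPTABLE_NON_MPI_FIELDS)))
```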

logger.debug("Got resource_specification: {}".format(resource_specification))
acceptable_fields = set(['running_time_min', 'priority'])
keys = set(resource_specification.keys())

if not keys.issubset(acceptable_fields):
message = "Task resource specification only accepts these types of resources: {}".format(
', '.join(acceptable_fields))
logger.error(message)
raise ExecutorError(self, message)
# might need an else statement to raise an error if the not a resource spec

         if self.bad_state_is_set:
             raise self.executor_exception
@@ -684,7 +699,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
         except TypeError:
             raise SerializationError(func.__name__)
 
-        msg = {"task_id": task_id, "buffer": fn_buf}
+        msg = {"task_id": task_id, 'resource_spec': resource_specification, "buffer": fn_buf}
 
         # Post task to the outgoing queue
         self.outgoing_q.put(msg)
54 changes: 40 additions & 14 deletions parsl/executors/high_throughput/interchange.py
@@ -13,6 +13,7 @@
 from typing import Any, Dict, List, NoReturn, Optional, Sequence, Set, Tuple, cast
 
 import zmq
+from sortedcontainers import SortedList
 
 from parsl import curvezmq
 from parsl.app.errors import RemoteExceptionWrapper
@@ -55,6 +56,7 @@ def __init__(self,
                  poll_period: int,
                  cert_dir: Optional[str],
                  manager_selector: ManagerSelector,
+                 queue_threshold: int,
                  run_id: str,
                  ) -> None:
         """
@@ -131,7 +133,8 @@ def __init__(self,
         self.hub_address = hub_address
         self.hub_zmq_port = hub_zmq_port
 
-        self.pending_task_queue: queue.Queue[Any] = queue.Queue(maxsize=10 ** 6)
+        self.priority_pending_task_queue: SortedList[Any] = SortedList(key=lambda msg: -msg['resource_spec']['priority'])
Review comment (Collaborator):
my gut says to avoid two queues and have a single queue. tasks without any priority effectively receive an infinitely-weak priority in your implementation, I think, and you can implement that same behaviour with a single priority queue?
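A minimal sketch of that single-queue alternative, assuming (as in this PR's SortedList key) that a lower priority value means dispatch sooner; put_task and get_tasks here are illustrative names, not the PR's code:

```python
import itertools
import math
import queue
from typing import Any, Dict, List

_seq = itertools.count()  # tie-breaker that preserves FIFO order within a priority

def put_task(q: "queue.PriorityQueue", msg: Dict[str, Any]) -> None:
    # A task without a priority gets math.inf, i.e. an infinitely-weak priority.
    priority = msg.get('resource_spec', {}).get('priority', math.inf)
    q.put((priority, next(_seq), msg))

def get_tasks(q: "queue.PriorityQueue", count: int) -> List[Dict[str, Any]]:
    tasks = []
    for _ in range(count):
        try:
            _, _, msg = q.get(block=False)
        except queue.Empty:
            break
        tasks.append(msg)
    return tasks

q: "queue.PriorityQueue" = queue.PriorityQueue()
put_task(q, {'resource_spec': {'priority': 5}, 'task_id': 1})
put_task(q, {'task_id': 2})                       # no priority -> dispatched last
put_task(q, {'resource_spec': {'priority': 1}, 'task_id': 3})
assert [m['task_id'] for m in get_tasks(q, 3)] == [3, 1, 2]
```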

+        self.general_pending_task_queue: queue.Queue[Any] = queue.Queue(maxsize=10 ** 6)
         self.count = 0
 
         self.worker_ports = worker_ports
@@ -166,6 +169,7 @@ def __init__(self,
         self.heartbeat_threshold = heartbeat_threshold
 
         self.manager_selector = manager_selector
+        self.queue_threshold = queue_threshold
 
         self.current_platform = {'parsl_v': PARSL_VERSION,
                                  'python_v': "{}.{}.{}".format(sys.version_info.major,
@@ -177,8 +181,9 @@
 
         logger.info("Platform info: {}".format(self.current_platform))
 
-    def get_tasks(self, count: int) -> Sequence[dict]:
-        """ Obtains a batch of tasks from the internal pending_task_queue
+    def get_tasks(self, count: int, ) -> Sequence[dict]:
+        """ Obtains a batch of tasks from the internal priority_pending_task_queue first
+        then general_pending_task_queue
 
         Parameters
         ----------
@@ -192,12 +197,14 @@ def get_tasks(self, count: int) -> Sequence[dict]:
         """
         tasks = []
         for _ in range(0, count):
-            try:
-                x = self.pending_task_queue.get(block=False)
-            except queue.Empty:
-                break
+            if len(self.priority_pending_task_queue) > 0:
+                x = self.priority_pending_task_queue.pop(-1)
             else:
-                tasks.append(x)
+                try:
+                    x = self.general_pending_task_queue.get(block=False)
+                except queue.Empty:
+                    break
+            tasks.append(x)
 
         return tasks

@@ -215,11 +222,27 @@ def task_puller(self) -> NoReturn:
             try:
                 msg = self.task_incoming.recv_pyobj()
             except zmq.Again:
                 # We just timed out while attempting to receive
-                logger.debug("zmq.Again with {} tasks in internal queue".format(self.pending_task_queue.qsize()))
+                total_tasks = self.priority_pending_task_queue.__len__() + self.general_pending_task_queue.qsize()
+                logger.debug("zmq.Again with {} tasks in internal queue".format(total_tasks))
                 continue
 
-            logger.debug("putting message onto pending_task_queue")
-            self.pending_task_queue.put(msg)
+            if self.queue_threshold == -1:
+                # logger.debug("Priority queue disabled: putting message onto general_pending_task_queue")
+                logger.debug("Priority queue disabled: putting message onto general_pending_task_queue")
+                self.general_pending_task_queue.put(msg)
+            else:
+                resource_spec = msg.get('resource_spec', {})
+                if 'priority' in resource_spec:
+                    priority = resource_spec['priority']
+                    if priority < self.queue_threshold:
Review comment (Collaborator):
i'm unclear what semantics you're trying to express with queue_threshold -- I'm going to give you a low priority but under a particular priority, parsl should round it to (effectively) +infinity?

+                        self.priority_pending_task_queue.add(msg)
+                        logger.debug("putting message onto priority_pending_task_queue")
+                    else:
+                        self.general_pending_task_queue.put(msg)
+                        logger.debug("putting message onto general_pending_task_queue")
+                else:
+                    self.general_pending_task_queue.put(msg)
+                    logger.debug("putting message onto general_pending_task_queue")
             task_counter += 1
             logger.debug(f"Fetched {task_counter} tasks so far")

@@ -253,7 +276,7 @@ def _command_server(self) -> NoReturn:
                 command_req = self.command_channel.recv_pyobj()
                 logger.debug("Received command request: {}".format(command_req))
                 if command_req == "OUTSTANDING_C":
-                    outstanding = self.pending_task_queue.qsize()
+                    outstanding = self.priority_pending_task_queue.__len__() + self.general_pending_task_queue.qsize()
Review comment (@benclifford, Aug 8, 2024):
use len(priority_pending_task_queue) rather than invoking __len__
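The suggested form, with identical behaviour (the same __len__ pattern also appears in task_puller's total_tasks line above and could be changed the same way):

```python
outstanding = len(self.priority_pending_task_queue) + self.general_pending_task_queue.qsize()
```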

                     for manager in self._ready_managers.values():
                         outstanding += len(manager['tasks'])
                     reply = outstanding
@@ -481,17 +504,20 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_r
             m['active'] = False
             self._send_monitoring_info(monitoring_radio, m)
 
+    def task_queues_not_empty(self) -> bool:
+        return len(self.priority_pending_task_queue) != 0 or not self.general_pending_task_queue.empty()
+
     def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
         # Check if there are tasks that could be sent to managers
 
         logger.debug("Managers count (interesting/total): {interesting}/{total}".format(
             total=len(self._ready_managers),
             interesting=len(interesting_managers)))
 
-        if interesting_managers and not self.pending_task_queue.empty():
+        if interesting_managers and self.task_queues_not_empty():
             shuffled_managers = self.manager_selector.sort_managers(self._ready_managers, interesting_managers)
 
-            while shuffled_managers and not self.pending_task_queue.empty(): # cf. the if statement above...
+            while shuffled_managers and self.task_queues_not_empty(): # cf. the if statement above...
                 manager_id = shuffled_managers.pop()
                 m = self._ready_managers[manager_id]
                 tasks_inflight = len(m['tasks'])
6 changes: 6 additions & 0 deletions parsl/executors/high_throughput/manager_selector.py
@@ -23,3 +23,9 @@ def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list
         c_manager_list = list(manager_list)
         random.shuffle(c_manager_list)
         return c_manager_list
+
+
+class BlockIdManagerSelector(ManagerSelector):
+
+    def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list: Set[bytes]) -> List[bytes]:
+        return sorted(manager_list, key=lambda x: (ready_managers[x]['block_id'] is not None, ready_managers[x]['block_id']))
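An illustrative check (not in the PR) of how this key orders managers: entries without a block_id sort first, then entries by block_id, and since process_tasks_to_send pops managers from the end of the returned list, the highest-sorting block_id is offered tasks first:

```python
# Hypothetical manager records, standing in for ManagerRecord dicts.
ready_managers = {
    b'mgr-a': {'block_id': '2'},
    b'mgr-b': {'block_id': None},
    b'mgr-c': {'block_id': '1'},
}
order = sorted(ready_managers,
               key=lambda x: (ready_managers[x]['block_id'] is not None,
                              ready_managers[x]['block_id']))
assert order == [b'mgr-b', b'mgr-c', b'mgr-a']
```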
1 change: 1 addition & 0 deletions parsl/tests/test_htex/test_zmq_binding.py
@@ -25,6 +25,7 @@ def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[s
                           logdir=".",
                           logging_level=logging.INFO,
                           manager_selector=RandomManagerSelector(),
+                          queue_threshold=-1,
                           poll_period=10,
                           run_id="test_run_id")

1 change: 1 addition & 0 deletions test-requirements.txt
@@ -17,6 +17,7 @@ mpi4py
 # (where it's specified in setup.py)
 sqlalchemy>=1.4,<2
 sqlalchemy2-stubs
+sortedcontainers-stubs
 
 Sphinx==4.5.0
 twine