Add priority queue functionality to HTEX #3575
Changes from all commits
```diff
@@ -13,6 +13,7 @@
 from typing import Any, Dict, List, NoReturn, Optional, Sequence, Set, Tuple, cast
 
 import zmq
+from sortedcontainers import SortedList
 
 from parsl import curvezmq
 from parsl.app.errors import RemoteExceptionWrapper
```
```diff
@@ -55,6 +56,7 @@ def __init__(self,
                  poll_period: int,
                  cert_dir: Optional[str],
                  manager_selector: ManagerSelector,
+                 queue_threshold: int,
                  run_id: str,
                  ) -> None:
         """
```
```diff
@@ -131,7 +133,8 @@ def __init__(self,
         self.hub_address = hub_address
         self.hub_zmq_port = hub_zmq_port
 
-        self.pending_task_queue: queue.Queue[Any] = queue.Queue(maxsize=10 ** 6)
+        self.priority_pending_task_queue: SortedList[Any] = SortedList(key=lambda msg: -msg['resource_spec']['priority'])
+        self.general_pending_task_queue: queue.Queue[Any] = queue.Queue(maxsize=10 ** 6)
         self.count = 0
 
         self.worker_ports = worker_ports
```

Review comment on `self.priority_pending_task_queue`:

> my gut says to avoid two queues and have a single queue. tasks without any priority effectively receive an infinitely-weak priority in your implementation, I think, and you can implement that same behaviour with a single priority queue?
```diff
@@ -166,6 +169,7 @@ def __init__(self,
         self.heartbeat_threshold = heartbeat_threshold
 
         self.manager_selector = manager_selector
+        self.queue_threshold = queue_threshold
 
         self.current_platform = {'parsl_v': PARSL_VERSION,
                                  'python_v': "{}.{}.{}".format(sys.version_info.major,
```
```diff
@@ -177,8 +181,9 @@ def __init__(self,
 
         logger.info("Platform info: {}".format(self.current_platform))
 
-    def get_tasks(self, count: int) -> Sequence[dict]:
-        """ Obtains a batch of tasks from the internal pending_task_queue
+    def get_tasks(self, count: int, ) -> Sequence[dict]:
+        """ Obtains a batch of tasks from the internal priority_pending_task_queue first
+        then general_pending_task_queue
 
         Parameters
         ----------
```
```diff
@@ -192,12 +197,14 @@ def get_tasks(self, count: int) -> Sequence[dict]:
         """
         tasks = []
         for _ in range(0, count):
-            try:
-                x = self.pending_task_queue.get(block=False)
-            except queue.Empty:
-                break
-            else:
-                tasks.append(x)
+            if len(self.priority_pending_task_queue) > 0:
+                x = self.priority_pending_task_queue.pop(-1)
+            else:
+                try:
+                    x = self.general_pending_task_queue.get(block=False)
+                except queue.Empty:
+                    break
+            tasks.append(x)
 
         return tasks
```
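The revised docstring describes draining the priority queue first; here is a standalone illustration (with made-up task values, not PR code) of why `pop(-1)` on a `SortedList` keyed by negated priority yields the lowest, i.e. most urgent, priority value first:

```python
from sortedcontainers import SortedList

q = SortedList(key=lambda msg: -msg['resource_spec']['priority'])
q.add({'task_id': 'a', 'resource_spec': {'priority': 5}})
q.add({'task_id': 'b', 'resource_spec': {'priority': 1}})
q.add({'task_id': 'c', 'resource_spec': {'priority': 3}})

# The list is ascending in -priority, so the last element holds the smallest
# priority value and is drained first.
print([q.pop(-1)['task_id'] for _ in range(len(q))])  # ['b', 'c', 'a']
```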
```diff
@@ -215,11 +222,27 @@ def task_puller(self) -> NoReturn:
                 msg = self.task_incoming.recv_pyobj()
             except zmq.Again:
                 # We just timed out while attempting to receive
-                logger.debug("zmq.Again with {} tasks in internal queue".format(self.pending_task_queue.qsize()))
+                total_tasks = self.priority_pending_task_queue.__len__() + self.general_pending_task_queue.qsize()
+                logger.debug("zmq.Again with {} tasks in internal queue".format(total_tasks))
                 continue
 
-            logger.debug("putting message onto pending_task_queue")
-            self.pending_task_queue.put(msg)
+            if self.queue_threshold == -1:
+                # logger.debug("Priority queue disabled: putting message onto general_pending_task_queue")
+                logger.debug("Priority queue disabled: putting message onto general_pending_task_queue")
+                self.general_pending_task_queue.put(msg)
+            else:
+                resource_spec = msg.get('resource_spec', {})
+                if 'priority' in resource_spec:
+                    priority = resource_spec['priority']
+                    if priority < self.queue_threshold:
+                        self.priority_pending_task_queue.add(msg)
+                        logger.debug("putting message onto priority_pending_task_queue")
+                    else:
+                        self.general_pending_task_queue.put(msg)
+                        logger.debug("putting message onto general_pending_task_queue")
+                else:
+                    self.general_pending_task_queue.put(msg)
+                    logger.debug("putting message onto general_pending_task_queue")
             task_counter += 1
             logger.debug(f"Fetched {task_counter} tasks so far")
```

Review comment on `if priority < self.queue_threshold:`:

> i'm unclear what semantics you're trying to express with …
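Restated as a pure function, the routing rule in this hunk reduces to the predicate below (an illustrative rewrite of the diff's own logic, not new behavior). Note that a priority equal to the threshold routes to the general queue, and tasks without a `'priority'` key never reach the priority queue regardless of the threshold.

```python
from typing import Any, Dict

def routes_to_priority_queue(msg: Dict[str, Any], queue_threshold: int) -> bool:
    # queue_threshold == -1 disables the priority path entirely; otherwise a
    # task is prioritized iff it carries an explicit priority strictly below
    # the threshold (smaller numbers are more urgent).
    if queue_threshold == -1:
        return False
    resource_spec = msg.get('resource_spec', {})
    return 'priority' in resource_spec and resource_spec['priority'] < queue_threshold
```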
```diff
@@ -253,7 +276,7 @@ def _command_server(self) -> NoReturn:
                 command_req = self.command_channel.recv_pyobj()
                 logger.debug("Received command request: {}".format(command_req))
                 if command_req == "OUTSTANDING_C":
-                    outstanding = self.pending_task_queue.qsize()
+                    outstanding = self.priority_pending_task_queue.__len__() + self.general_pending_task_queue.qsize()
                     for manager in self._ready_managers.values():
                         outstanding += len(manager['tasks'])
                     reply = outstanding
```

Review comment on `self.priority_pending_task_queue.__len__()`:

> use …
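The suggestion above is cut off; if it concerns the `__len__()` call, the idiomatic spelling is the `len()` builtin, which works on a `SortedList` just as `qsize()` works on a `queue.Queue`:

```python
# Assumes the truncated comment targets the dunder call (an inference, not
# something the capture confirms).
from sortedcontainers import SortedList

priority_q = SortedList([3, 1, 2])
assert len(priority_q) == priority_q.__len__() == 3
```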
```diff
@@ -481,17 +504,20 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_r
                 m['active'] = False
                 self._send_monitoring_info(monitoring_radio, m)
 
+    def task_queues_not_empty(self) -> bool:
+        return len(self.priority_pending_task_queue) != 0 or not self.general_pending_task_queue.empty()
+
     def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
         # Check if there are tasks that could be sent to managers
 
         logger.debug("Managers count (interesting/total): {interesting}/{total}".format(
             total=len(self._ready_managers),
             interesting=len(interesting_managers)))
 
-        if interesting_managers and not self.pending_task_queue.empty():
+        if interesting_managers and self.task_queues_not_empty():
             shuffled_managers = self.manager_selector.sort_managers(self._ready_managers, interesting_managers)
 
-            while shuffled_managers and not self.pending_task_queue.empty():  # cf. the if statement above...
+            while shuffled_managers and self.task_queues_not_empty():  # cf. the if statement above...
                 manager_id = shuffled_managers.pop()
                 m = self._ready_managers[manager_id]
                 tasks_inflight = len(m['tasks'])
```
Review comment:

> this all belongs in validate_resource_spec()
>
> try implementing #3519 as a separate PR first to get the structure in place in that function for "mpi vs non-mpi" resource validation (with, I guess, no resource spec allowed for non-mpi mode?) and you can get a parsl issue fixed as a nice side effect
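A hypothetical sketch of where that validation could live, with `ValueError` standing in for parsl's own resource-spec error type and the priority check invented for illustration:

```python
from typing import Any, Dict

def validate_resource_spec(resource_specification: Dict[str, Any]) -> None:
    # Hypothetical: reject malformed priorities before they reach
    # task_puller(), so queue routing can assume a well-formed spec.
    if 'priority' in resource_specification:
        priority = resource_specification['priority']
        # bool is a subclass of int, so exclude it explicitly
        if isinstance(priority, bool) or not isinstance(priority, (int, float)):
            raise ValueError(f"'priority' must be numeric, got {priority!r}")
```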