Commit
Merge branch 'master' of https://github.com/MundiaNderi/parsl
Merging branches
MundiaNderi committed Mar 18, 2024
2 parents 62d33b0 + 5f32771 commit 46ea2cb
Showing 12 changed files with 75 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .flake8
@@ -8,7 +8,7 @@
# W504: line break after binary operator
# (Raised by flake8 even when it is followed)
ignore = E126, E402, E129, W504
-max-line-length = 157
+max-line-length = 152
exclude = test_import_fail.py,
parsl/executors/workqueue/parsl_coprocess.py
# E741 disallows ambiguous single letter names which look like numbers
10 changes: 9 additions & 1 deletion docs/README.rst
@@ -21,7 +21,15 @@ Local builds

To build the documentation locally, use::

-$ make html
+$ make clean html

+To view the freshly rebuilt docs, use::

+$ cd _build/html
+$ python3 -m http.server 8080

+Once the python http server is launched, point your browser to `http://localhost:8080 <http://localhost:8080>`_


Regenerate module stubs
--------------------------
5 changes: 3 additions & 2 deletions docs/userguide/execution.rst
@@ -39,7 +39,7 @@ parameters include access keys, instance type, and spot bid price
Parsl currently supports the following providers:

1. `parsl.providers.LocalProvider`: The provider allows you to run locally on your laptop or workstation.
-2. `parsl.providers.CobaltProvider`: This provider allows you to schedule resources via the Cobalt scheduler.
+2. `parsl.providers.CobaltProvider`: This provider allows you to schedule resources via the Cobalt scheduler. **This provider is deprecated and will be removed by 2024.04**.
3. `parsl.providers.SlurmProvider`: This provider allows you to schedule resources via the Slurm scheduler.
4. `parsl.providers.CondorProvider`: This provider allows you to schedule resources via the Condor scheduler.
5. `parsl.providers.GridEngineProvider`: This provider allows you to schedule resources via the GridEngine scheduler.
@@ -48,7 +48,8 @@ Parsl currently supports the following providers:
8. `parsl.providers.GoogleCloudProvider`: This provider allows you to provision and manage cloud nodes from Google Cloud.
9. `parsl.providers.KubernetesProvider`: This provider allows you to provision and manage containers on a Kubernetes cluster.
10. `parsl.providers.AdHocProvider`: This provider allows you to manage execution over a collection of nodes to form an ad-hoc cluster.
-11. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler
+11. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler.



Executors
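
As a rough illustration of how a provider from the list in the hunk above is wired into a configuration, here is a minimal sketch pairing `parsl.providers.SlurmProvider` with a `HighThroughputExecutor`; the partition name, block counts, and walltime are illustrative placeholders, not values from this commit:

    # Minimal sketch: a provider acquires blocks of nodes, an executor
    # runs tasks on them. All values below are placeholders.
    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor
    from parsl.providers import SlurmProvider

    config = Config(
        executors=[
            HighThroughputExecutor(
                label="htex_slurm",
                provider=SlurmProvider(
                    partition="compute",   # placeholder partition name
                    nodes_per_block=1,
                    init_blocks=1,
                    max_blocks=2,
                    walltime="00:30:00",
                ),
            )
        ]
    )
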
3 changes: 2 additions & 1 deletion parsl/dataflow/taskrecord.py
@@ -70,7 +70,8 @@ class TaskRecord(TypedDict, total=False):
# these three could be more strongly typed perhaps but I'm not thinking about that now
func: Callable
fn_hash: str
-args: Sequence[Any] # in some places we uses a Tuple[Any, ...] and in some places a List[Any]. This is an attempt to correctly type both of those.
+# in some places we use a Tuple[Any, ...] and in some places a List[Any]. This is an attempt to correctly type both of those.
+args: Sequence[Any]
kwargs: Dict[str, Any]

time_invoked: Optional[datetime.datetime]
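
The `Sequence[Any]` annotation in the hunk above works because both tuples and lists satisfy `typing.Sequence`; a self-contained sketch (the function name is hypothetical, just to show the typing):

    # Sketch: Sequence[Any] accepts both Tuple[Any, ...] and List[Any].
    from typing import Any, Sequence

    def count_args(args: Sequence[Any]) -> int:
        # Both call sites below type-check under mypy, since tuple and
        # list are each subtypes of Sequence.
        return len(args)

    count_args((1, "a"))  # args held as a tuple
    count_args([1, "a"])  # args held as a list
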
25 changes: 16 additions & 9 deletions parsl/executors/high_throughput/executor.py
@@ -6,6 +6,7 @@
import queue
import datetime
import pickle
+from dataclasses import dataclass
from multiprocessing import Process, Queue
from typing import Dict, Sequence
from typing import List, Optional, Tuple, Union, Callable
@@ -629,8 +630,8 @@ def submit(self, func, resource_specification, *args, **kwargs):
The outgoing_q is a queue on which an external process listens
for new work. This method behaves like a
-submit call as described here `Python docs: <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor>`_
+submit call as described here
+`Python docs: <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor>`_
Args:
- func (callable) : Callable function
- resource_specification (dict): Dictionary containing relevant info about task that is needed by underlying executors.
@@ -694,7 +695,7 @@ def create_monitoring_info(self, status):
def workers_per_node(self) -> Union[int, float]:
return self._workers_per_node

-def scale_in(self, blocks, max_idletime=None):
+def scale_in(self, blocks: int, max_idletime: Optional[float] = None) -> List[str]:
"""Scale in the number of active blocks by specified amount.
The scale in method here is very rude. It doesn't give the workers
@@ -721,25 +722,31 @@ def scale_in(self, blocks, max_idletime=None):
List of block IDs scaled in
"""
logger.debug(f"Scale in called, blocks={blocks}")

+@dataclass
+class BlockInfo:
+    tasks: int  # sum of tasks in this block
+    idle: float  # shortest idle time of any manager in this block

managers = self.connected_managers()
-block_info = {}  # block id -> list( tasks, idle duration )
+block_info: Dict[str, BlockInfo] = {}
for manager in managers:
if not manager['active']:
continue
b_id = manager['block_id']
if b_id not in block_info:
-block_info[b_id] = [0, float('inf')]
-block_info[b_id][0] += manager['tasks']
-block_info[b_id][1] = min(block_info[b_id][1], manager['idle_duration'])
+block_info[b_id] = BlockInfo(tasks=0, idle=float('inf'))
+block_info[b_id].tasks += manager['tasks']
+block_info[b_id].idle = min(block_info[b_id].idle, manager['idle_duration'])

-sorted_blocks = sorted(block_info.items(), key=lambda item: (item[1][1], item[1][0]))
+sorted_blocks = sorted(block_info.items(), key=lambda item: (item[1].idle, item[1].tasks))
logger.debug(f"Scale in selecting from {len(sorted_blocks)} blocks")
if max_idletime is None:
block_ids_to_kill = [x[0] for x in sorted_blocks[:blocks]]
else:
block_ids_to_kill = []
for x in sorted_blocks:
-if x[1][1] > max_idletime and x[1][0] == 0:
+if x[1].idle > max_idletime and x[1].tasks == 0:
block_ids_to_kill.append(x[0])
if len(block_ids_to_kill) == blocks:
break
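
The `BlockInfo` dataclass in the hunk above replaces positional indexing (`x[1][1]`, `x[1][0]`) with named attributes. A standalone sketch of the same selection logic, using toy block data rather than the executor's real `connected_managers()` records:

    # Sketch: mirror of the scale-in selection above. Blocks are sorted
    # by (idle, tasks); with max_idletime set, only blocks that are
    # empty and idle longer than the threshold are chosen.
    from dataclasses import dataclass
    from typing import Dict, List, Optional

    @dataclass
    class BlockInfo:
        tasks: int   # sum of tasks in this block
        idle: float  # shortest idle time of any manager in this block

    def select_blocks(block_info: Dict[str, BlockInfo], blocks: int,
                      max_idletime: Optional[float] = None) -> List[str]:
        sorted_blocks = sorted(block_info.items(),
                               key=lambda item: (item[1].idle, item[1].tasks))
        if max_idletime is None:
            return [b_id for b_id, _ in sorted_blocks[:blocks]]
        to_kill: List[str] = []
        for b_id, info in sorted_blocks:
            if info.idle > max_idletime and info.tasks == 0:
                to_kill.append(b_id)
                if len(to_kill) == blocks:
                    break
        return to_kill

    # Toy data: only "b2" is both empty and idle past the threshold.
    print(select_blocks({"b1": BlockInfo(tasks=3, idle=10.0),
                         "b2": BlockInfo(tasks=0, idle=300.0)},
                        blocks=1, max_idletime=60.0))  # -> ['b2']
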
13 changes: 11 additions & 2 deletions parsl/executors/high_throughput/interchange.py
@@ -37,6 +37,7 @@ class ManagerLost(Exception):
''' Task lost due to manager loss. Manager is considered lost when multiple heartbeats
have been missed.
'''
+
def __init__(self, manager_id: bytes, hostname: str) -> None:
self.manager_id = manager_id
self.tstamp = time.time()
@@ -49,6 +50,7 @@ def __str__(self) -> str:
class VersionMismatch(Exception):
''' Manager and Interchange versions do not match
'''
+
def __init__(self, interchange_version: str, manager_version: str):
self.interchange_version = interchange_version
self.manager_version = manager_version
@@ -66,6 +68,7 @@ class Interchange:
2. Allow for workers to join and leave the union
3. Detect workers that have failed using heartbeats
"""
+
def __init__(self,
client_address: str = "127.0.0.1",
interchange_address: Optional[str] = None,
@@ -392,7 +395,8 @@ def start(self) -> None:
logger.info("Processed {} tasks in {} seconds".format(self.count, delta))
logger.warning("Exiting")

-def process_task_outgoing_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket], kill_event: threading.Event) -> None:
+def process_task_outgoing_incoming(self, interesting_managers:
+        Set[bytes], hub_channel: Optional[zmq.Socket], kill_event: threading.Event) -> None:
"""Process one message from manager on the task_outgoing channel.
Note that this message flow is in contradiction to the name of the
channel - it is not an outgoing message and it is not a task.
@@ -620,8 +624,13 @@ def start_file_logger(filename: str, level: int = logging.DEBUG, format_string:
-------
None.
"""
+
if format_string is None:
-format_string = "%(asctime)s.%(msecs)03d %(name)s:%(lineno)d %(processName)s(%(process)d) %(threadName)s %(funcName)s [%(levelname)s] %(message)s"
+format_string = (
+    "%(asctime)s.%(msecs)03d %(name)s:%(lineno)d "
+    "%(processName)s(%(process)d) %(threadName)s "
+    "%(funcName)s [%(levelname)s] %(message)s"
+)

global logger
logger = logging.getLogger(LOGGER_NAME)
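
The reflowed `format_string` above relies on Python joining adjacent string literals at compile time, so the multi-line form is byte-for-byte identical to the old single line. A small sketch of the same pattern wired into a logger (the logger name here is arbitrary):

    # Sketch: adjacent string literals concatenate into one format
    # string; splitting them changes layout, not the value.
    import logging

    format_string = (
        "%(asctime)s.%(msecs)03d %(name)s:%(lineno)d "
        "%(processName)s(%(process)d) %(threadName)s "
        "%(funcName)s [%(levelname)s] %(message)s"
    )

    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(format_string,
                                           datefmt="%Y-%m-%d %H:%M:%S"))
    logger = logging.getLogger("demo")  # arbitrary logger name
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
    logger.debug("rendered with the reassembled format string")
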
10 changes: 8 additions & 2 deletions parsl/executors/high_throughput/process_worker_pool.py
@@ -55,6 +55,7 @@ class Manager:
| | IPC-Queues
"""
+
def __init__(self, *,
addresses,
address_probe_timeout,
@@ -413,7 +414,9 @@ def worker_watchdog(self, kill_event: threading.Event):
raise WorkerLost(worker_id, platform.node())
except Exception:
logger.info("Putting exception for executor task {} in the pending result queue".format(task['task_id']))
-result_package = {'type': 'result', 'task_id': task['task_id'], 'exception': serialize(RemoteExceptionWrapper(*sys.exc_info()))}
+result_package = {'type': 'result',
+                  'task_id': task['task_id'],
+                  'exception': serialize(RemoteExceptionWrapper(*sys.exc_info()))}
pkl_package = pickle.dumps(result_package)
self.pending_result_queue.put(pkl_package)
except KeyError:
@@ -867,7 +870,10 @@ def strategyorlist(s: str):
block_id=args.block_id,
cores_per_worker=float(args.cores_per_worker),
mem_per_worker=None if args.mem_per_worker == 'None' else float(args.mem_per_worker),
-max_workers_per_node=args.max_workers_per_node if args.max_workers_per_node == float('inf') else int(args.max_workers_per_node),
+max_workers_per_node=(
+    args.max_workers_per_node if args.max_workers_per_node == float('inf')
+    else int(args.max_workers_per_node)
+),
prefetch_capacity=int(args.prefetch_capacity),
heartbeat_threshold=int(args.hb_threshold),
heartbeat_period=int(args.hb_period),
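
The reflowed `result_package` literal above shows the worker's failure path: the in-flight exception is captured, wrapped so it can cross a process boundary, and pickled onto the result queue. A stdlib-only sketch of that flow; `traceback.format_exc()` stands in for Parsl's `serialize(RemoteExceptionWrapper(...))` pair:

    # Sketch: ship a task failure through a queue as a pickled dict.
    import pickle
    import queue
    import traceback

    pending_result_queue: "queue.Queue[bytes]" = queue.Queue()

    def run_task(task_id: int) -> None:
        try:
            raise ValueError("simulated task failure")
        except Exception:
            result_package = {'type': 'result',
                              'task_id': task_id,
                              # stand-in for serialize(RemoteExceptionWrapper(...))
                              'exception': traceback.format_exc()}
            pending_result_queue.put(pickle.dumps(result_package))

    run_task(7)
    msg = pickle.loads(pending_result_queue.get())
    print(msg['task_id'], msg['type'])  # -> 7 result
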
3 changes: 2 additions & 1 deletion parsl/jobs/strategy.py
@@ -245,7 +245,8 @@ def _general_strategy(self, status_list, *, strategy_type):
exec_status.scale_in(active_blocks - min_blocks)

else:
-logger.debug(f"Idle time {idle_duration}s is less than max_idletime {self.max_idletime}s for executor {label}; not scaling in")
+logger.debug(
+    f"Idle time {idle_duration}s is less than max_idletime {self.max_idletime}s for executor {label}; not scaling in")

# Case 2
# More tasks than the available slots.
9 changes: 7 additions & 2 deletions parsl/monitoring/db_manager.py
@@ -518,7 +518,9 @@ def start(self,
reprocessable_first_resource_messages.append(msg)
else:
if task_try_id in deferred_resource_messages:
-logger.error("Task {} already has a deferred resource message. Discarding previous message.".format(msg['task_id']))
+logger.error(
+    "Task {} already has a deferred resource message. Discarding previous message."
+    .format(msg['task_id']))
deferred_resource_messages[task_try_id] = msg
elif msg['last_msg']:
# This assumes that the primary key has been added
@@ -544,7 +546,10 @@ if reprocessable_last_resource_messages:
if reprocessable_last_resource_messages:
self._insert(table=STATUS, messages=reprocessable_last_resource_messages)
except Exception:
-logger.exception("Exception in db loop: this might have been a malformed message, or some other error. monitoring data may have been lost")
+logger.exception(
+    "Exception in db loop: this might have been a malformed message, "
+    "or some other error. monitoring data may have been lost"
+)
exception_happened = True
if exception_happened:
raise RuntimeError("An exception happened sometime during database processing and should have been logged in database_manager.log")
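
The deferral bookkeeping above keeps at most one pending resource message per task try; a second arrival overwrites the first after logging an error. A toy sketch of that overwrite-and-log pattern (the message shape and logger name are hypothetical):

    # Sketch: one deferred message per (task_id, try_id); duplicates
    # overwrite the earlier entry and are reported.
    import logging
    from typing import Any, Dict, Tuple

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("db_demo")  # arbitrary logger name

    deferred: Dict[Tuple[int, int], Dict[str, Any]] = {}

    def defer(msg: Dict[str, Any]) -> None:
        key = (msg['task_id'], msg['try_id'])
        if key in deferred:
            logger.error("Task %s already has a deferred resource message. "
                         "Discarding previous message.", msg['task_id'])
        deferred[key] = msg

    defer({'task_id': 1, 'try_id': 0, 'resource': 12.0})
    defer({'task_id': 1, 'try_id': 0, 'resource': 48.0})  # logs the error
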
8 changes: 6 additions & 2 deletions parsl/monitoring/monitoring.py
@@ -290,8 +290,12 @@ def close(self) -> None:
self._dfk_channel.close()
if exception_msgs:
for exception_msg in exception_msgs:
-self.logger.error("{} process delivered an exception: {}. Terminating all monitoring processes immediately.".format(exception_msg[0],
-exception_msg[1]))
+self.logger.error(
+    "{} process delivered an exception: {}. Terminating all monitoring processes immediately.".format(
+        exception_msg[0],
+        exception_msg[1]
+    )
+)
self.router_proc.terminate()
self.dbm_proc.terminate()
self.filesystem_proc.terminate()
11 changes: 9 additions & 2 deletions parsl/providers/slurm/slurm.py
@@ -280,11 +280,18 @@ def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> str
else:
logger.error("Could not read job ID from submit command standard output.")
logger.error("Retcode:%s STDOUT:%s STDERR:%s", retcode, stdout.strip(), stderr.strip())
-raise SubmitException(job_name, "Could not read job ID from submit command standard output", stdout=stdout, stderr=stderr, retcode=retcode)
+raise SubmitException(
+    job_name,
+    "Could not read job ID from submit command standard output",
+    stdout=stdout,
+    stderr=stderr,
+    retcode=retcode
+)
else:
logger.error("Submit command failed")
logger.error("Retcode:%s STDOUT:%s STDERR:%s", retcode, stdout.strip(), stderr.strip())
-raise SubmitException(job_name, "Could not read job ID from submit command standard output", stdout=stdout, stderr=stderr, retcode=retcode)
+raise SubmitException(job_name, "Could not read job ID from submit command standard output",
+                      stdout=stdout, stderr=stderr, retcode=retcode)

def cancel(self, job_ids):
''' Cancels the jobs specified by a list of job ids
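
Both failure branches above raise the same exception, carrying the captured process output. A hypothetical sketch of an exception class with that shape; it mirrors the keyword arguments used in the hunk, not Parsl's actual definition:

    # Sketch: an exception that keeps the submit command's output so
    # callers can log retcode/stdout/stderr later.
    from typing import Optional

    class SubmitException(Exception):
        def __init__(self, job_name: str, message: str, *,
                     stdout: Optional[str] = None,
                     stderr: Optional[str] = None,
                     retcode: Optional[int] = None) -> None:
            super().__init__(message)
            self.job_name = job_name
            self.stdout = stdout
            self.stderr = stderr
            self.retcode = retcode

    try:
        raise SubmitException("parsl.slurm",
                              "Could not read job ID from submit command standard output",
                              stdout="", stderr="sbatch: error", retcode=1)
    except SubmitException as exc:
        print(exc, exc.retcode)
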
1 change: 1 addition & 0 deletions test.memo.stdout.x
@@ -0,0 +1 @@
+X
