diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 953c17405e..e5247c5d25 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -10,7 +10,7 @@ jobs: main-test-suite: strategy: matrix: - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12"] runs-on: ubuntu-20.04 timeout-minutes: 60 @@ -60,7 +60,7 @@ jobs: export PARSL_TEST_PRESERVE_NUM_RUNS=7 make test - ln -s .pytest/parsltest-current test_runinfo + ln -s pytest-parsl/parsltest-current test_runinfo - name: Documentation checks run: | @@ -80,11 +80,11 @@ jobs: # database manager log file or monitoring router log file. It would be better if # the tests themselves failed immediately when there was a monitoring error, but # in the absence of that, this is a dirty way to check. - bash -c '! grep ERROR .pytest/parsltest-current/runinfo*/*/database_manager.log' - bash -c '! grep ERROR .pytest/parsltest-current/runinfo*/*/monitoring_router.log' + bash -c '! grep ERROR pytest-parsl/parsltest-current/runinfo*/*/database_manager.log' + bash -c '! 
grep ERROR pytest-parsl/parsltest-current/runinfo*/*/monitoring_router.log' # temporary; until test-matrixification - rm -f .pytest/parsltest-current test_runinfo + rm -f pytest-parsl/parsltest-current test_runinfo - name: Checking parsl-visualize run: | @@ -105,6 +105,6 @@ jobs: name: runinfo-${{ matrix.python-version }}-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }} path: | runinfo/ - .pytest/ + pytest-parsl/ ci_job_info.txt compression-level: 9 diff --git a/.gitignore b/.gitignore index acdb6254d1..8811016b83 100644 --- a/.gitignore +++ b/.gitignore @@ -63,6 +63,7 @@ coverage.xml *.cover .hypothesis/ /.pytest/ +/pytest-parsl/ # Translations *.mo diff --git a/Makefile b/Makefile index 90f20601e9..4d2f37f715 100644 --- a/Makefile +++ b/Makefile @@ -84,7 +84,7 @@ radical_local_test: .PHONY: config_local_test config_local_test: $(CCTOOLS_INSTALL) - pip3 install ".[monitoring,visualization,proxystore]" + pip3 install ".[monitoring,visualization,proxystore,kubernetes]" PYTHONPATH=/tmp/cctools/lib/python3.8/site-packages pytest parsl/tests/ -k "not cleannet" --config local --random-order --durations 10 .PHONY: site_test diff --git a/README.rst b/README.rst index da7f8245a5..a8254e2e40 100644 --- a/README.rst +++ b/README.rst @@ -1,6 +1,6 @@ Parsl - Parallel Scripting Library ================================== -|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| |CZI-EOSS| +|licence| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| |CZI-EOSS| Parsl extends parallelism in Python beyond a single computer. @@ -43,9 +43,6 @@ then explore the `parallel computing patterns `_. Requirements for running tests can be found `here `_. +Parsl is supported in Python 3.9+. Requirements can be found `here `_. Requirements for running tests can be found `here `_. 
Code of Conduct =============== diff --git a/codemeta.json b/codemeta.json index 917abb5afd..9754c55358 100644 --- a/codemeta.json +++ b/codemeta.json @@ -191,8 +191,8 @@ "name": "The Python Package Index", "url": "https://pypi.org" }, - "runtimePlatform": "Python 3.8", + "runtimePlatform": "Python 3.9", "url": "https://github.com/Parsl/parsl", "developmentStatus": "active", - "programmingLanguage": "Python :: 3.8" + "programmingLanguage": "Python :: 3.9" } diff --git a/docs/faq.rst b/docs/faq.rst index a03287c378..ca4bd82bdb 100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -209,7 +209,7 @@ For instance, with conda, follow this `cheatsheet # Install packages: - conda install + conda install How do I run code that uses Python2.X? diff --git a/docs/quickstart.rst b/docs/quickstart.rst index f1de3d1f94..e1c3821466 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -10,7 +10,7 @@ Installation Parsl is available on `PyPI `_ and `conda-forge `_. -Parsl requires Python3.8+ and has been tested on Linux and macOS. +Parsl requires Python3.9+ and has been tested on Linux. Installation using Pip @@ -31,7 +31,7 @@ Installation using Conda 1. Create and activate a new conda environment:: - $ conda create --name parsl_py38 python=3.8 + $ conda create --name parsl_py38 python=3.9 $ source activate parsl_py38 2. Install Parsl:: @@ -236,7 +236,7 @@ for reporting purposes. As an NSF-funded project, our ability to track usage metrics is important for continued funding. -You can opt-in by setting ``usage_tracking=True`` in the configuration object (`parsl.config.Config`). +You can opt-in by setting ``usage_tracking=3`` in the configuration object (`parsl.config.Config`). To read more about what information is collected and how it is used see :ref:`label-usage-tracking`. 
diff --git a/docs/reference.rst b/docs/reference.rst index f2d89afaf8..3436635cad 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -78,6 +78,16 @@ Executors parsl.executors.FluxExecutor parsl.executors.radical.RadicalPilotExecutor +Manager Selectors +================= + +.. autosummary:: + :toctree: stubs + :nosignatures: + + parsl.executors.high_throughput.manager_selector.RandomManagerSelector + parsl.executors.high_throughput.manager_selector.BlockIdManagerSelector + Launchers ========= diff --git a/docs/userguide/usage_tracking.rst b/docs/userguide/usage_tracking.rst index 777e8003b2..da8ac9b79d 100644 --- a/docs/userguide/usage_tracking.rst +++ b/docs/userguide/usage_tracking.rst @@ -1,82 +1,171 @@ .. _label-usage-tracking: -Usage statistics collection +Usage Statistics Collection =========================== -Parsl uses an **Opt-in** model to send usage statistics back to the Parsl development team to -measure worldwide usage and improve reliability and usability. The usage statistics are used only for -improvements and reporting. They are not shared in raw form outside of the Parsl team. - +Parsl uses an **Opt-in** model for usage tracking, allowing users to decide if they wish to participate. Usage statistics are crucial for improving software reliability and help focus development and maintenance efforts on the most used components of Parsl. The collected data is used solely for enhancements and reporting and is not shared in its raw form outside of the Parsl team. Why are we doing this? ---------------------- -The Parsl development team receives support from government funding agencies. For the team to continue to -receive such funding, and for the agencies themselves to argue for funding, both the team and the agencies -must be able to demonstrate that the scientific community is benefiting from these investments. 
To this end, -it is important that we provide aggregate usage data about such things as the following: +The Parsl development team relies on funding from government agencies. To sustain this funding and advocate for continued support, it is essential to show that the research community benefits from these investments. + +By opting in to share usage data, you actively support the ongoing development and maintenance of Parsl. (See :ref:`What is sent? <what-is-sent>` below). + +Opt-In Model +------------ + +We use an **opt-in model** for usage tracking to respect user privacy and provide full control over shared information. We hope that developers and researchers will choose to send us this information. The reason is that we need this data - it is a requirement for funding. -* How many people use Parsl -* Average job length -* Parsl exit codes +Choose the data you share with Usage Tracking Levels. -By participating in this project, you help justify continuing support for the software on which you rely. -(see :ref:`What is sent? <what-is-sent>` below). +**Usage Tracking Levels:** -Opt-In ------- +* **Level 1:** Only basic information such as Python version, Parsl version, and platform name (Linux, MacOS, etc.) +* **Level 2:** Level 1 information and configuration information including provider, executor, and launcher names. +* **Level 3:** Level 2 information and workflow execution details, including the number of applications run, failures, and execution time. -We have chosen opt-in collection rather than opt-out with the hope that developers and researchers -will choose to send us this information. The reason is that we need this data - it is a requirement for funding. +By enabling usage tracking, you support Parsl's development. -By opting-in, and allowing these statistics to be reported back, you are explicitly supporting the -further development of Parsl.
+**To opt-in, set** ``usage_tracking`` **to the desired level (1, 2, or 3) in the configuration object** (``parsl.config.Config``) **.** -If you wish to opt in to usage reporting, set ``usage_tracking=True`` in the configuration object (`parsl.config.Config`). +Example: +.. code-block:: python3 + + config = Config( + executors=[ + HighThroughputExecutor( + ... + ) + ], + usage_tracking=3 + ) .. _what-is-sent: What is sent? ------------- -* IP address -* Run UUID -* Start and end times -* Number of executors used -* Number of failures -* Parsl and Python version -* OS and OS version - +The data collected depends on the tracking level selected: + +* **Level 1:** Only basic information such as Python version, Parsl version, and platform name (Linux, MacOS, etc.) +* **Level 2:** Level 1 information and configuration information including provider, executor, and launcher names. +* **Level 3:** Level 2 information and workflow execution details, including the number of applications run, failures, and execution time. + +**Example Messages:** + +- At launch: + + .. code-block:: json + + { + "correlator":"6bc7484e-5693-48b2-b6c0-5889a73f7f4e", + "parsl_v":"1.3.0-dev", + "python_v":"3.12.2", + "platform.system":"Darwin", + "tracking_level":3, + "components":[ + { + "c":"parsl.config.Config", + "executors_len":1, + "dependency_resolver":false + }, + "parsl.executors.threads.ThreadPoolExecutor" + ], + "start":1727156153 + } + +- On closure (Tracking Level 3 only): + + .. code-block:: json + + { + "correlator":"6bc7484e-5693-48b2-b6c0-5889a73f7f4e", + "execution_time":31, + "components":[ + { + "c":"parsl.dataflow.dflow.DataFlowKernel", + "app_count":3, + "app_fails":0 + }, + { + "c":"parsl.config.Config", + "executors_len":1, + "dependency_resolver":false + }, + "parsl.executors.threads.ThreadPoolExecutor" + ], + "end":1727156156 + } + +**All messages sent are logged in the** ``parsl.log`` **file, ensuring complete transparency.** How is the data sent? 
--------------------- -The data is sent via UDP. While this may cause us to lose some data, it drastically reduces the possibility -that the usage statistics reporting will adversely affect the operation of the software. +Data is sent using **UDP** to minimize the impact on workflow performance. While this may result in some data loss, it significantly reduces the chances of usage tracking affecting the software's operation. +The data is processed through AWS CloudWatch to generate a monitoring dashboard, providing valuable insights into usage patterns. When is the data sent? ---------------------- -The data is sent twice per run, once when Parsl starts a script, and once when the script is completed. +Data is sent twice per run: +1. At the start of the script. +2. Upon script completion (for Tracking Level 3). What will the data be used for? ------------------------------- -The data will be used for reporting purposes to answer questions such as: +The data will help the Parsl team understand Parsl usage and make development and maintenance decisions, including: + +* Focus development and maintenance on the most-used components of Parsl. +* Determine which Python versions to continue supporting. +* Track the age of Parsl installations. +* Assess how long it takes for most users to adopt new changes. +* Track usage statistics to report to funders. + +Usage Statistics Dashboard +-------------------------- -* How many unique users are using Parsl? -* To determine patterns of usage - is activity increasing or decreasing? +The collected data is aggregated and displayed on a publicly accessible dashboard. This dashboard provides an overview of how Parsl is being used across different environments and includes metrics such as: -We will also use this information to improve Parsl by identifying software faults. 
+* Total workflows executed over time +* Most-used Python and Parsl versions +* Most common platforms, executors, and more -* What percentage of tasks complete successfully? -* Of the tasks that fail, what is the most common fault code returned? +`Find the dashboard here `_ +Leaderboard +----------- + +**Opting in to usage tracking also allows you to participate in the Parsl Leaderboard. +To participate in the leaderboard, you can deanonymize yourself using the** ``project_name`` **parameter in the Parsl configuration object** (``parsl.config.Config``) **.** + +`Find the Parsl Leaderboard here `_ + +Example: + +.. code-block:: python3 + + config = Config( + executors=[ + HighThroughputExecutor( + ... + ) + ], + usage_tracking=3, + project_name="my-test-project" + ) + +Every run of Parsl with usage tracking **Level 1** or **Level 2** earns you **1 point**, and every run with usage tracking **Level 3** earns you **2 points**. + Feedback -------- -Please send us your feedback at parsl@googlegroups.com. Feedback from our user communities will be +Please send us your feedback at parsl@googlegroups.com. Feedback from our user communities will be useful in determining our path forward with usage tracking in the future. + +**Please consider turning on usage tracking to support the continued development of Parsl.** diff --git a/mypy.ini b/mypy.ini index 604fa4d07a..4b64a12de2 100644 --- a/mypy.ini +++ b/mypy.ini @@ -137,12 +137,6 @@ ignore_missing_imports = True [mypy-copy_reg.*] ignore_missing_imports = True -[mypy-ipyparallel.*] -ignore_missing_imports = True - -[mypy-ipython_genutils.*] -ignore_missing_imports = True - [mypy-cmreslogging.handlers.*] ignore_missing_imports = True diff --git a/parsl/config.py b/parsl/config.py index c3725eccf8..1358e99d28 100644 --- a/parsl/config.py +++ b/parsl/config.py @@ -83,6 +83,9 @@ class Config(RepresentationMixin, UsageInformation): Setting this field to 0 will disable usage tracking.
Default (this field is not set): usage tracking is not enabled. Parsl only collects minimal, non personally-identifiable, information used for reporting to our funding agencies. + project_name: str, optional + Option to deanonymize usage tracking data. + If set, this value will be used as the project name in the usage tracking data and placed on the leaderboard. initialize_logging : bool, optional Make DFK optionally not initialize any logging. Log messages will still be passed into the python logging system under the @@ -118,6 +121,7 @@ def __init__(self, max_idletime: float = 120.0, monitoring: Optional[MonitoringHub] = None, usage_tracking: int = 0, + project_name: Optional[str] = None, initialize_logging: bool = True) -> None: executors = tuple(executors or []) @@ -154,6 +158,7 @@ def __init__(self, self.max_idletime = max_idletime self.validate_usage_tracking(usage_tracking) self.usage_tracking = usage_tracking + self.project_name = project_name self.initialize_logging = initialize_logging self.monitoring = monitoring self.std_autopath: Optional[Callable] = std_autopath diff --git a/parsl/executors/errors.py b/parsl/executors/errors.py index 9725105dd9..fed43228be 100644 --- a/parsl/executors/errors.py +++ b/parsl/executors/errors.py @@ -1,4 +1,6 @@ """Exceptions raise by Executors.""" +from typing import Set + from parsl.errors import ParslError from parsl.executors.base import ParslExecutor @@ -44,6 +46,17 @@ def __str__(self): self.current_executor) +class InvalidResourceSpecification(ExecutorError): + """Error raised when Invalid input is supplied via resource Specification""" + + def __init__(self, invalid_keys: Set[str], message: str = ''): + self.invalid_keys = invalid_keys + self.message = message + + def __str__(self): + return f"Invalid Resource Specification Supplied: {self.invalid_keys}. 
{self.message}" + + class ScalingFailed(ExecutorError): """Scaling failed due to error in Execution provider.""" diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index ffd1513dd4..7ab0e0a2ba 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -16,16 +16,17 @@ from parsl.addresses import get_all_addresses from parsl.app.errors import RemoteExceptionWrapper from parsl.data_provider.staging import Staging -from parsl.executors.errors import BadMessage, ScalingFailed +from parsl.executors.errors import ( + BadMessage, + InvalidResourceSpecification, + ScalingFailed, +) from parsl.executors.high_throughput import zmq_pipes from parsl.executors.high_throughput.errors import CommandClientTimeoutError from parsl.executors.high_throughput.manager_selector import ( ManagerSelector, RandomManagerSelector, ) -from parsl.executors.high_throughput.mpi_prefix_composer import ( - InvalidResourceSpecification, -) from parsl.executors.status_handling import BlockProviderExecutor from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus from parsl.process_loggers import wrap_with_logs @@ -145,6 +146,11 @@ encrypted : bool Flag to enable/disable encryption (CurveZMQ). Default is False. + + manager_selector: ManagerSelector + Determines what strategy the interchange uses to select managers during task distribution. + See API reference under "Manager Selectors" regarding the various manager selectors. 
+ Default: 'RandomManagerSelector' """ # Documentation for params used by both HTEx and MPIEx @@ -343,15 +349,17 @@ def worker_logdir(self): return self.logdir def validate_resource_spec(self, resource_specification: dict): - """HTEX does not support *any* resource_specification options and - will raise InvalidResourceSpecification is any are passed to it""" + """HTEX supports the following *Optional* resource specifications: + priority: lower value is higher priority""" if resource_specification: - raise InvalidResourceSpecification( - set(resource_specification.keys()), - ("HTEX does not support the supplied resource_specifications." - "For MPI applications consider using the MPIExecutor. " - "For specifications for core count/memory/walltime, consider using WorkQueueExecutor. ") - ) + acceptable_fields = {'priority'} + keys = set(resource_specification.keys()) + invalid_keys = keys - acceptable_fields + if invalid_keys: + message = "Task resource specification only accepts these types of resources: {}".format( + ', '.join(acceptable_fields)) + logger.error(message) + raise InvalidResourceSpecification(set(invalid_keys), message) return def initialize_scaling(self): @@ -464,9 +472,7 @@ def _result_queue_worker(self): except pickle.UnpicklingError: raise BadMessage("Message received could not be unpickled") - if msg['type'] == 'heartbeat': - continue - elif msg['type'] == 'result': + if msg['type'] == 'result': try: tid = msg['task_id'] except Exception: @@ -587,7 +593,7 @@ def hold_worker(self, worker_id: str) -> None: def outstanding(self) -> int: """Returns the count of tasks outstanding across the interchange and managers""" - return self.command_client.run("OUTSTANDING_C") + return len(self.tasks) @property def connected_workers(self) -> int: @@ -664,7 +670,7 @@ def submit(self, func, resource_specification, *args, **kwargs): except TypeError: raise SerializationError(func.__name__) - msg = {"task_id": task_id, "buffer": fn_buf} + msg = {"task_id": task_id, 
"resource_spec": resource_specification, "buffer": fn_buf} # Post task to the outgoing queue self.outgoing_q.put(msg) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index d61c76fed2..b0228b52f0 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -6,7 +6,6 @@ import pickle import platform import queue -import signal import sys import threading import time @@ -252,13 +251,7 @@ def _command_server(self) -> NoReturn: try: command_req = self.command_channel.recv_pyobj() logger.debug("Received command request: {}".format(command_req)) - if command_req == "OUTSTANDING_C": - outstanding = self.pending_task_queue.qsize() - for manager in self._ready_managers.values(): - outstanding += len(manager['tasks']) - reply = outstanding - - elif command_req == "CONNECTED_BLOCKS": + if command_req == "CONNECTED_BLOCKS": reply = self.connected_block_history elif command_req == "WORKERS": @@ -319,16 +312,6 @@ def start(self) -> None: """ Start the interchange """ - # If a user workflow has set its own signal handler for sigterm, that - # handler will be inherited by the interchange process because it is - # launched as a multiprocessing fork process. - # That can interfere with the interchange shutdown mechanism, which is - # to receive a SIGTERM and exit immediately. - # See Parsl issue #2343 (Threads and multiprocessing cannot be - # intermingled without deadlocks) which talks about other fork-related - # parent-process-inheritance problems. 
- signal.signal(signal.SIGTERM, signal.SIG_DFL) - logger.info("Starting main interchange method") if self.hub_address is not None and self.hub_zmq_port is not None: @@ -549,7 +532,6 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ monitoring_radio.send(r['payload']) elif r['type'] == 'heartbeat': logger.debug("Manager %r sent heartbeat via results connection", manager_id) - b_messages.append((p_message, r)) else: logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"]) diff --git a/parsl/executors/high_throughput/manager_selector.py b/parsl/executors/high_throughput/manager_selector.py index 0ede28ee7d..25a9c49ebc 100644 --- a/parsl/executors/high_throughput/manager_selector.py +++ b/parsl/executors/high_throughput/manager_selector.py @@ -19,7 +19,37 @@ def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list class RandomManagerSelector(ManagerSelector): + """Returns a shuffled list of interesting_managers + + By default this strategy is used by the interchange. Works well + in distributing workloads equally across all available compute + resources. The random workload strategy is not effective in + conjunction with elastic scaling behavior as the even task + distribution does not allow the scaling down of blocks, leading + to wasted resource consumption. + """ + def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list: Set[bytes]) -> List[bytes]: c_manager_list = list(manager_list) random.shuffle(c_manager_list) return c_manager_list + + +class BlockIdManagerSelector(ManagerSelector): + + """Returns an interesting_managers list sorted by block ID + + Observations: + 1. BlockID manager selector helps with workloads that see a varying + amount of tasks over time. New blocks are prioritized with the + blockID manager selector, when used with 'htex_auto_scaling', results + in compute cost savings. + + 2. Doesn't really work with bag-of-tasks workloads.
When all the tasks + are put into the queue upfront, all blocks operate at near full + utilization for the majority of the workload, which task goes where + doesn't really matter. + """ + + def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list: Set[bytes]) -> List[bytes]: + return sorted(manager_list, key=lambda x: (ready_managers[x]['block_id'] is not None, ready_managers[x]['block_id'])) diff --git a/parsl/executors/high_throughput/mpi_prefix_composer.py b/parsl/executors/high_throughput/mpi_prefix_composer.py index 0125d9a532..689fbb771f 100644 --- a/parsl/executors/high_throughput/mpi_prefix_composer.py +++ b/parsl/executors/high_throughput/mpi_prefix_composer.py @@ -1,5 +1,7 @@ import logging -from typing import Dict, List, Set, Tuple +from typing import Dict, List, Tuple + +from parsl.executors.errors import InvalidResourceSpecification logger = logging.getLogger(__name__) @@ -8,27 +10,6 @@ 'mpiexec') -class MissingResourceSpecification(Exception): - """Exception raised when input is not supplied a resource specification""" - - def __init__(self, reason: str): - self.reason = reason - - def __str__(self): - return f"Missing resource specification: {self.reason}" - - -class InvalidResourceSpecification(Exception): - """Exception raised when Invalid input is supplied via resource specification""" - - def __init__(self, invalid_keys: Set[str], message: str = ''): - self.invalid_keys = invalid_keys - self.message = message - - def __str__(self): - return f"Invalid resource specification options supplied: {self.invalid_keys} {self.message}" - - def validate_resource_spec(resource_spec: Dict[str, str]): """Basic validation of keys in the resource_spec @@ -40,7 +21,8 @@ def validate_resource_spec(resource_spec: Dict[str, str]): # empty resource_spec when mpi_mode is set causes parsl to hang # ref issue #3427 if len(user_keys) == 0: - raise MissingResourceSpecification('MPI mode requires optional parsl_resource_specification keyword 
argument to be configured') + raise InvalidResourceSpecification(user_keys, + 'MPI mode requires optional parsl_resource_specification keyword argument to be configured') legal_keys = set(("ranks_per_node", "num_nodes", diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index 59efe501f1..7e238bb61c 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -362,7 +362,7 @@ def pull_tasks(self, kill_event): if tasks == HEARTBEAT_CODE: logger.debug("Got heartbeat from interchange") elif tasks == DRAINED_CODE: - logger.info("Got fulled drained message from interchange - setting kill flag") + logger.info("Got fully drained message from interchange - setting kill flag") kill_event.set() else: task_recv_counter += len(tasks) diff --git a/parsl/executors/threads.py b/parsl/executors/threads.py index cbc813d9d4..9b3b0df5ce 100644 --- a/parsl/executors/threads.py +++ b/parsl/executors/threads.py @@ -6,7 +6,7 @@ from parsl.data_provider.staging import Staging from parsl.executors.base import ParslExecutor -from parsl.executors.errors import UnsupportedFeatureError +from parsl.executors.errors import InvalidResourceSpecification from parsl.utils import RepresentationMixin logger = logging.getLogger(__name__) @@ -54,7 +54,8 @@ def submit(self, func, resource_specification, *args, **kwargs): if resource_specification: logger.error("Ignoring the resource specification. 
" "Parsl resource specification is not supported in ThreadPool Executor.") - raise UnsupportedFeatureError('resource specification', 'ThreadPool Executor', None) + raise InvalidResourceSpecification(set(resource_specification.keys()), + "Parsl resource specification is not supported in ThreadPool Executor.") return self.executor.submit(func, *args, **kwargs) diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index ae39f8c118..155c990ab5 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -28,7 +28,7 @@ from parsl.data_provider.files import File from parsl.data_provider.staging import Staging from parsl.errors import OptionalModuleMissing -from parsl.executors.errors import ExecutorError +from parsl.executors.errors import ExecutorError, InvalidResourceSpecification from parsl.executors.status_handling import BlockProviderExecutor from parsl.executors.workqueue import exec_parsl_function from parsl.process_loggers import wrap_with_logs @@ -419,7 +419,7 @@ def submit(self, func, resource_specification, *args, **kwargs): message = "Task resource specification only accepts these types of resources: {}".format( ', '.join(acceptable_fields)) logger.error(message) - raise ExecutorError(self, message) + raise InvalidResourceSpecification(keys, message) # this checks that either all of the required resource types are specified, or # that none of them are: the `required_resource_types` are not actually required, @@ -430,9 +430,10 @@ def submit(self, func, resource_specification, *args, **kwargs): logger.error("Running with `autolabel=False`. In this mode, " "task resource specification requires " "three resources to be specified simultaneously: cores, memory, and disk") - raise ExecutorError(self, "Task resource specification requires " - "three resources to be specified simultaneously: cores, memory, and disk. 
" - "Try setting autolabel=True if you are unsure of the resource usage") + raise InvalidResourceSpecification(keys, + "Task resource specification requires " + "three resources to be specified simultaneously: cores, memory, and disk. " + "Try setting autolabel=True if you are unsure of the resource usage") for k in keys: if k == 'cores': diff --git a/parsl/launchers/launchers.py b/parsl/launchers/launchers.py index 021357b56c..e4ac7ec717 100644 --- a/parsl/launchers/launchers.py +++ b/parsl/launchers/launchers.py @@ -61,7 +61,6 @@ def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> s """ Args: - command (string): The command string to be launched - - task_block (string) : bash evaluated string. - fail_on_any: If True, return a nonzero exit code if any worker failed, otherwise zero; if False, return a nonzero exit code if all workers failed, otherwise zero. @@ -131,7 +130,6 @@ def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> s """ Args: - command (string): The command string to be launched - - task_block (string) : bash evaluated string. """ task_blocks = tasks_per_node * nodes_per_block @@ -208,7 +206,6 @@ def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> s """ Args: - command (string): The command string to be launched - - task_block (string) : bash evaluated string. """ task_blocks = tasks_per_node * nodes_per_block @@ -263,7 +260,6 @@ def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> s """ Args: - command (string): The command string to be launched - - task_block (string) : bash evaluated string. """ task_blocks = tasks_per_node * nodes_per_block @@ -311,7 +307,6 @@ def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> s """ Args: - command (string): The command string to be launched - - task_block (string) : bash evaluated string. 
""" task_blocks = tasks_per_node * nodes_per_block @@ -363,7 +358,6 @@ def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> s """ Args: - command (string): The command string to be launched - - task_block (string) : bash evaluated string. """ task_blocks = tasks_per_node * nodes_per_block diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py index 40b5b430a5..8c3b081ad0 100644 --- a/parsl/providers/kubernetes/kube.py +++ b/parsl/providers/kubernetes/kube.py @@ -1,10 +1,5 @@ import logging -import time - -from parsl.providers.kubernetes.template import template_string - -logger = logging.getLogger(__name__) - +import uuid from typing import Any, Dict, List, Optional, Tuple import typeguard @@ -12,7 +7,8 @@ from parsl.errors import OptionalModuleMissing from parsl.jobs.states import JobState, JobStatus from parsl.providers.base import ExecutionProvider -from parsl.utils import RepresentationMixin +from parsl.providers.kubernetes.template import template_string +from parsl.utils import RepresentationMixin, sanitize_dns_subdomain_rfc1123 try: from kubernetes import client, config @@ -20,6 +16,8 @@ except (ImportError, NameError, FileNotFoundError): _kubernetes_enabled = False +logger = logging.getLogger(__name__) + translate_table = { 'Running': JobState.RUNNING, 'Pending': JobState.PENDING, @@ -161,7 +159,7 @@ def __init__(self, self.resources: Dict[object, Dict[str, Any]] self.resources = {} - def submit(self, cmd_string, tasks_per_node, job_name="parsl"): + def submit(self, cmd_string: str, tasks_per_node: int, job_name: str = "parsl.kube"): """ Submit a job Args: - cmd_string :(String) - Name of the container to initiate @@ -173,15 +171,19 @@ def submit(self, cmd_string, tasks_per_node, job_name="parsl"): Returns: - job_id: (string) Identifier for the job """ + job_id = uuid.uuid4().hex[:8] - cur_timestamp = str(time.time() * 1000).split(".")[0] - job_name = "{0}-{1}".format(job_name, cur_timestamp) - - if 
not self.pod_name: - pod_name = '{}'.format(job_name) - else: - pod_name = '{}-{}'.format(self.pod_name, - cur_timestamp) + pod_name = self.pod_name or job_name + try: + pod_name = sanitize_dns_subdomain_rfc1123(pod_name) + except ValueError: + logger.warning( + f"Invalid pod name '{pod_name}' for job '{job_id}', falling back to 'parsl.kube'" + ) + pod_name = "parsl.kube" + pod_name = pod_name[:253 - 1 - len(job_id)] # Leave room for the job ID + pod_name = pod_name.rstrip(".-") # Remove trailing dot or hyphen after trim + pod_name = f"{pod_name}.{job_id}" formatted_cmd = template_string.format(command=cmd_string, worker_init=self.worker_init) @@ -189,14 +191,14 @@ def submit(self, cmd_string, tasks_per_node, job_name="parsl"): logger.debug("Pod name: %s", pod_name) self._create_pod(image=self.image, pod_name=pod_name, - job_name=job_name, + job_id=job_id, cmd_string=formatted_cmd, volumes=self.persistent_volumes, service_account_name=self.service_account_name, annotations=self.annotations) - self.resources[pod_name] = {'status': JobStatus(JobState.RUNNING)} + self.resources[job_id] = {'status': JobStatus(JobState.RUNNING), 'pod_name': pod_name} - return pod_name + return job_id def status(self, job_ids): """ Get the status of a list of jobs identified by the job identifiers @@ -212,6 +214,9 @@ def status(self, job_ids): self._status() return [self.resources[jid]['status'] for jid in job_ids] + def _get_pod_name(self, job_id: str) -> str: + return self.resources[job_id]['pod_name'] + def cancel(self, job_ids): """ Cancels the jobs specified by a list of job ids Args: @@ -221,7 +226,8 @@ def cancel(self, job_ids): """ for job in job_ids: logger.debug("Terminating job/pod: {0}".format(job)) - self._delete_pod(job) + pod_name = self._get_pod_name(job) + self._delete_pod(pod_name) self.resources[job]['status'] = JobStatus(JobState.CANCELLED) rets = [True for i in job_ids] @@ -242,7 +248,8 @@ def _status(self): for jid in to_poll_job_ids: phase = None try: - pod = 
self.kube_client.read_namespaced_pod(name=jid, namespace=self.namespace) + pod_name = self._get_pod_name(jid) + pod = self.kube_client.read_namespaced_pod(name=pod_name, namespace=self.namespace) except Exception: logger.exception("Failed to poll pod {} status, most likely because pod was terminated".format(jid)) if self.resources[jid]['status'] is JobStatus(JobState.RUNNING): @@ -257,10 +264,10 @@ def _status(self): self.resources[jid]['status'] = JobStatus(status) def _create_pod(self, - image, - pod_name, - job_name, - port=80, + image: str, + pod_name: str, + job_id: str, + port: int = 80, cmd_string=None, volumes=[], service_account_name=None, @@ -269,7 +276,7 @@ def _create_pod(self, Args: - image (string) : Docker image to launch - pod_name (string) : Name of the pod - - job_name (string) : App label + - job_id (string) : Job ID KWargs: - port (integer) : Container port Returns: @@ -299,7 +306,7 @@ def _create_pod(self, ) # Configure Pod template container container = client.V1Container( - name=pod_name, + name=job_id, image=image, resources=resources, ports=[client.V1ContainerPort(container_port=port)], @@ -322,7 +329,7 @@ def _create_pod(self, claim_name=volume[0]))) metadata = client.V1ObjectMeta(name=pod_name, - labels={"app": job_name}, + labels={"parsl-job-id": job_id}, annotations=annotations) spec = client.V1PodSpec(containers=[container], image_pull_secrets=[secret], diff --git a/parsl/tests/configs/htex_local_alternate.py b/parsl/tests/configs/htex_local_alternate.py index 52124211bc..d84a07ad84 100644 --- a/parsl/tests/configs/htex_local_alternate.py +++ b/parsl/tests/configs/htex_local_alternate.py @@ -66,7 +66,8 @@ def fresh_config(): monitoring_debug=False, resource_monitoring_interval=1, ), - usage_tracking=True + usage_tracking=3, + project_name="parsl htex_local_alternate test configuration" ) diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py index 5cf9446906..b0036db499 100644 --- a/parsl/tests/conftest.py +++ 
b/parsl/tests/conftest.py @@ -59,7 +59,7 @@ def _chmod(path: pathlib.Path, mode: int): config = re.sub(r"[^A-z0-9_-]+", "_", pytestconfig.getoption('config')[0]) cwd = pathlib.Path(os.getcwd()) - pytest_dir = cwd / ".pytest" + pytest_dir = cwd / "pytest-parsl" pytest_dir.mkdir(mode=0o700, parents=True, exist_ok=True) test_dir_prefix = "parsltest-" diff --git a/parsl/tests/test_error_handling/test_resource_spec.py b/parsl/tests/test_error_handling/test_resource_spec.py index 6184ac1265..df7c5a4bad 100644 --- a/parsl/tests/test_error_handling/test_resource_spec.py +++ b/parsl/tests/test_error_handling/test_resource_spec.py @@ -1,11 +1,9 @@ import parsl from parsl.app.app import python_app from parsl.executors import WorkQueueExecutor -from parsl.executors.errors import ExecutorError, UnsupportedFeatureError +from parsl.executors.errors import InvalidResourceSpecification from parsl.executors.high_throughput.executor import HighThroughputExecutor -from parsl.executors.high_throughput.mpi_prefix_composer import ( - InvalidResourceSpecification, -) +from parsl.executors.threads import ThreadPoolExecutor @python_app @@ -27,11 +25,10 @@ def test_resource(n=2): try: fut.result() except InvalidResourceSpecification: - assert isinstance(executor, HighThroughputExecutor) - except UnsupportedFeatureError: - assert not isinstance(executor, WorkQueueExecutor) - except Exception as e: - assert isinstance(e, ExecutorError) + assert ( + isinstance(executor, HighThroughputExecutor) or + isinstance(executor, WorkQueueExecutor) or + isinstance(executor, ThreadPoolExecutor)) # Specify resources with wrong types # 'cpus' is incorrect, should be 'cores' @@ -40,8 +37,7 @@ def test_resource(n=2): try: fut.result() except InvalidResourceSpecification: - assert isinstance(executor, HighThroughputExecutor) - except UnsupportedFeatureError: - assert not isinstance(executor, WorkQueueExecutor) - except Exception as e: - assert isinstance(e, ExecutorError) + assert ( + isinstance(executor, 
HighThroughputExecutor) or + isinstance(executor, WorkQueueExecutor) or + isinstance(executor, ThreadPoolExecutor)) diff --git a/parsl/tests/test_htex/test_block_manager_selector_unit.py b/parsl/tests/test_htex/test_block_manager_selector_unit.py new file mode 100644 index 0000000000..46b2bb1e5d --- /dev/null +++ b/parsl/tests/test_htex/test_block_manager_selector_unit.py @@ -0,0 +1,20 @@ +import pytest + +from parsl.executors.high_throughput.manager_record import ManagerRecord +from parsl.executors.high_throughput.manager_selector import BlockIdManagerSelector + + +@pytest.mark.local +def test_sort_managers(): + ready_managers = { + b'manager1': {'block_id': 1}, + b'manager2': {'block_id': None}, + b'manager3': {'block_id': 3}, + b'manager4': {'block_id': 2} + } + + manager_list = {b'manager1', b'manager2', b'manager3', b'manager4'} + expected_sorted_list = [b'manager2', b'manager1', b'manager4', b'manager3'] + manager_selector = BlockIdManagerSelector() + sorted_managers = manager_selector.sort_managers(ready_managers, manager_list) + assert sorted_managers == expected_sorted_list diff --git a/parsl/tests/test_htex/test_drain.py b/parsl/tests/test_htex/test_drain.py index da115b42f0..efd0405863 100644 --- a/parsl/tests/test_htex/test_drain.py +++ b/parsl/tests/test_htex/test_drain.py @@ -13,7 +13,9 @@ # based around the expected drain period: the drain period # is TIME_CONST seconds, and the single executed task will # last twice that many number of seconds. -TIME_CONST = 1 +TIME_CONST = 4 + +CONNECTED_MANAGERS_POLL_MS = 100 def local_config(): @@ -52,7 +54,7 @@ def test_drain(try_assert): # wait till we have a block running... 
- try_assert(lambda: len(htex.connected_managers()) == 1) + try_assert(lambda: len(htex.connected_managers()) == 1, check_period_ms=CONNECTED_MANAGERS_POLL_MS) managers = htex.connected_managers() assert managers[0]['active'], "The manager should be active" @@ -63,7 +65,7 @@ def test_drain(try_assert): time.sleep(TIME_CONST) # this assert should happen *very fast* after the above delay... - try_assert(lambda: htex.connected_managers()[0]['draining'], timeout_ms=500) + try_assert(lambda: htex.connected_managers()[0]['draining'], timeout_ms=500, check_period_ms=CONNECTED_MANAGERS_POLL_MS) # and the test task should still be running... assert not fut.done(), "The test task should still be running" @@ -76,4 +78,4 @@ def test_drain(try_assert): # connected managers. # As with the above draining assert, this should happen very fast after # the task ends. - try_assert(lambda: len(htex.connected_managers()) == 0, timeout_ms=500) + try_assert(lambda: len(htex.connected_managers()) == 0, timeout_ms=500, check_period_ms=CONNECTED_MANAGERS_POLL_MS) diff --git a/parsl/tests/test_htex/test_manager_selector_by_block.py b/parsl/tests/test_htex/test_manager_selector_by_block.py new file mode 100644 index 0000000000..0933b581ff --- /dev/null +++ b/parsl/tests/test_htex/test_manager_selector_by_block.py @@ -0,0 +1,53 @@ +import time + +import pytest + +import parsl +from parsl.app.app import bash_app, python_app +from parsl.channels import LocalChannel +from parsl.config import Config +from parsl.executors import HighThroughputExecutor +from parsl.executors.high_throughput.manager_selector import ( + BlockIdManagerSelector, + ManagerSelector, +) +from parsl.launchers import WrappedLauncher +from parsl.providers import LocalProvider +from parsl.usage_tracking.levels import LEVEL_1 + +BLOCK_COUNT = 2 + + +@parsl.python_app +def get_worker_pid(): + import os + return os.environ.get('PARSL_WORKER_BLOCK_ID') + + +@pytest.mark.local +def test_block_id_selection(try_assert): + htex = 
HighThroughputExecutor( + label="htex_local", + max_workers_per_node=1, + manager_selector=BlockIdManagerSelector(), + provider=LocalProvider( + channel=LocalChannel(), + init_blocks=BLOCK_COUNT, + max_blocks=BLOCK_COUNT, + min_blocks=BLOCK_COUNT, + ), + ) + + config = Config( + executors=[htex], + usage_tracking=LEVEL_1, + ) + + with parsl.load(config): + blockids = [] + try_assert(lambda: len(htex.connected_managers()) == BLOCK_COUNT, timeout_ms=20000) + for i in range(10): + future = get_worker_pid() + blockids.append(future.result()) + + assert all(blockid == "1" for blockid in blockids) diff --git a/parsl/tests/test_htex/test_resource_spec_validation.py b/parsl/tests/test_htex/test_resource_spec_validation.py index ac0c580c20..2a6d577416 100644 --- a/parsl/tests/test_htex/test_resource_spec_validation.py +++ b/parsl/tests/test_htex/test_resource_spec_validation.py @@ -4,9 +4,7 @@ import pytest from parsl.executors import HighThroughputExecutor -from parsl.executors.high_throughput.mpi_prefix_composer import ( - InvalidResourceSpecification, -) +from parsl.executors.errors import InvalidResourceSpecification def double(x): @@ -32,6 +30,13 @@ def test_resource_spec_validation(): assert ret_val is None +@pytest.mark.local +def test_resource_spec_validation_one_key(): + htex = HighThroughputExecutor() + ret_val = htex.validate_resource_spec({"priority": 2}) + assert ret_val is None + + @pytest.mark.local def test_resource_spec_validation_bad_keys(): htex = HighThroughputExecutor() diff --git a/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py b/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py index aff2501674..63005a8df7 100644 --- a/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py +++ b/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py @@ -8,9 +8,7 @@ import parsl from parsl import Config, bash_app, python_app from parsl.executors import MPIExecutor -from parsl.executors.high_throughput.mpi_prefix_composer import ( - MissingResourceSpecification, -) +from 
parsl.executors.errors import InvalidResourceSpecification from parsl.launchers import SimpleLauncher from parsl.providers import LocalProvider @@ -185,6 +183,6 @@ def test_simulated_load(rounds: int = 100): @pytest.mark.local def test_missing_resource_spec(): - with pytest.raises(MissingResourceSpecification): + with pytest.raises(InvalidResourceSpecification): future = mock_app(sleep_dur=0.4) future.result(timeout=10) diff --git a/parsl/tests/test_mpi_apps/test_resource_spec.py b/parsl/tests/test_mpi_apps/test_resource_spec.py index f180c67d52..058a7ef425 100644 --- a/parsl/tests/test_mpi_apps/test_resource_spec.py +++ b/parsl/tests/test_mpi_apps/test_resource_spec.py @@ -10,12 +10,9 @@ import pytest from parsl.app.app import python_app +from parsl.executors.errors import InvalidResourceSpecification from parsl.executors.high_throughput.executor import HighThroughputExecutor from parsl.executors.high_throughput.mpi_executor import MPIExecutor -from parsl.executors.high_throughput.mpi_prefix_composer import ( - InvalidResourceSpecification, - MissingResourceSpecification, -) from parsl.executors.high_throughput.mpi_resource_management import ( get_nodes_in_batchjob, get_pbs_hosts_list, @@ -105,7 +102,7 @@ def test_top_level(): ({"num_nodes": 2, "ranks_per_node": 1}, None), ({"launcher_options": "--debug_foo"}, None), ({"num_nodes": 2, "BAD_OPT": 1}, InvalidResourceSpecification), - ({}, MissingResourceSpecification), + ({}, InvalidResourceSpecification), ) ) def test_mpi_resource_spec(resource_spec: Dict, exception): diff --git a/parsl/tests/test_providers/test_kubernetes_provider.py b/parsl/tests/test_providers/test_kubernetes_provider.py new file mode 100644 index 0000000000..453dc57422 --- /dev/null +++ b/parsl/tests/test_providers/test_kubernetes_provider.py @@ -0,0 +1,102 @@ +import re +from unittest import mock + +import pytest + +from parsl.providers.kubernetes.kube import KubernetesProvider +from parsl.tests.test_utils.test_sanitize_dns import 
DNS_SUBDOMAIN_REGEX + +_MOCK_BASE = "parsl.providers.kubernetes.kube" + + +@pytest.fixture(autouse=True) +def mock_kube_config(): + with mock.patch(f"{_MOCK_BASE}.config") as mock_config: + mock_config.load_kube_config.return_value = None + yield mock_config + + +@pytest.fixture +def mock_kube_client(): + mock_client = mock.MagicMock() + with mock.patch(f"{_MOCK_BASE}.client.CoreV1Api") as mock_api: + mock_api.return_value = mock_client + yield mock_client + + +@pytest.mark.local +def test_submit_happy_path(mock_kube_client: mock.MagicMock): + image = "test-image" + namespace = "test-namespace" + cmd_string = "test-command" + volumes = [("test-volume", "test-mount-path")] + service_account_name = "test-service-account" + annotations = {"test-annotation": "test-value"} + max_cpu = 2 + max_mem = "2Gi" + init_cpu = 1 + init_mem = "1Gi" + provider = KubernetesProvider( + image=image, + persistent_volumes=volumes, + namespace=namespace, + service_account_name=service_account_name, + annotations=annotations, + max_cpu=max_cpu, + max_mem=max_mem, + init_cpu=init_cpu, + init_mem=init_mem, + ) + + job_name = "test.job.name" + job_id = provider.submit(cmd_string=cmd_string, tasks_per_node=1, job_name=job_name) + + assert job_id in provider.resources + assert mock_kube_client.create_namespaced_pod.call_count == 1 + + call_args = mock_kube_client.create_namespaced_pod.call_args[1] + pod = call_args["body"] + container = pod.spec.containers[0] + volume = container.volume_mounts[0] + + assert image == container.image + assert namespace == call_args["namespace"] + assert any(cmd_string in arg for arg in container.args) + assert volumes[0] == (volume.name, volume.mount_path) + assert service_account_name == pod.spec.service_account_name + assert annotations == pod.metadata.annotations + assert str(max_cpu) == container.resources.limits["cpu"] + assert max_mem == container.resources.limits["memory"] + assert str(init_cpu) == container.resources.requests["cpu"] + assert init_mem == 
container.resources.requests["memory"] + assert job_id == pod.metadata.labels["parsl-job-id"] + assert job_id == container.name + assert f"{job_name}.{job_id}" == pod.metadata.name + + +@pytest.mark.local +@mock.patch(f"{_MOCK_BASE}.KubernetesProvider._create_pod") +@pytest.mark.parametrize("char", (".", "-")) +def test_submit_pod_name_includes_job_id(mock_create_pod: mock.MagicMock, char: str): + provider = KubernetesProvider(image="test-image") + + job_name = "a." * 121 + f"a{char}" + "a" * 9 + assert len(job_name) == 253 # Max length for pod name + job_id = provider.submit(cmd_string="test-command", tasks_per_node=1, job_name=job_name) + + expected_pod_name = job_name[:253 - len(job_id) - 2] + f".{job_id}" + actual_pod_name = mock_create_pod.call_args[1]["pod_name"] + assert re.match(DNS_SUBDOMAIN_REGEX, actual_pod_name) + assert expected_pod_name == actual_pod_name + + +@pytest.mark.local +@mock.patch(f"{_MOCK_BASE}.KubernetesProvider._create_pod") +@mock.patch(f"{_MOCK_BASE}.logger") +@pytest.mark.parametrize("job_name", ("", ".", "-", "a.-.a", "$$$")) +def test_submit_invalid_job_name(mock_logger: mock.MagicMock, mock_create_pod: mock.MagicMock, job_name: str): + provider = KubernetesProvider(image="test-image") + job_id = provider.submit(cmd_string="test-command", tasks_per_node=1, job_name=job_name) + assert mock_logger.warning.call_count == 1 + assert f"Invalid pod name '{job_name}' for job '{job_id}'" in mock_logger.warning.call_args[0][0] + assert f"parsl.kube.{job_id}" == mock_create_pod.call_args[1]["pod_name"] diff --git a/parsl/tests/test_utils/test_sanitize_dns.py b/parsl/tests/test_utils/test_sanitize_dns.py new file mode 100644 index 0000000000..17b801339c --- /dev/null +++ b/parsl/tests/test_utils/test_sanitize_dns.py @@ -0,0 +1,76 @@ +import random +import re + +import pytest + +from parsl.utils import sanitize_dns_label_rfc1123, sanitize_dns_subdomain_rfc1123 + +# Ref: https://datatracker.ietf.org/doc/html/rfc1123 +DNS_LABEL_REGEX = 
r'^[a-z0-9]([-a-z0-9]{0,61}[a-z0-9])?$' +DNS_SUBDOMAIN_REGEX = r'^[a-z0-9]([-a-z0-9]{0,61}[a-z0-9])?(\.[a-z0-9]([-a-z0-9]{0,61}[a-z0-9])?)*$' + +test_labels = [ + "example-label-123", # Valid label + "EXAMPLE", # Case sensitivity + "!@#example*", # Remove invalid characters + "--leading-and-trailing--", # Leading and trailing hyphens + "..leading.and.trailing..", # Leading and tailing dots + "multiple..dots", # Consecutive dots + "valid--label", # Consecutive hyphens + "a" * random.randint(64, 70), # Longer than 63 characters + f"{'a' * 62}-a", # Trailing hyphen at max length +] + + +def _generate_test_subdomains(num_subdomains: int): + subdomains = [] + for _ in range(num_subdomains): + num_labels = random.randint(1, 5) + labels = [test_labels[random.randint(0, num_labels - 1)] for _ in range(num_labels)] + subdomain = ".".join(labels) + subdomains.append(subdomain) + return subdomains + + +@pytest.mark.local +@pytest.mark.parametrize("raw_string", test_labels) +def test_sanitize_dns_label_rfc1123(raw_string: str): + print(sanitize_dns_label_rfc1123(raw_string)) + assert re.match(DNS_LABEL_REGEX, sanitize_dns_label_rfc1123(raw_string)) + + +@pytest.mark.local +@pytest.mark.parametrize("raw_string", ("", "-", "@", "$$$")) +def test_sanitize_dns_label_rfc1123_empty(raw_string: str): + with pytest.raises(ValueError) as e_info: + sanitize_dns_label_rfc1123(raw_string) + assert str(e_info.value) == f"Sanitized DNS label is empty for input '{raw_string}'" + + +@pytest.mark.local +@pytest.mark.parametrize("raw_string", _generate_test_subdomains(10)) +def test_sanitize_dns_subdomain_rfc1123(raw_string: str): + assert re.match(DNS_SUBDOMAIN_REGEX, sanitize_dns_subdomain_rfc1123(raw_string)) + + +@pytest.mark.local +@pytest.mark.parametrize("char", ("-", ".")) +def test_sanitize_dns_subdomain_rfc1123_trailing_non_alphanumeric_at_max_length(char: str): + raw_string = (f"{'a' * 61}." 
* 4) + f".aaaa{char}a" + assert re.match(DNS_SUBDOMAIN_REGEX, sanitize_dns_subdomain_rfc1123(raw_string)) + + +@pytest.mark.local +@pytest.mark.parametrize("raw_string", ("", ".", "...")) +def test_sanitize_dns_subdomain_rfc1123_empty(raw_string: str): + with pytest.raises(ValueError) as e_info: + sanitize_dns_subdomain_rfc1123(raw_string) + assert str(e_info.value) == f"Sanitized DNS subdomain is empty for input '{raw_string}'" + + +@pytest.mark.local +@pytest.mark.parametrize( + "raw_string", ("a" * 253, "a" * random.randint(254, 300)), ids=("254 chars", ">253 chars") +) +def test_sanitize_dns_subdomain_rfc1123_max_length(raw_string: str): + assert len(sanitize_dns_subdomain_rfc1123(raw_string)) <= 253 diff --git a/parsl/tests/unit/test_usage_tracking.py b/parsl/tests/unit/test_usage_tracking.py index 351355811c..1581249cd2 100644 --- a/parsl/tests/unit/test_usage_tracking.py +++ b/parsl/tests/unit/test_usage_tracking.py @@ -43,3 +43,24 @@ def test_invalid_types(level): # we can't instantiate TypeCheckError if we're in typeguard 2.x environment # because it does not exist... so check name using strings. 
assert ex.type.__name__ in ["TypeCheckError", "TypeError"] + + +@pytest.mark.local +def test_valid_project_name(): + """Test valid project_name.""" + assert ( + Config( + usage_tracking=3, + project_name="unit-test", + ).project_name == "unit-test" + ) + + +@pytest.mark.local +@pytest.mark.parametrize("name", (1, 1.0, True, object())) +def test_invalid_project_name(name): + """Test invalid project_name.""" + with pytest.raises(Exception) as ex: + Config(usage_tracking=3, project_name=name) + + assert ex.type.__name__ in ["TypeCheckError", "TypeError"] diff --git a/parsl/usage_tracking/usage.py b/parsl/usage_tracking/usage.py index 3730fcc464..c22eb529fe 100644 --- a/parsl/usage_tracking/usage.py +++ b/parsl/usage_tracking/usage.py @@ -114,6 +114,7 @@ def __init__(self, dfk, port=50077, sys.version_info.minor, sys.version_info.micro) self.tracking_level = self.check_tracking_level() + self.project_name = self.config.project_name self.start_time = None logger.debug("Tracking level: {}".format(self.tracking_level)) @@ -153,6 +154,9 @@ def construct_start_message(self) -> bytes: 'platform.system': platform.system(), 'tracking_level': int(self.tracking_level)} + if self.project_name: + message['project_name'] = self.project_name + if self.tracking_level >= 2: message['components'] = get_parsl_usage(self.dfk._config) @@ -188,6 +192,10 @@ def construct_end_message(self) -> bytes: 'end': end_time, 'execution_time': end_time - self.start_time, 'components': [dfk_component] + get_parsl_usage(self.dfk._config)} + + if self.project_name: + message['project_name'] = self.project_name + logger.debug(f"Usage tracking end message (unencoded): {message}") return self.encode_message(message) diff --git a/parsl/utils.py b/parsl/utils.py index 6f36d4506a..0ea5d7d9eb 100644 --- a/parsl/utils.py +++ b/parsl/utils.py @@ -1,6 +1,7 @@ import inspect import logging import os +import re import shlex import subprocess import threading @@ -380,3 +381,80 @@ def __exit__( exc_tb: 
Optional[TracebackType] ) -> None: self.cancel() + + +def sanitize_dns_label_rfc1123(raw_string: str) -> str: + """Convert input string to a valid RFC 1123 DNS label. + + Parameters + ---------- + raw_string : str + String to sanitize. + + Returns + ------- + str + Sanitized string. + + Raises + ------ + ValueError + If the string is empty after sanitization. + """ + # Convert to lowercase and replace non-alphanumeric characters with hyphen + sanitized = re.sub(r'[^a-z0-9]', '-', raw_string.lower()) + + # Remove consecutive hyphens + sanitized = re.sub(r'-+', '-', sanitized) + + # DNS label cannot exceed 63 characters + sanitized = sanitized[:63] + + # Strip after trimming to avoid trailing hyphens + sanitized = sanitized.strip("-") + + if not sanitized: + raise ValueError(f"Sanitized DNS label is empty for input '{raw_string}'") + + return sanitized + + +def sanitize_dns_subdomain_rfc1123(raw_string: str) -> str: + """Convert input string to a valid RFC 1123 DNS subdomain. + + Parameters + ---------- + raw_string : str + String to sanitize. + + Returns + ------- + str + Sanitized string. + + Raises + ------ + ValueError + If the string is empty after sanitization. 
+ """ + segments = raw_string.split('.') + + sanitized_segments = [] + for segment in segments: + if not segment: + continue + sanitized_segment = sanitize_dns_label_rfc1123(segment) + sanitized_segments.append(sanitized_segment) + + sanitized = '.'.join(sanitized_segments) + + # DNS subdomain cannot exceed 253 characters + sanitized = sanitized[:253] + + # Strip after trimming to avoid trailing dots or hyphens + sanitized = sanitized.strip(".-") + + if not sanitized: + raise ValueError(f"Sanitized DNS subdomain is empty for input '{raw_string}'") + + return sanitized diff --git a/setup.py b/setup.py index 4934d01e5d..b381ecfc2b 100755 --- a/setup.py +++ b/setup.py @@ -54,7 +54,7 @@ include_package_data=True, package_data={'parsl': ['py.typed']}, packages=find_packages(), - python_requires=">=3.8.0", + python_requires=">=3.9.0", install_requires=install_requires, scripts = ['parsl/executors/high_throughput/process_worker_pool.py', 'parsl/executors/high_throughput/interchange.py', @@ -71,7 +71,6 @@ # Licence, must match with licence above 'License :: OSI Approved :: Apache Software License', # Python versions supported - 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', diff --git a/test-requirements.txt b/test-requirements.txt index acd670b5e9..6abf727ccd 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,5 +1,4 @@ flake8==6.1.0 -ipyparallel pandas paramiko pytest>=7.4.0,<8