From adbe5a492d6c72a589661853b39818f17426c4e4 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Mon, 26 Aug 2024 13:57:05 -0500 Subject: [PATCH 01/15] Support for GlobusComputeExecutor to submit functions to globus_compute_endpoints --- parsl/executors/__init__.py | 4 +- parsl/executors/globus_compute.py | 145 ++++++++++++++++++++++++++++++ setup.py | 1 + 3 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 parsl/executors/globus_compute.py diff --git a/parsl/executors/__init__.py b/parsl/executors/__init__.py index bc29204502..81955aab76 100644 --- a/parsl/executors/__init__.py +++ b/parsl/executors/__init__.py @@ -1,4 +1,5 @@ from parsl.executors.flux.executor import FluxExecutor +from parsl.executors.globus_compute import GlobusComputeExecutor from parsl.executors.high_throughput.executor import HighThroughputExecutor from parsl.executors.high_throughput.mpi_executor import MPIExecutor from parsl.executors.threads import ThreadPoolExecutor @@ -8,4 +9,5 @@ 'HighThroughputExecutor', 'MPIExecutor', 'WorkQueueExecutor', - 'FluxExecutor'] + 'FluxExecutor', + 'GlobusComputeExecutor'] diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py new file mode 100644 index 0000000000..0f76314deb --- /dev/null +++ b/parsl/executors/globus_compute.py @@ -0,0 +1,145 @@ +import uuid +from concurrent.futures import Future +from typing import Any, Callable, Dict, Optional, Union + +import typeguard + +from parsl.errors import OptionalModuleMissing +from parsl.executors.base import ParslExecutor +from parsl.utils import RepresentationMixin + +UUID_LIKE_T = Union[uuid.UUID, str] + + + +class GlobusComputeExecutor(ParslExecutor, RepresentationMixin): + """ GlobusComputeExecutor enables remote execution on Globus Compute endpoints + + GlobusComputeExecutor is a thin wrapper over globus_compute_sdk.Executor + Refer to `globus-compute user documentation `_ + and `reference documentation `_ + for more details. + """ + + def __init__( + self, + endpoint_id: Optional[UUID_LIKE_T] = None, + task_group_id: Optional[UUID_LIKE_T] = None, + resource_specification: Optional[dict[str, Any]] = None, + user_endpoint_config: Optional[dict[str, Any]] = None, + label: str = "GlobusComputeExecutor", + batch_size: int = 128, + amqp_port: Optional[int] = None, + **kwargs, + ): + """ + + Parameters + ---------- + + endpoint_id: + id of the endpoint to which to submit tasks + + task_group_id: + The Task Group to which to associate tasks. If not set, + one will be instantiated. + + resource_specification: + Specify resource requirements for individual task execution. + + user_endpoint_config: + User endpoint configuration values as described + and allowed by endpoint administrators. Must be a JSON-serializable dict + or None. + + label: + a label to name the executor; mainly utilized for + logging and advanced needs with multiple executors. + + batch_size: + the maximum number of tasks to coalesce before + sending upstream [min: 1, default: 128] + + amqp_port: + Port to use when connecting to results queue. Note that the + Compute web services only support 5671, 5672, and 443. + + kwargs: + Other kwargs listed will be passed through to globus_compute_sdk.Executor + as is + """ + super().__init__() + self.endpoint_id = endpoint_id + self.task_group_id = task_group_id + self.resource_specification = resource_specification + self.user_endpoint_config = user_endpoint_config + self.label = label + self.batch_size = batch_size + self.amqp_port = amqp_port + + try: + from globus_compute_sdk import Executor + except ImportError: + raise OptionalModuleMissing( + ['globus-compute-sdk'], + "GlobusComputeExecutor requires globus-compute-sdk installed" + ) + self._executor: Executor = Executor( + endpoint_id=endpoint_id, + task_group_id=task_group_id, + resource_specification=resource_specification, + user_endpoint_config=user_endpoint_config, + label=label, + batch_size=batch_size, + amqp_port=amqp_port, + **kwargs + ) + + def start(self) -> None: + """Empty function + """ + pass + + def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future: + """ Submit fn to globus-compute + + + Parameters + ---------- + + func: Callable + Python function to execute remotely + resource_specification: Dict[str, Any] + Resource specification used to run MPI applications on Endpoints configured + to use globus compute's MPIEngine + args: + Args to pass to the function + kwargs: + kwargs to pass to the function + + Returns + ------- + + Future + """ + self._executor.resource_specification = resource_specification or self.resource_specification + return self._executor.submit(func, *args, **kwargs) + + def shutdown(self, wait=True, *, cancel_futures=False): + """Clean-up the resources associated with the Executor. + + It is safe to call this method several times. Otherwise, no other methods + can be called after this one. + + Parameters + ---------- + + wait: If True, then this method will not return until all pending + futures have received results. + cancel_futures: If True, then this method will cancel all futures + that have not yet registered their tasks with the Compute web services. + Tasks cannot be cancelled once they are registered. + """ + return self._executor.shutdown() + + diff --git a/setup.py b/setup.py index cace8c0252..8336e4db7b 100755 --- a/setup.py +++ b/setup.py @@ -34,6 +34,7 @@ 'flux': ['pyyaml', 'cffi', 'jsonschema'], 'proxystore': ['proxystore'], 'radical-pilot': ['radical.pilot==1.60', 'radical.utils==1.60'], + 'globus_compute': ['globus_compute_sdk>=2.27.1'], # Disabling psi-j since github direct links are not allowed by pypi # 'psij': ['psi-j-parsl@git+https://github.com/ExaWorks/psi-j-parsl'] } From c63202f25a2d252091d9e015f4923dcef29c19a6 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Mon, 26 Aug 2024 15:25:27 -0500 Subject: [PATCH 02/15] Github Action for GlobusComputeExecutor (#3619) * Support for testing GlobusComputeExecutor in a github action * Adding shared_fs and staging_required tags to tests * Adding GlobusComputeExecutor test config --- .github/workflows/gce_test.yaml | 103 ++++++++++++++++++ Makefile | 4 + docs/reference.rst | 1 + parsl/executors/globus_compute.py | 12 +- parsl/tests/configs/globus_compute.py | 18 +++ parsl/tests/conftest.py | 4 + .../test_error_handling/test_resource_spec.py | 3 + test-requirements.txt | 1 + 8 files changed, 137 insertions(+), 9 deletions(-) create mode 100644 .github/workflows/gce_test.yaml create mode 100644 parsl/tests/configs/globus_compute.py diff --git a/.github/workflows/gce_test.yaml b/.github/workflows/gce_test.yaml new file mode 100644 index 0000000000..01fb5f80c7 --- /dev/null +++ b/.github/workflows/gce_test.yaml @@ -0,0 +1,103 @@ +name: GlobusComputeExecutor tests + +on: + pull_request: + types: + - opened + - synchronize + +jobs: + main-test-suite: + strategy: + matrix: + python-version: ["3.11"] + runs-on: ubuntu-20.04 + timeout-minutes: 60 + + steps: + - uses: actions/checkout@master + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Collect Job Information + id: job-info + run: | + echo "Python Version: ${{ matrix.python-version }}" >> ci_job_info.txt + echo "CI Triggering Event: ${{ github.event_name }}" >> ci_job_info.txt + echo "Triggering Git Ref: ${{ github.ref }}" >> ci_job_info.txt + echo "Triggering Git SHA: ${{ github.sha }}" >> ci_job_info.txt + echo "Workflow Run: ${{ github.run_number }}" >> ci_job_info.txt + echo "Workflow Attempt: ${{ github.run_attempt }}" >> ci_job_info.txt + as_ascii="$(echo "${{ github.ref_name }}" | perl -pe "s/[^A-z0-9-]+/-/g; s/^-+|-+\$//g; s/--+/-/g;")" + echo "as-ascii=$as_ascii" >> $GITHUB_OUTPUT + + - name: Non-requirements based install + run: | + # libpython3.5: make workqueue binary installer happy + # mpich: required by radical executor + sudo apt-get update -q + sudo apt-get install -qy libpython3.5 mpich + + - name: setup virtual env + run: | + make virtualenv + source .venv/bin/activate + + - name: make deps clean_coverage + run: | + source .venv/bin/activate + make deps + make clean_coverage + + # Temporary fix, until changes make it into compute releases + git clone -b configure_tasks_working_dir https://github.com/globus/globus-compute.git + pip3 install globus-compute/compute_sdk globus-compute/compute_endpoint + + - name: start globus_compute_endpoint + env: + GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }} + GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }} + run: | + source /home/runner/work/parsl/parsl/.venv/bin/activate + globus-compute-endpoint configure default + which globus-compute-endpoint + python3 -c "import globus_compute_sdk; print(globus_compute_sdk.__version__)" + python3 -c "import globus_compute_endpoint; print(globus_compute_endpoint.__version__)" + cat << EOF > /home/runner/.globus_compute/default/config.yaml + engine: + type: ThreadPoolEngine + max_workers: 4 + working_dir: /home/runner/.globus_compute/default/tasks_working_dir + EOF + cat /home/runner/.globus_compute/default/config.yaml + mkdir ~/.globus_compute/default/tasks_working_dir + globus-compute-endpoint start default + globus-compute-endpoint list + - name: make test + env: + GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }} + GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }} + run: | + source .venv/bin/activate + export GLOBUS_COMPUTE_ENDPOINT=$(globus-compute-endpoint list | grep default | cut -c 3-38) + echo "GLOBUS_COMPUTE_ENDPOINT = $GLOBUS_COMPUTE_ENDPOINT" + + # temporary; until test-matrixification + export PARSL_TEST_PRESERVE_NUM_RUNS=7 + + make gce_test + ln -s .pytest/parsltest-current test_runinfo + + - name: Archive runinfo logs + if: ${{ always() }} + uses: actions/upload-artifact@v4 + with: + name: runinfo-${{ matrix.python-version }}-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }} + path: | + runinfo/ + .pytest/ + ci_job_info.txt + compression-level: 9 diff --git a/Makefile b/Makefile index 4d2f37f715..ad127f2c23 100644 --- a/Makefile +++ b/Makefile @@ -51,6 +51,10 @@ clean_coverage: mypy: ## run mypy checks MYPYPATH=$(CWD)/mypy-stubs mypy parsl/ +.PHONY: gce_test +gce_test: ## Run tests with GlobusComputeExecutor + pytest -v -k "not shared_fs and not issue_3620 and not staging_required" --config parsl/tests/configs/globus_compute.py parsl/tests/ --random-order --durations 10 + .PHONY: local_thread_test local_thread_test: ## run all tests with local_thread config pytest parsl/tests/ -k "not cleannet" --config parsl/tests/configs/local_threads.py --random-order --durations 10 diff --git a/docs/reference.rst b/docs/reference.rst index 45f83ad36f..5933f7841b 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -77,6 +77,7 @@ Executors parsl.executors.taskvine.TaskVineExecutor parsl.executors.FluxExecutor parsl.executors.radical.RadicalPilotExecutor + parsl.executors.globus_compute.GlobusComputeExecutor Manager Selectors ================= diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py index 0f76314deb..29a8d4be41 100644 --- a/parsl/executors/globus_compute.py +++ b/parsl/executors/globus_compute.py @@ -2,8 +2,6 @@ from concurrent.futures import Future from typing import Any, Callable, Dict, Optional, Union -import typeguard - from parsl.errors import OptionalModuleMissing from parsl.executors.base import ParslExecutor from parsl.utils import RepresentationMixin @@ -11,7 +9,6 @@ UUID_LIKE_T = Union[uuid.UUID, str] - class GlobusComputeExecutor(ParslExecutor, RepresentationMixin): """ GlobusComputeExecutor enables remote execution on Globus Compute endpoints @@ -25,15 +22,14 @@ def __init__( self, endpoint_id: Optional[UUID_LIKE_T] = None, task_group_id: Optional[UUID_LIKE_T] = None, - resource_specification: Optional[dict[str, Any]] = None, - user_endpoint_config: Optional[dict[str, Any]] = None, + resource_specification: Optional[Dict[str, Any]] = None, + user_endpoint_config: Optional[Dict[str, Any]] = None, label: str = "GlobusComputeExecutor", batch_size: int = 128, amqp_port: Optional[int] = None, **kwargs, - ): + ): """ - Parameters ---------- @@ -141,5 +137,3 @@ def shutdown(self, wait=True, *, cancel_futures=False): Tasks cannot be cancelled once they are registered. """ return self._executor.shutdown() - - diff --git a/parsl/tests/configs/globus_compute.py b/parsl/tests/configs/globus_compute.py new file mode 100644 index 0000000000..edb45801e0 --- /dev/null +++ b/parsl/tests/configs/globus_compute.py @@ -0,0 +1,18 @@ +import os + +from parsl.config import Config +from parsl.executors import GlobusComputeExecutor + + +def fresh_config(): + + endpoint_id = os.environ["GLOBUS_COMPUTE_ENDPOINT"] + + return Config( + executors=[ + GlobusComputeExecutor( + label="globus_compute", + endpoint_id=endpoint_id + ) + ] + ) diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py index 4bcdde0b7a..4f1281025c 100644 --- a/parsl/tests/conftest.py +++ b/parsl/tests/conftest.py @@ -163,6 +163,10 @@ def pytest_configure(config): 'markers', 'shared_fs: Marks tests that require a shared_fs between the workers are the test client' ) + config.addinivalue_line( + 'markers', + 'issue_3620: Marks tests that do not work correctly on GlobusComputeExecutor (ref: issue 3620)' + ) @pytest.fixture(autouse=True, scope='session') diff --git a/parsl/tests/test_error_handling/test_resource_spec.py b/parsl/tests/test_error_handling/test_resource_spec.py index 4616219be2..7def2b736c 100644 --- a/parsl/tests/test_error_handling/test_resource_spec.py +++ b/parsl/tests/test_error_handling/test_resource_spec.py @@ -1,3 +1,5 @@ +import pytest + import parsl from parsl.app.app import python_app from parsl.executors import WorkQueueExecutor @@ -11,6 +13,7 @@ def double(x, parsl_resource_specification={}): return x * 2 +@pytest.mark.issue_3620 def test_resource(n=2): executors = parsl.dfk().executors executor = None diff --git a/test-requirements.txt b/test-requirements.txt index 82ec5172c2..5016c5c48d 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -10,6 +10,7 @@ types-mock types-python-dateutil types-requests mpi4py +globus-compute-sdk>=2.27.1 # sqlalchemy is needed for typechecking, so it's here # as well as at runtime for optional monitoring execution From be4a66d6c967d5cbfffeb8f38df4483e2945cf91 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Wed, 23 Oct 2024 13:25:42 -0500 Subject: [PATCH 03/15] Add Client as a kwarg for GlobusComputeExecutor. --- parsl/executors/globus_compute.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py index 29a8d4be41..cac59be20c 100644 --- a/parsl/executors/globus_compute.py +++ b/parsl/executors/globus_compute.py @@ -6,6 +6,13 @@ from parsl.executors.base import ParslExecutor from parsl.utils import RepresentationMixin +try: + from globus_compute_sdk import Client, Executor + _globus_compute_enabled = True +except ImportError: + _globus_compute_enabled = False + Client: Any # type: ignore[no-redef] + UUID_LIKE_T = Union[uuid.UUID, str] @@ -27,6 +34,7 @@ def __init__( label: str = "GlobusComputeExecutor", batch_size: int = 128, amqp_port: Optional[int] = None, + client: Optional[Client] = None, **kwargs, ): """ @@ -60,6 +68,10 @@ def __init__( Port to use when connecting to results queue. Note that the Compute web services only support 5671, 5672, and 443. + client: + instance of globus_compute_sdk.Client to be used by the executor. + If not provided, the executor will instantiate one with default arguments. + kwargs: Other kwargs listed will be passed through to globus_compute_sdk.Executor as is @@ -72,14 +84,14 @@ def __init__( self.label = label self.batch_size = batch_size self.amqp_port = amqp_port + self.client = client - try: - from globus_compute_sdk import Executor - except ImportError: + if not _globus_compute_enabled: raise OptionalModuleMissing( ['globus-compute-sdk'], "GlobusComputeExecutor requires globus-compute-sdk installed" ) + self._executor: Executor = Executor( endpoint_id=endpoint_id, task_group_id=task_group_id, @@ -88,6 +100,7 @@ def __init__( label=label, batch_size=batch_size, amqp_port=amqp_port, + client=self.client, **kwargs ) From 709adfea3d31e39486e20bb77e8dcb591a028049 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 24 Oct 2024 15:11:44 +0000 Subject: [PATCH 04/15] GCE now accepts user_endpoint_config via resource specification --- mypy.ini | 3 +++ parsl/executors/globus_compute.py | 18 ++++++++++++++---- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/mypy.ini b/mypy.ini index 4b64a12de2..e46e11fd63 100644 --- a/mypy.ini +++ b/mypy.ini @@ -177,6 +177,9 @@ ignore_missing_imports = True #[mypy-multiprocessing.synchronization.*] #ignore_missing_imports = True +[mypy-globus_compute_sdk.*] +ignore_missing_imports = True + [mypy-pandas.*] ignore_missing_imports = True diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py index cac59be20c..ac484beee4 100644 --- a/parsl/executors/globus_compute.py +++ b/parsl/executors/globus_compute.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import uuid from concurrent.futures import Future from typing import Any, Callable, Dict, Optional, Union @@ -11,7 +13,6 @@ _globus_compute_enabled = True except ImportError: _globus_compute_enabled = False - Client: Any # type: ignore[no-redef] UUID_LIKE_T = Union[uuid.UUID, str] @@ -54,7 +55,9 @@ def __init__( user_endpoint_config: User endpoint configuration values as described and allowed by endpoint administrators. Must be a JSON-serializable dict - or None. + or None. Refer docs from `globus-compute + `_ + for more info. label: a label to name the executor; mainly utilized for @@ -118,11 +121,16 @@ def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: func: Callable Python function to execute remotely + resource_specification: Dict[str, Any] - Resource specification used to run MPI applications on Endpoints configured - to use globus compute's MPIEngine + Resource specification can be used specify MPI resources required by MPI applications on + Endpoints configured to use globus compute's MPIEngine. GCE also accepts *user_endpoint_config* + to configure endpoints when the endpoint is a `Multi-User Endpoint + `_ + args: Args to pass to the function + kwargs: kwargs to pass to the function @@ -132,6 +140,8 @@ def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Future """ self._executor.resource_specification = resource_specification or self.resource_specification + # Pop user_endpoint_config since it is illegal in resource_spec for globus_compute + self._executor.user_endpoint_config = resource_specification.pop('user_endpoint_config', self.user_endpoint_config) return self._executor.submit(func, *args, **kwargs) def shutdown(self, wait=True, *, cancel_futures=False): From 9391e307af725da93c700df209f6693b5fff0d99 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Thu, 24 Oct 2024 14:25:34 -0500 Subject: [PATCH 05/15] Adding GCE to executor docs --- docs/userguide/execution.rst | 3 +++ parsl/executors/globus_compute.py | 9 +++++++++ 2 files changed, 12 insertions(+) diff --git a/docs/userguide/execution.rst b/docs/userguide/execution.rst index 832985c164..8b190fa7b2 100644 --- a/docs/userguide/execution.rst +++ b/docs/userguide/execution.rst @@ -87,6 +87,9 @@ Parsl currently supports the following executors: 4. `parsl.executors.taskvine.TaskVineExecutor`: This executor uses `TaskVine `_ as the execution backend. TaskVine scales up to tens of thousands of cores and actively uses local storage on compute nodes to offer a diverse array of performance-oriented features, including: smart caching and sharing common large files between tasks and compute nodes, reliable execution of tasks, dynamic resource sizing, automatic Python environment detection and sharing. These executors cover a broad range of execution requirements. As with other Parsl components, there is a standard interface (ParslExecutor) that can be implemented to add support for other executors. +5. `parsl.executors.globus_compute.GlobusComputeExecutor`: This executor uses `Globus Compute `_ +as the execution backend to run functions on remote systems. + .. note:: Refer to :ref:`configuration-section` for information on how to configure these executors. diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py index ac484beee4..685ee285e6 100644 --- a/parsl/executors/globus_compute.py +++ b/parsl/executors/globus_compute.py @@ -24,6 +24,15 @@ class GlobusComputeExecutor(ParslExecutor, RepresentationMixin): Refer to `globus-compute user documentation `_ and `reference documentation `_ for more details. + + .. note:: + As a remote execution system, Globus Compute relies on serialization to ship + tasks and results between the Parsl client side and the remote Globus Compute + Endpoint side. Serialization is unreliable across python versions, and + wrappers used by Parsl assume identical Parsl versions across on both sides. + We recommend using matching Python, Parsl and Globus Compute version on both + the client side and the endpoint side for stable behavior. + """ def __init__( From fd8362103698ad12a142bafbf5569f06ecb08309 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Wed, 30 Oct 2024 13:29:08 -0500 Subject: [PATCH 06/15] Fixes to avoid res_spec modification * GCE now makes a deepcopy of the resource_specification to avoid modifying the user supplied object. * stack of docstring changes --- docs/userguide/execution.rst | 2 +- parsl/executors/globus_compute.py | 35 +++++++++++++------------------ 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/docs/userguide/execution.rst b/docs/userguide/execution.rst index 8b190fa7b2..848a3114e4 100644 --- a/docs/userguide/execution.rst +++ b/docs/userguide/execution.rst @@ -88,7 +88,7 @@ Parsl currently supports the following executors: These executors cover a broad range of execution requirements. As with other Parsl components, there is a standard interface (ParslExecutor) that can be implemented to add support for other executors. 5. `parsl.executors.globus_compute.GlobusComputeExecutor`: This executor uses `Globus Compute `_ -as the execution backend to run functions on remote systems. +as the execution backend to run tasks on remote systems. .. note:: Refer to :ref:`configuration-section` for information on how to configure these executors. diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py index 685ee285e6..55d0b81287 100644 --- a/parsl/executors/globus_compute.py +++ b/parsl/executors/globus_compute.py @@ -1,5 +1,6 @@ from __future__ import annotations +import copy import uuid from concurrent.futures import Future from typing import Any, Callable, Dict, Optional, Union @@ -69,8 +70,7 @@ def __init__( for more info. label: - a label to name the executor; mainly utilized for - logging and advanced needs with multiple executors. + a label to name the executor batch_size: the maximum number of tasks to coalesce before @@ -117,12 +117,10 @@ def __init__( ) def start(self) -> None: - """Empty function - """ pass def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future: - """ Submit fn to globus-compute + """ Submit func to globus-compute Parameters @@ -148,24 +146,21 @@ def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Future """ - self._executor.resource_specification = resource_specification or self.resource_specification + res_spec = copy.deepcopy(resource_specification or self.resource_specification) # Pop user_endpoint_config since it is illegal in resource_spec for globus_compute - self._executor.user_endpoint_config = resource_specification.pop('user_endpoint_config', self.user_endpoint_config) + if res_spec: + user_endpoint_config = res_spec.pop('user_endpoint_config', self.user_endpoint_config) + else: + user_endpoint_config = self.user_endpoint_config + + self._executor.resource_specification = res_spec + self._executor.user_endpoint_config = user_endpoint_config return self._executor.submit(func, *args, **kwargs) - def shutdown(self, wait=True, *, cancel_futures=False): + def shutdown(self): """Clean-up the resources associated with the Executor. - It is safe to call this method several times. Otherwise, no other methods - can be called after this one. - - Parameters - ---------- - - wait: If True, then this method will not return until all pending - futures have received results. - cancel_futures: If True, then this method will cancel all futures - that have not yet registered their tasks with the Compute web services. - Tasks cannot be cancelled once they are registered. + GCE.shutdown will cancel all futures that have not yet registered with + Globus Compute and will not wait for the launched futures to complete. """ - return self._executor.shutdown() + return self._executor.shutdown(wait=False, cancel_futures=True) From 203e9bd10e058b813dedab1422e098a62611bd04 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Mon, 11 Nov 2024 06:56:11 -0600 Subject: [PATCH 07/15] Remove redundant non-requirements installs --- .github/workflows/gce_test.yaml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/.github/workflows/gce_test.yaml b/.github/workflows/gce_test.yaml index 01fb5f80c7..bc21e1714a 100644 --- a/.github/workflows/gce_test.yaml +++ b/.github/workflows/gce_test.yaml @@ -34,13 +34,6 @@ jobs: as_ascii="$(echo "${{ github.ref_name }}" | perl -pe "s/[^A-z0-9-]+/-/g; s/^-+|-+\$//g; s/--+/-/g;")" echo "as-ascii=$as_ascii" >> $GITHUB_OUTPUT - - name: Non-requirements based install - run: | - # libpython3.5: make workqueue binary installer happy - # mpich: required by radical executor - sudo apt-get update -q - sudo apt-get install -qy libpython3.5 mpich - - name: setup virtual env run: | make virtualenv From 50360dab55cc4c9ac00661ced2fa38a55e3b1d25 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Mon, 11 Nov 2024 07:13:50 -0600 Subject: [PATCH 08/15] Endpoint_id is required Adding doc link --- parsl/executors/globus_compute.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py index 55d0b81287..a6fd8221d0 100644 --- a/parsl/executors/globus_compute.py +++ b/parsl/executors/globus_compute.py @@ -38,7 +38,7 @@ class GlobusComputeExecutor(ParslExecutor, RepresentationMixin): def __init__( self, - endpoint_id: Optional[UUID_LIKE_T] = None, + endpoint_id: UUID_LIKE_T, task_group_id: Optional[UUID_LIKE_T] = None, resource_specification: Optional[Dict[str, Any]] = None, user_endpoint_config: Optional[Dict[str, Any]] = None, @@ -86,7 +86,8 @@ def __init__( kwargs: Other kwargs listed will be passed through to globus_compute_sdk.Executor - as is + as is. Refer to `globus-compute docs + `_ """ super().__init__() self.endpoint_id = endpoint_id From d7a6eed563f112b0314b129595e62a1554e85897 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Mon, 11 Nov 2024 07:16:54 -0600 Subject: [PATCH 09/15] Remove return from shutdown --- parsl/executors/globus_compute.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py index a6fd8221d0..a7920783a2 100644 --- a/parsl/executors/globus_compute.py +++ b/parsl/executors/globus_compute.py @@ -98,6 +98,7 @@ def __init__( self.batch_size = batch_size self.amqp_port = amqp_port self.client = client + self.executor_kwargs = kwargs if not _globus_compute_enabled: raise OptionalModuleMissing( @@ -105,21 +106,21 @@ def __init__( "GlobusComputeExecutor requires globus-compute-sdk installed" ) + def start(self) -> None: + """ Start the Globus Compute Executor """ + self._executor: Executor = Executor( - endpoint_id=endpoint_id, - task_group_id=task_group_id, - resource_specification=resource_specification, - user_endpoint_config=user_endpoint_config, - label=label, - batch_size=batch_size, - amqp_port=amqp_port, + endpoint_id=self.endpoint_id, + task_group_id=self.task_group_id, + resource_specification=self.resource_specification, + user_endpoint_config=self.user_endpoint_config, + label=self.label, + batch_size=self.batch_size, + amqp_port=self.amqp_port, client=self.client, - **kwargs + **self.executor_kwargs ) - def start(self) -> None: - pass - def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future: """ Submit func to globus-compute @@ -164,4 +165,4 @@ def shutdown(self): GCE.shutdown will cancel all futures that have not yet registered with Globus Compute and will not wait for the launched futures to complete. """ - return self._executor.shutdown(wait=False, cancel_futures=True) + self._executor.shutdown(wait=False, cancel_futures=True) From 53f97fd3b19de40e06fefaa7e3fe19cab50a921c Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Wed, 13 Nov 2024 18:38:00 -0600 Subject: [PATCH 10/15] * Update GC Endpoint install to use GC main branch * Removed python version test matrix * Removed obsolete install of libpython5 * Renamed `.pytest` to `pytest-parsl` --- .github/workflows/gce_test.yaml | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/.github/workflows/gce_test.yaml b/.github/workflows/gce_test.yaml index bc21e1714a..4ec835ece1 100644 --- a/.github/workflows/gce_test.yaml +++ b/.github/workflows/gce_test.yaml @@ -8,24 +8,21 @@ on: jobs: main-test-suite: - strategy: - matrix: - python-version: ["3.11"] runs-on: ubuntu-20.04 timeout-minutes: 60 steps: - uses: actions/checkout@master - - name: Set up Python ${{ matrix.python-version }} + - name: Set up Python 3.11 uses: actions/setup-python@v4 with: - python-version: ${{ matrix.python-version }} + python-version: 3.11 - name: Collect Job Information id: job-info run: | - echo "Python Version: ${{ matrix.python-version }}" >> ci_job_info.txt + echo "Python Version: 3.11" >> ci_job_info.txt echo "CI Triggering Event: ${{ github.event_name }}" >> ci_job_info.txt echo "Triggering Git Ref: ${{ github.ref }}" >> ci_job_info.txt echo "Triggering Git SHA: ${{ github.sha }}" >> ci_job_info.txt @@ -39,14 +36,20 @@ jobs: make virtualenv source .venv/bin/activate + - name: Non-requirements based install + run: | + # mpich: required by mpi4py which is in test-requirements for radical-pilot + sudo apt-get update -q + sudo apt-get install -qy mpich + - name: make deps clean_coverage run: | source .venv/bin/activate make deps make clean_coverage - - # Temporary fix, until changes make it into compute releases - git clone -b configure_tasks_working_dir https://github.com/globus/globus-compute.git + + # Temporary fix until fixes make it to a release + git clone -b main https://github.com/globus/globus-compute.git pip3 install globus-compute/compute_sdk globus-compute/compute_endpoint - name: start globus_compute_endpoint @@ -63,7 +66,6 @@ jobs: engine: type: ThreadPoolEngine max_workers: 4 - working_dir: /home/runner/.globus_compute/default/tasks_working_dir EOF cat /home/runner/.globus_compute/default/config.yaml mkdir ~/.globus_compute/default/tasks_working_dir @@ -75,22 +77,22 @@ jobs: GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }} run: | source .venv/bin/activate - export GLOBUS_COMPUTE_ENDPOINT=$(globus-compute-endpoint list | grep default | cut -c 3-38) + export GLOBUS_COMPUTE_ENDPOINT=$(globus-compute-endpoint list | grep default | cut -c 3-38) echo "GLOBUS_COMPUTE_ENDPOINT = $GLOBUS_COMPUTE_ENDPOINT" # temporary; until test-matrixification export PARSL_TEST_PRESERVE_NUM_RUNS=7 make gce_test - ln -s .pytest/parsltest-current test_runinfo + ln -s pytest-parsl/parsltest-current test_runinfo - name: Archive runinfo logs if: ${{ always() }} uses: actions/upload-artifact@v4 with: - name: runinfo-${{ matrix.python-version }}-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }} + name: runinfo-3.11-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }} path: | runinfo/ - .pytest/ + pytest-parsl/ ci_job_info.txt - compression-level: 9 + compression-level: 9 \ No newline at end of file From 13c3b8c30c4cf136d605f4dad50aa548aada09f3 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Fri, 15 Nov 2024 13:22:18 -0600 Subject: [PATCH 11/15] Minor fix --- .github/workflows/gce_test.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/gce_test.yaml b/.github/workflows/gce_test.yaml index 4ec835ece1..970bff6b17 100644 --- a/.github/workflows/gce_test.yaml +++ b/.github/workflows/gce_test.yaml @@ -80,7 +80,6 @@ jobs: export GLOBUS_COMPUTE_ENDPOINT=$(globus-compute-endpoint list | grep default | cut -c 3-38) echo "GLOBUS_COMPUTE_ENDPOINT = $GLOBUS_COMPUTE_ENDPOINT" - # temporary; until test-matrixification export PARSL_TEST_PRESERVE_NUM_RUNS=7 make gce_test From 6cd4b540b2128e779015278199cdc6a1dee44f58 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Fri, 15 Nov 2024 13:24:44 -0600 Subject: [PATCH 12/15] Add typechecking to GlobusComputeExecutor init. --- parsl/executors/globus_compute.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py index a7920783a2..23c003f8f7 100644 --- a/parsl/executors/globus_compute.py +++ b/parsl/executors/globus_compute.py @@ -5,6 +5,8 @@ from concurrent.futures import Future from typing import Any, Callable, Dict, Optional, Union +import typeguard + from parsl.errors import OptionalModuleMissing from parsl.executors.base import ParslExecutor from parsl.utils import RepresentationMixin @@ -36,6 +38,7 @@ class GlobusComputeExecutor(ParslExecutor, RepresentationMixin): """ + @typeguard.typechecked def __init__( self, endpoint_id: UUID_LIKE_T, From 9cbc49182135ca58493622c00ab5160c363ec138 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Mon, 18 Nov 2024 14:38:17 -0600 Subject: [PATCH 13/15] Update GlobusComputeExecutor to accept the Executor directly rather than instantiate it --- parsl/executors/globus_compute.py | 75 +++++++-------------------- parsl/tests/configs/globus_compute.py | 4 +- 2 files changed, 21 insertions(+), 58 deletions(-) diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py index 23c003f8f7..0cbba7b195 100644 --- a/parsl/executors/globus_compute.py +++ b/parsl/executors/globus_compute.py @@ -12,7 +12,7 @@ from parsl.utils import RepresentationMixin try: - from globus_compute_sdk import Client, Executor + from globus_compute_sdk import Executor _globus_compute_enabled = True except ImportError: _globus_compute_enabled = False @@ -41,26 +41,22 @@ class GlobusComputeExecutor(ParslExecutor, RepresentationMixin): @typeguard.typechecked def __init__( self, - endpoint_id: UUID_LIKE_T, - task_group_id: Optional[UUID_LIKE_T] = None, - resource_specification: Optional[Dict[str, Any]] = None, - user_endpoint_config: Optional[Dict[str, Any]] = None, - label: str = "GlobusComputeExecutor", - batch_size: int = 128, - amqp_port: Optional[int] = None, - client: Optional[Client] = None, - **kwargs, + executor: Executor, + label: str = 'GlobusComputeExecutor', + resource_specification: Optional[dict] = None, + user_endpoint_config: Optional[dict] = None, ): """ Parameters ---------- - endpoint_id: - id of the endpoint to which to submit tasks + executor: globus_compute_sdk.Executor + Pass a globus_compute_sdk Executor that will be used to execute + tasks on a globus_compute endpoint. Refer to `globus-compute docs + `_ - task_group_id: - The Task Group to which to associate tasks. If not set, - one will be instantiated. + label: + a label to name the executor resource_specification: Specify resource requirements for individual task execution. @@ -72,57 +68,22 @@ def __init__( `_ for more info. - label: - a label to name the executor - - batch_size: - the maximum number of tasks to coalesce before - sending upstream [min: 1, default: 128] - - amqp_port: - Port to use when connecting to results queue. Note that the - Compute web services only support 5671, 5672, and 443. - - client: - instance of globus_compute_sdk.Client to be used by the executor. - If not provided, the executor will instantiate one with default arguments. - - kwargs: - Other kwargs listed will be passed through to globus_compute_sdk.Executor - as is. Refer to `globus-compute docs - `_ """ - super().__init__() - self.endpoint_id = endpoint_id - self.task_group_id = task_group_id - self.resource_specification = resource_specification - self.user_endpoint_config = user_endpoint_config - self.label = label - self.batch_size = batch_size - self.amqp_port = amqp_port - self.client = client - self.executor_kwargs = kwargs - if not _globus_compute_enabled: raise OptionalModuleMissing( ['globus-compute-sdk'], "GlobusComputeExecutor requires globus-compute-sdk installed" ) + super().__init__() + self._executor: Executor = executor + self.resource_specification = resource_specification + self.user_endpoint_config = user_endpoint_config + self.label = label + def start(self) -> None: """ Start the Globus Compute Executor """ - - self._executor: Executor = Executor( - endpoint_id=self.endpoint_id, - task_group_id=self.task_group_id, - resource_specification=self.resource_specification, - user_endpoint_config=self.user_endpoint_config, - label=self.label, - batch_size=self.batch_size, - amqp_port=self.amqp_port, - client=self.client, - **self.executor_kwargs - ) + pass def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future: """ Submit func to globus-compute diff --git a/parsl/tests/configs/globus_compute.py b/parsl/tests/configs/globus_compute.py index edb45801e0..19f90b8c20 100644 --- a/parsl/tests/configs/globus_compute.py +++ b/parsl/tests/configs/globus_compute.py @@ -1,5 +1,7 @@ import os +from globus_compute_sdk import Executor + from parsl.config import Config from parsl.executors import GlobusComputeExecutor @@ -11,8 +13,8 @@ def fresh_config(): return Config( executors=[ GlobusComputeExecutor( + executor=Executor(endpoint_id=endpoint_id), label="globus_compute", - endpoint_id=endpoint_id ) ] ) From 04b5c022bb7a7ee507fbdc44153ec9b38bc2e131 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Mon, 18 Nov 2024 14:58:09 -0600 Subject: [PATCH 14/15] Minor variable renaming --- parsl/executors/globus_compute.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py index 0cbba7b195..228a8c6664 100644 --- a/parsl/executors/globus_compute.py +++ b/parsl/executors/globus_compute.py @@ -76,7 +76,7 @@ def __init__( ) super().__init__() - self._executor: Executor = executor + self.executor: Executor = executor self.resource_specification = resource_specification self.user_endpoint_config = user_endpoint_config self.label = label @@ -119,9 +119,9 @@ def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: else: user_endpoint_config = self.user_endpoint_config - self._executor.resource_specification = res_spec - self._executor.user_endpoint_config = user_endpoint_config - return self._executor.submit(func, *args, **kwargs) + self.executor.resource_specification = res_spec + self.executor.user_endpoint_config = user_endpoint_config + return self.executor.submit(func, *args, **kwargs) def shutdown(self): """Clean-up the resources associated with the Executor. @@ -129,4 +129,4 @@ def shutdown(self): GCE.shutdown will cancel all futures that have not yet registered with Globus Compute and will not wait for the launched futures to complete. """ - self._executor.shutdown(wait=False, cancel_futures=True) + self.executor.shutdown(wait=False, cancel_futures=True) From 4b4ce3431eaca18ec65c7d30a0cbb6f8eed93378 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Tue, 19 Nov 2024 12:04:34 -0600 Subject: [PATCH 15/15] Add step to stop endpoint at end of CI run. * Added an env var to specify Python version * Added a step to shutdown endpoint after tests --- .github/workflows/gce_test.yaml | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/.github/workflows/gce_test.yaml b/.github/workflows/gce_test.yaml index 970bff6b17..fb9e1e4cbf 100644 --- a/.github/workflows/gce_test.yaml +++ b/.github/workflows/gce_test.yaml @@ -6,6 +6,9 @@ on: - opened - synchronize +env: + PYTHON_VERSION: 3.11 + jobs: main-test-suite: runs-on: ubuntu-20.04 @@ -14,15 +17,15 @@ jobs: steps: - uses: actions/checkout@master - - name: Set up Python 3.11 + - name: Set up Python Environment uses: actions/setup-python@v4 with: - python-version: 3.11 + python-version: ${{ env.PYTHON_VERSION }} - name: Collect Job Information id: job-info run: | - echo "Python Version: 3.11" >> ci_job_info.txt + echo "Python Version: ${{ env.PYTHON_VERSION }} " >> ci_job_info.txt echo "CI Triggering Event: ${{ github.event_name }}" >> ci_job_info.txt echo "Triggering Git Ref: ${{ github.ref }}" >> ci_job_info.txt echo "Triggering Git SHA: ${{ github.sha }}" >> ci_job_info.txt @@ -85,11 +88,19 @@ jobs: make gce_test ln -s pytest-parsl/parsltest-current test_runinfo + - name: stop globus_compute_endpoint + env: + GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }} + GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }} + run: | + source /home/runner/work/parsl/parsl/.venv/bin/activate + globus-compute-endpoint stop default + - name: Archive runinfo logs if: ${{ always() }} uses: actions/upload-artifact@v4 with: - name: runinfo-3.11-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }} + name: runinfo-${{ env.PYTHON_VERSION }}-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }} path: | runinfo/ pytest-parsl/