Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for GlobusComputeExecutor #3607

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions .github/workflows/gce_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
name: GlobusComputeExecutor tests

on:
pull_request:
types:
- opened
- synchronize

jobs:
main-test-suite:
runs-on: ubuntu-20.04
timeout-minutes: 60

steps:
- uses: actions/checkout@master

- name: Set up Python 3.11
uses: actions/setup-python@v4
with:
python-version: 3.11

- name: Collect Job Information
id: job-info
run: |
echo "Python Version: 3.11" >> ci_job_info.txt
echo "CI Triggering Event: ${{ github.event_name }}" >> ci_job_info.txt
Copy link
Collaborator

@khk-globus khk-globus Nov 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider converting 3.11 into a variable that references the actual python version, rather than "happens to match until the python-version is changed." In ci.yaml, for example, this refers to the matrix.python-version variable. (Even in if the matrix is only one, that reduces the points of authority.)

echo "Triggering Git Ref: ${{ github.ref }}" >> ci_job_info.txt
echo "Triggering Git SHA: ${{ github.sha }}" >> ci_job_info.txt
echo "Workflow Run: ${{ github.run_number }}" >> ci_job_info.txt
echo "Workflow Attempt: ${{ github.run_attempt }}" >> ci_job_info.txt
as_ascii="$(echo "${{ github.ref_name }}" | perl -pe "s/[^A-z0-9-]+/-/g; s/^-+|-+\$//g; s/--+/-/g;")"
echo "as-ascii=$as_ascii" >> $GITHUB_OUTPUT

- name: setup virtual env
run: |
make virtualenv
source .venv/bin/activate

- name: Non-requirements based install
run: |
# mpich: required by mpi4py which is in test-requirements for radical-pilot
sudo apt-get update -q
sudo apt-get install -qy mpich

- name: make deps clean_coverage
run: |
source .venv/bin/activate
make deps
make clean_coverage

# Temporary fix until fixes make it to a release
git clone -b main https://github.com/globus/globus-compute.git
pip3 install globus-compute/compute_sdk globus-compute/compute_endpoint
khk-globus marked this conversation as resolved.
Show resolved Hide resolved

- name: start globus_compute_endpoint
env:
GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
run: |
source /home/runner/work/parsl/parsl/.venv/bin/activate
globus-compute-endpoint configure default
which globus-compute-endpoint
python3 -c "import globus_compute_sdk; print(globus_compute_sdk.__version__)"
python3 -c "import globus_compute_endpoint; print(globus_compute_endpoint.__version__)"
cat << EOF > /home/runner/.globus_compute/default/config.yaml
engine:
type: ThreadPoolEngine
benclifford marked this conversation as resolved.
Show resolved Hide resolved
max_workers: 4
EOF
cat /home/runner/.globus_compute/default/config.yaml
mkdir ~/.globus_compute/default/tasks_working_dir
globus-compute-endpoint start default
globus-compute-endpoint list
Comment on lines +68 to +76
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This creates a new endpoint every test invocation, right? To be kind to the GC infrastructure, we should cache this, or save and re-use the endpoint id.

- name: make test
env:
GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
run: |
source .venv/bin/activate
export GLOBUS_COMPUTE_ENDPOINT=$(globus-compute-endpoint list | grep default | cut -c 3-38)
echo "GLOBUS_COMPUTE_ENDPOINT = $GLOBUS_COMPUTE_ENDPOINT"
Comment on lines +83 to +84
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No sense in starting all the heavyweight machinery just to eventually crudely parse the string. Consider:

$ jq -r .endpoint_id < ~/.globus_compute/default/endpoint.json


export PARSL_TEST_PRESERVE_NUM_RUNS=7

make gce_test
ln -s pytest-parsl/parsltest-current test_runinfo

Comment on lines +89 to +90
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not chuffed about it, given the context of CI, but I do note that the endpoint is never shutdown.

- name: Archive runinfo logs
if: ${{ always() }}
uses: actions/upload-artifact@v4
with:
name: runinfo-3.11-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }}
path: |
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same context as earlier: it would be good to not hard code the python version but to collect it from the environment.

runinfo/
pytest-parsl/
ci_job_info.txt
compression-level: 9
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ clean_coverage:
mypy: ## run mypy checks
MYPYPATH=$(CWD)/mypy-stubs mypy parsl/

.PHONY: gce_test
gce_test: ## Run tests with GlobusComputeExecutor
pytest -v -k "not shared_fs and not issue_3620 and not staging_required" --config parsl/tests/configs/globus_compute.py parsl/tests/ --random-order --durations 10

.PHONY: local_thread_test
local_thread_test: ## run all tests with local_thread config
pytest parsl/tests/ -k "not cleannet" --config parsl/tests/configs/local_threads.py --random-order --durations 10
Expand Down
1 change: 1 addition & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Executors
parsl.executors.taskvine.TaskVineExecutor
parsl.executors.FluxExecutor
parsl.executors.radical.RadicalPilotExecutor
parsl.executors.globus_compute.GlobusComputeExecutor

Manager Selectors
=================
Expand Down
3 changes: 3 additions & 0 deletions docs/userguide/execution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ Parsl currently supports the following executors:
4. `parsl.executors.taskvine.TaskVineExecutor`: This executor uses `TaskVine <https://ccl.cse.nd.edu/software/taskvine/>`_ as the execution backend. TaskVine scales up to tens of thousands of cores and actively uses local storage on compute nodes to offer a diverse array of performance-oriented features, including: smart caching and sharing common large files between tasks and compute nodes, reliable execution of tasks, dynamic resource sizing, automatic Python environment detection and sharing.
These executors cover a broad range of execution requirements. As with other Parsl components, there is a standard interface (ParslExecutor) that can be implemented to add support for other executors.

5. `parsl.executors.globus_compute.GlobusComputeExecutor`: This executor uses `Globus Compute <https://globus-compute.readthedocs.io/en/latest/index.html>`_
as the execution backend to run tasks on remote systems.
Comment on lines +90 to +91
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider adding more context here. I'm not calling for complete documentation, but this currently only answers the "what." How about a "why" or in what cases the GCE would be a good fit?


.. note::
Refer to :ref:`configuration-section` for information on how to configure these executors.

Expand Down
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ ignore_missing_imports = True
#[mypy-multiprocessing.synchronization.*]
#ignore_missing_imports = True

[mypy-globus_compute_sdk.*]
ignore_missing_imports = True

[mypy-pandas.*]
ignore_missing_imports = True

Expand Down
4 changes: 3 additions & 1 deletion parsl/executors/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from parsl.executors.flux.executor import FluxExecutor
from parsl.executors.globus_compute import GlobusComputeExecutor
from parsl.executors.high_throughput.executor import HighThroughputExecutor
from parsl.executors.high_throughput.mpi_executor import MPIExecutor
from parsl.executors.threads import ThreadPoolExecutor
Expand All @@ -8,4 +9,5 @@
'HighThroughputExecutor',
'MPIExecutor',
'WorkQueueExecutor',
'FluxExecutor']
'FluxExecutor',
'GlobusComputeExecutor']
171 changes: 171 additions & 0 deletions parsl/executors/globus_compute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
from __future__ import annotations

import copy
import uuid
from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional, Union

import typeguard

from parsl.errors import OptionalModuleMissing
from parsl.executors.base import ParslExecutor
from parsl.utils import RepresentationMixin

try:
from globus_compute_sdk import Client, Executor
_globus_compute_enabled = True
except ImportError:
_globus_compute_enabled = False

UUID_LIKE_T = Union[uuid.UUID, str]


class GlobusComputeExecutor(ParslExecutor, RepresentationMixin):
""" GlobusComputeExecutor enables remote execution on Globus Compute endpoints

GlobusComputeExecutor is a thin wrapper over globus_compute_sdk.Executor
Refer to `globus-compute user documentation <https://globus-compute.readthedocs.io/en/latest/executor.html>`_
and `reference documentation <https://globus-compute.readthedocs.io/en/latest/reference/executor.html>`_
for more details.

.. note::
As a remote execution system, Globus Compute relies on serialization to ship
tasks and results between the Parsl client side and the remote Globus Compute
Endpoint side. Serialization is unreliable across python versions, and
wrappers used by Parsl assume identical Parsl versions across on both sides.
We recommend using matching Python, Parsl and Globus Compute version on both
the client side and the endpoint side for stable behavior.

"""

@typeguard.typechecked
def __init__(
self,
endpoint_id: UUID_LIKE_T,
task_group_id: Optional[UUID_LIKE_T] = None,
resource_specification: Optional[Dict[str, Any]] = None,
user_endpoint_config: Optional[Dict[str, Any]] = None,
label: str = "GlobusComputeExecutor",
batch_size: int = 128,
amqp_port: Optional[int] = None,
client: Optional[Client] = None,
**kwargs,
):
"""
Parameters
----------

endpoint_id:
khk-globus marked this conversation as resolved.
Show resolved Hide resolved
id of the endpoint to which to submit tasks
yadudoc marked this conversation as resolved.
Show resolved Hide resolved

task_group_id:
The Task Group to which to associate tasks. If not set,
one will be instantiated.

resource_specification:
Specify resource requirements for individual task execution.

user_endpoint_config:
User endpoint configuration values as described
and allowed by endpoint administrators. Must be a JSON-serializable dict
or None. Refer docs from `globus-compute
<https://globus-compute.readthedocs.io/en/latest/endpoints/endpoints.html#templating-endpoint-configuration>`_
for more info.

label:
a label to name the executor

batch_size:
the maximum number of tasks to coalesce before
sending upstream [min: 1, default: 128]

amqp_port:
Port to use when connecting to results queue. Note that the
Compute web services only support 5671, 5672, and 443.

client:
instance of globus_compute_sdk.Client to be used by the executor.
If not provided, the executor will instantiate one with default arguments.

kwargs:
Other kwargs listed will be passed through to globus_compute_sdk.Executor
as is. Refer to `globus-compute docs
<https://globus-compute.readthedocs.io/en/latest/reference/executor.html#globus-compute-executor>`_
"""
super().__init__()
self.endpoint_id = endpoint_id
self.task_group_id = task_group_id
yadudoc marked this conversation as resolved.
Show resolved Hide resolved
self.resource_specification = resource_specification
self.user_endpoint_config = user_endpoint_config
self.label = label
self.batch_size = batch_size
self.amqp_port = amqp_port
self.client = client
self.executor_kwargs = kwargs

if not _globus_compute_enabled:
raise OptionalModuleMissing(
['globus-compute-sdk'],
"GlobusComputeExecutor requires globus-compute-sdk installed"
)
yadudoc marked this conversation as resolved.
Show resolved Hide resolved

def start(self) -> None:
""" Start the Globus Compute Executor """

self._executor: Executor = Executor(
endpoint_id=self.endpoint_id,
task_group_id=self.task_group_id,
resource_specification=self.resource_specification,
user_endpoint_config=self.user_endpoint_config,
label=self.label,
batch_size=self.batch_size,
amqp_port=self.amqp_port,
client=self.client,
**self.executor_kwargs
)

def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
""" Submit func to globus-compute


Parameters
----------

func: Callable
Python function to execute remotely

resource_specification: Dict[str, Any]
Resource specification can be used specify MPI resources required by MPI applications on
Endpoints configured to use globus compute's MPIEngine. GCE also accepts *user_endpoint_config*
to configure endpoints when the endpoint is a `Multi-User Endpoint
<https://globus-compute.readthedocs.io/en/latest/endpoints/endpoints.html#templating-endpoint-configuration>`_

args:
Args to pass to the function

kwargs:
kwargs to pass to the function

Returns
-------

Future
"""
res_spec = copy.deepcopy(resource_specification or self.resource_specification)
# Pop user_endpoint_config since it is illegal in resource_spec for globus_compute
if res_spec:
user_endpoint_config = res_spec.pop('user_endpoint_config', self.user_endpoint_config)
else:
user_endpoint_config = self.user_endpoint_config

self._executor.resource_specification = res_spec
self._executor.user_endpoint_config = user_endpoint_config
return self._executor.submit(func, *args, **kwargs)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels a bit horribly thread unsafe, were Parsl ever to get multithreaded submission - the sort of thing DFK.submitter_lock was put in place to deal with.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially yes. I don't see how this is only a GCE specific problem. Secondly, I think it's safe to wait until users report this as an issue, or ask for it before working on this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Err, I don't think it is a GCE specific problem. But I think the GCE class needs to handle it all the same. What's wrong with wrapping this work in the lock?

Or is the point that this method is called from within the lock already?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When this is used from the DFK, this is already in a lock (although as you're both well aware, there is strong community pressure to use pieces of Parsl outside of the DFK). That lock was introduced in PR #625 because executors are not expected to be thread safe on submission, and so in that context this code is not dangerous.

This is a more general backpressure against using this style of API that seems to conflate the state of the submission system as a whole with parameters to a single task execution - I've definitely fixed concurrency bugs in Parsl because of this coding style before, that lead not to Parsl errors but to subtle misexecutions that mostly look plausible.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(see #3492, #1413)


def shutdown(self):
"""Clean-up the resources associated with the Executor.

GCE.shutdown will cancel all futures that have not yet registered with
Globus Compute and will not wait for the launched futures to complete.
"""
self._executor.shutdown(wait=False, cancel_futures=True)
yadudoc marked this conversation as resolved.
Show resolved Hide resolved
18 changes: 18 additions & 0 deletions parsl/tests/configs/globus_compute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import os

from parsl.config import Config
from parsl.executors import GlobusComputeExecutor


def fresh_config():

endpoint_id = os.environ["GLOBUS_COMPUTE_ENDPOINT"]

return Config(
executors=[
GlobusComputeExecutor(
label="globus_compute",
endpoint_id=endpoint_id
)
]
)
4 changes: 4 additions & 0 deletions parsl/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ def pytest_configure(config):
'markers',
'shared_fs: Marks tests that require a shared_fs between the workers are the test client'
)
config.addinivalue_line(
'markers',
'issue_3620: Marks tests that do not work correctly on GlobusComputeExecutor (ref: issue 3620)'
)


@pytest.fixture(autouse=True, scope='session')
Expand Down
3 changes: 3 additions & 0 deletions parsl/tests/test_error_handling/test_resource_spec.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import pytest

import parsl
from parsl.app.app import python_app
from parsl.executors import WorkQueueExecutor
Expand All @@ -11,6 +13,7 @@ def double(x, parsl_resource_specification={}):
return x * 2


@pytest.mark.issue_3620
def test_resource(n=2):
executors = parsl.dfk().executors
executor = None
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
'flux': ['pyyaml', 'cffi', 'jsonschema'],
'proxystore': ['proxystore'],
'radical-pilot': ['radical.pilot==1.60', 'radical.utils==1.60'],
'globus_compute': ['globus_compute_sdk>=2.27.1'],
# Disabling psi-j since github direct links are not allowed by pypi
# 'psij': ['psi-j-parsl@git+https://github.com/ExaWorks/psi-j-parsl']
}
Expand Down
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ types-mock
types-python-dateutil
types-requests
mpi4py
globus-compute-sdk>=2.27.1

# sqlalchemy is needed for typechecking, so it's here
# as well as at runtime for optional monitoring execution
Expand Down
Loading