Skip to content

Commit

Permalink
Update GlobusComputeExecutor to accept the Executor directly rather t…
Browse files Browse the repository at this point in the history
…han instantiate it
  • Loading branch information
yadudoc committed Nov 18, 2024
1 parent 6cd4b54 commit 9cbc491
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 58 deletions.
75 changes: 18 additions & 57 deletions parsl/executors/globus_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from parsl.utils import RepresentationMixin

try:
from globus_compute_sdk import Client, Executor
from globus_compute_sdk import Executor
_globus_compute_enabled = True
except ImportError:
_globus_compute_enabled = False
Expand Down Expand Up @@ -41,26 +41,22 @@ class GlobusComputeExecutor(ParslExecutor, RepresentationMixin):
@typeguard.typechecked
def __init__(
self,
endpoint_id: UUID_LIKE_T,
task_group_id: Optional[UUID_LIKE_T] = None,
resource_specification: Optional[Dict[str, Any]] = None,
user_endpoint_config: Optional[Dict[str, Any]] = None,
label: str = "GlobusComputeExecutor",
batch_size: int = 128,
amqp_port: Optional[int] = None,
client: Optional[Client] = None,
**kwargs,
executor: Executor,
label: str = 'GlobusComputeExecutor',
resource_specification: Optional[dict] = None,
user_endpoint_config: Optional[dict] = None,
):
"""
Parameters
----------
endpoint_id:
id of the endpoint to which to submit tasks
executor: globus_compute_sdk.Executor
Pass a globus_compute_sdk Executor that will be used to execute
tasks on a globus_compute endpoint. Refer to `globus-compute docs
<https://globus-compute.readthedocs.io/en/latest/reference/executor.html#globus-compute-executor>`_
task_group_id:
The Task Group to which to associate tasks. If not set,
one will be instantiated.
label:
a label to name the executor
resource_specification:
Specify resource requirements for individual task execution.
Expand All @@ -72,57 +68,22 @@ def __init__(
<https://globus-compute.readthedocs.io/en/latest/endpoints/endpoints.html#templating-endpoint-configuration>`_
for more info.
label:
a label to name the executor
batch_size:
the maximum number of tasks to coalesce before
sending upstream [min: 1, default: 128]
amqp_port:
Port to use when connecting to results queue. Note that the
Compute web services only support 5671, 5672, and 443.
client:
instance of globus_compute_sdk.Client to be used by the executor.
If not provided, the executor will instantiate one with default arguments.
kwargs:
Other kwargs listed will be passed through to globus_compute_sdk.Executor
as is. Refer to `globus-compute docs
<https://globus-compute.readthedocs.io/en/latest/reference/executor.html#globus-compute-executor>`_
"""
super().__init__()
self.endpoint_id = endpoint_id
self.task_group_id = task_group_id
self.resource_specification = resource_specification
self.user_endpoint_config = user_endpoint_config
self.label = label
self.batch_size = batch_size
self.amqp_port = amqp_port
self.client = client
self.executor_kwargs = kwargs

if not _globus_compute_enabled:
raise OptionalModuleMissing(
['globus-compute-sdk'],
"GlobusComputeExecutor requires globus-compute-sdk installed"
)

super().__init__()
self._executor: Executor = executor
self.resource_specification = resource_specification
self.user_endpoint_config = user_endpoint_config
self.label = label

def start(self) -> None:
""" Start the Globus Compute Executor """

self._executor: Executor = Executor(
endpoint_id=self.endpoint_id,
task_group_id=self.task_group_id,
resource_specification=self.resource_specification,
user_endpoint_config=self.user_endpoint_config,
label=self.label,
batch_size=self.batch_size,
amqp_port=self.amqp_port,
client=self.client,
**self.executor_kwargs
)
pass

def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
""" Submit func to globus-compute
Expand Down
4 changes: 3 additions & 1 deletion parsl/tests/configs/globus_compute.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os

from globus_compute_sdk import Executor

from parsl.config import Config
from parsl.executors import GlobusComputeExecutor

Expand All @@ -11,8 +13,8 @@ def fresh_config():
return Config(
executors=[
GlobusComputeExecutor(
executor=Executor(endpoint_id=endpoint_id),
label="globus_compute",
endpoint_id=endpoint_id
)
]
)

0 comments on commit 9cbc491

Please sign in to comment.