From 9cbc49182135ca58493622c00ab5160c363ec138 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Mon, 18 Nov 2024 14:38:17 -0600 Subject: [PATCH] Update GlobusComputeExecutor to accept the Executor directly rather than instantiate it --- parsl/executors/globus_compute.py | 75 +++++++-------------------- parsl/tests/configs/globus_compute.py | 4 +- 2 files changed, 21 insertions(+), 58 deletions(-) diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py index 23c003f8f7..0cbba7b195 100644 --- a/parsl/executors/globus_compute.py +++ b/parsl/executors/globus_compute.py @@ -12,7 +12,7 @@ from parsl.utils import RepresentationMixin try: - from globus_compute_sdk import Client, Executor + from globus_compute_sdk import Executor _globus_compute_enabled = True except ImportError: _globus_compute_enabled = False @@ -41,26 +41,22 @@ class GlobusComputeExecutor(ParslExecutor, RepresentationMixin): @typeguard.typechecked def __init__( self, - endpoint_id: UUID_LIKE_T, - task_group_id: Optional[UUID_LIKE_T] = None, - resource_specification: Optional[Dict[str, Any]] = None, - user_endpoint_config: Optional[Dict[str, Any]] = None, - label: str = "GlobusComputeExecutor", - batch_size: int = 128, - amqp_port: Optional[int] = None, - client: Optional[Client] = None, - **kwargs, + executor: Executor, + label: str = 'GlobusComputeExecutor', + resource_specification: Optional[dict] = None, + user_endpoint_config: Optional[dict] = None, ): """ Parameters ---------- - endpoint_id: - id of the endpoint to which to submit tasks + executor: globus_compute_sdk.Executor + Pass a globus_compute_sdk Executor that will be used to execute + tasks on a globus_compute endpoint. Refer to `globus-compute docs + `_ - task_group_id: - The Task Group to which to associate tasks. If not set, - one will be instantiated. + label: + a label to name the executor resource_specification: Specify resource requirements for individual task execution. @@ -72,57 +68,22 @@ def __init__( `_ for more info. - label: - a label to name the executor - - batch_size: - the maximum number of tasks to coalesce before - sending upstream [min: 1, default: 128] - - amqp_port: - Port to use when connecting to results queue. Note that the - Compute web services only support 5671, 5672, and 443. - - client: - instance of globus_compute_sdk.Client to be used by the executor. - If not provided, the executor will instantiate one with default arguments. - - kwargs: - Other kwargs listed will be passed through to globus_compute_sdk.Executor - as is. Refer to `globus-compute docs - `_ """ - super().__init__() - self.endpoint_id = endpoint_id - self.task_group_id = task_group_id - self.resource_specification = resource_specification - self.user_endpoint_config = user_endpoint_config - self.label = label - self.batch_size = batch_size - self.amqp_port = amqp_port - self.client = client - self.executor_kwargs = kwargs - if not _globus_compute_enabled: raise OptionalModuleMissing( ['globus-compute-sdk'], "GlobusComputeExecutor requires globus-compute-sdk installed" ) + super().__init__() + self._executor: Executor = executor + self.resource_specification = resource_specification + self.user_endpoint_config = user_endpoint_config + self.label = label + def start(self) -> None: """ Start the Globus Compute Executor """ - - self._executor: Executor = Executor( - endpoint_id=self.endpoint_id, - task_group_id=self.task_group_id, - resource_specification=self.resource_specification, - user_endpoint_config=self.user_endpoint_config, - label=self.label, - batch_size=self.batch_size, - amqp_port=self.amqp_port, - client=self.client, - **self.executor_kwargs - ) + pass def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future: """ Submit func to globus-compute diff --git a/parsl/tests/configs/globus_compute.py b/parsl/tests/configs/globus_compute.py index edb45801e0..19f90b8c20 100644 --- a/parsl/tests/configs/globus_compute.py +++ b/parsl/tests/configs/globus_compute.py @@ -1,5 +1,7 @@ import os +from globus_compute_sdk import Executor + from parsl.config import Config from parsl.executors import GlobusComputeExecutor @@ -11,8 +13,8 @@ def fresh_config(): return Config( executors=[ GlobusComputeExecutor( + executor=Executor(endpoint_id=endpoint_id), label="globus_compute", - endpoint_id=endpoint_id ) ] )