diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py
index ce2f54ec..3260e28f 100644
--- a/pulsar/client/action_mapper.py
+++ b/pulsar/client/action_mapper.py
@@ -190,6 +190,9 @@ def __init__(self, client=None, config=None):
         self.ssh_port = config.get("ssh_port", None)
         self.mappers = mappers_from_dicts(config.get("paths", []))
         self.files_endpoint = config.get("files_endpoint", None)
+        self.actions = []
+        # Might want to make the working directory available here so that we know where to place the archive
+        # for the archive action
 
     def action(self, source, type, mapper=None):
         path = source.get("path", None)
@@ -202,8 +205,12 @@ def action(self, source, type, mapper=None):
             action_kwds = mapper.action_kwds
         action = action_class(source, file_lister=file_lister, **action_kwds)
         self.__process_action(action, type)
+        self.actions.append(action)
         return action
 
+    def finalize(self):
+        return [_ for _ in (action.finalize() for action in self.actions) if _]
+
     def unstructured_mappers(self):
         """ Return mappers that will map 'unstructured' files (i.e. go beyond
         mapping inputs, outputs, and config files).
@@ -342,6 +349,9 @@ def _extend_base_dict(self, **kwds):
         base_dict.update(**kwds)
         return base_dict
 
+    def finalize(self):
+        pass
+
     def to_dict(self):
         return self._extend_base_dict()
 
@@ -513,6 +523,35 @@ def write_from_path(self, pulsar_path):
         tus_upload_file(self.url, pulsar_path)
 
 
+class JsonTransferAction(BaseAction):
+    """
+    This action indicates that the Pulsar server should create a JSON manifest that can be used by an
+    external system to stage files in and out of the compute environment.
+    """
+    inject_url = True
+    whole_directory_transfer_supported = True
+    action_type = "json_transfer"
+    staging = STAGING_ACTION_REMOTE
+
+    def __init__(self, source, file_lister=None, url=None):
+        super().__init__(source, file_lister)
+        self.url = url
+        self._path = None
+
+    @classmethod
+    def from_dict(cls, action_dict):
+        return JsonTransferAction(source=action_dict["source"], url=action_dict["url"])
+
+    def write_to_path(self, path):
+        self._path = path
+
+    def write_from_path(self, pulsar_path: str):
+        self._path = pulsar_path
+
+    def finalize(self):
+        return {"url": self.url, "path": self.path}
+
+
 class RemoteObjectStoreCopyAction(BaseAction):
     """
     """
@@ -664,6 +703,7 @@ def write_to_path(self, path):
 
 
 DICTIFIABLE_ACTION_CLASSES = [
+    JsonTransferAction,
     RemoteCopyAction,
     RemoteTransferAction,
     RemoteTransferTusAction,
@@ -844,6 +884,7 @@ def unstructured_map(self, path):
 
 ACTION_CLASSES: List[Type[BaseAction]] = [
     NoneAction,
+    JsonTransferAction,
     RewriteAction,
     TransferAction,
     CopyAction,
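For orientation (not part of the diff): with a "json_transfer" default action, each file routed through the mapper yields a JsonTransferAction, and FileActionMapper.finalize() collects the per-action {"url": ..., "path": ...} summaries into a staging manifest. A minimal sketch of that flow, assuming the constructor accepts a plain config dict as its signature above suggests; the endpoint and paths are placeholders:

    from pulsar.client.action_mapper import FileActionMapper

    # Illustrative configuration; values are placeholders.
    mapper = FileActionMapper(config={
        "default_action": "json_transfer",
        "files_endpoint": "https://files.example.org/api",
        "paths": [],
    })
    mapper.action({"path": "/galaxy/datasets/input_1.dat"}, "input")
    mapper.action({"path": "/galaxy/datasets/input_2.dat"}, "input")

    # One entry per mapped file, e.g.
    # {"url": <injected files_endpoint URL>, "path": "/galaxy/datasets/input_1.dat"}
    staging_manifest = mapper.finalize()
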
diff --git a/pulsar/client/client.py b/pulsar/client/client.py
index e28d84ea..8478742d 100644
--- a/pulsar/client/client.py
+++ b/pulsar/client/client.py
@@ -168,7 +168,7 @@ def __init__(self, destination_params, job_id, job_manager_interface):
         self.job_manager_interface = job_manager_interface
 
     def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
-               dynamic_file_sources=None, token_endpoint=None):
+               dynamic_file_sources=None, token_endpoint=None, staging_manifest=None):
         """
         Queue up the execution of the supplied `command_line` on the remote
         server. Called launch for historical reasons, should be renamed to
@@ -405,7 +405,7 @@ def _build_status_request_message(self):
 class MessageJobClient(BaseMessageJobClient):
 
     def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
-               dynamic_file_sources=None, token_endpoint=None):
+               dynamic_file_sources=None, token_endpoint=None, staging_manifest=None):
         """
         """
         launch_params = self._build_setup_message(
@@ -439,7 +439,7 @@ def __init__(self, destination_params, job_id, client_manager, shell):
         self.shell = shell
 
     def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
-               dynamic_file_sources=None, token_endpoint=None):
+               dynamic_file_sources=None, token_endpoint=None, staging_manifest=None):
         """
         """
         launch_params = self._build_setup_message(
@@ -477,6 +477,24 @@ class ExecutionType(str, Enum):
     PARALLEL = "parallel"
 
 
+class LocalSequentialLaunchMixin(BaseRemoteConfiguredJobClient):
+
+    def launch(
+        self,
+        command_line,
+        dependencies_description=None,
+        env=None,
+        remote_staging=None,
+        job_config=None,
+        dynamic_file_sources=None,
+        container_info=None,
+        token_endpoint=None,
+        pulsar_app_config=None,
+        staging_manifest=None
+    ) -> Optional[ExternalId]:
+        pass
+
+
 class CoexecutionLaunchMixin(BaseRemoteConfiguredJobClient):
     execution_type: ExecutionType
     pulsar_container_image: str
@@ -491,7 +509,8 @@ def launch(
         dynamic_file_sources=None,
         container_info=None,
         token_endpoint=None,
-        pulsar_app_config=None
+        pulsar_app_config=None,
+        staging_manifest=None
     ) -> Optional[ExternalId]:
         """
         """
@@ -756,6 +775,12 @@ def raw_check_complete(self) -> Dict[str, Any]:
         }
 
 
+class LocalSequentialClient(BaseMessageCoexecutionJobClient, LocalSequentialLaunchMixin):
+
+    def __init__(self, destination_params, job_id, client_manager):
+        super().__init__(destination_params, job_id, client_manager)
+
+
 class TesPollingCoexecutionJobClient(BasePollingCoexecutionJobClient, LaunchesTesContainersMixin):
     """A client that co-executes pods via GA4GH TES and depends on amqp for status updates."""
 
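The staging_manifest parameter threaded through the launch() signatures above is simply the JSON-serializable list produced by FileActionMapper.finalize(); the server-side consumer is not shown in this diff. A sketch, under that assumption, of persisting the manifest for an external staging system (the helper name and file location are illustrative):

    import json

    def write_staging_manifest(staging_manifest, manifest_path):
        # Persist the manifest so an external system can stage the listed
        # files in and out of the compute environment.
        with open(manifest_path, "w") as f:
            json.dump(staging_manifest, f, indent=2)
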
diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py
index ea8f8aa1..82f9d348 100644
--- a/pulsar/client/staging/down.py
+++ b/pulsar/client/staging/down.py
@@ -72,6 +72,9 @@ def collect(self):
         self.__collect_other_working_directory_files()
         self.__collect_metadata_directory_files()
         self.__collect_job_directory_files()
+        # Give actions that require a final step, like those that write a manifest, a chance to write out their content
+        self.__finalize_action_mapper()
+        # finalize collection here for executors that need this?
         return self.exception_tracker.collection_failure_exceptions
 
     def __collect_working_directory_outputs(self):
@@ -134,6 +137,9 @@ def __collect_job_directory_files(self):
             'output_jobdir',
         )
 
+    def __finalize_action_mapper(self):
+        self.action_mapper.finalize()
+
     def __realized_dynamic_file_source_references(self):
         references = {"filename": [], "extra_files": []}
 
diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py
index 9f08bef8..57b7b300 100644
--- a/pulsar/client/staging/up.py
+++ b/pulsar/client/staging/up.py
@@ -71,6 +71,11 @@ def submit_job(client, client_job_description, job_config=None):
     # it needs to be in the response to Pulsar even Pulsar is inititing staging actions
     launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources
     launch_kwds["token_endpoint"] = client.token_endpoint
+
+    staging_manifest = file_stager.action_mapper.finalize()
+    if staging_manifest:
+        launch_kwds["staging_manifest"] = staging_manifest
+
     # for pulsar modalities that skip the explicit "setup" step, give them a chance to set an external
     # id from the submission process (e.g. to TES).
     launch_response = client.launch(**launch_kwds)
diff --git a/pulsar/managers/staging/pre.py b/pulsar/managers/staging/pre.py
index 543e9cdf..4ab15654 100644
--- a/pulsar/managers/staging/pre.py
+++ b/pulsar/managers/staging/pre.py
@@ -14,6 +14,8 @@ def preprocess(job_directory, setup_actions, action_executor, object_store=None)
         action = from_dict(setup_action["action"])
         if getattr(action, "inject_object_store", False):
             action.object_store = object_store
+        if getattr(action, "inject_job_directory", False):
+            action.job_directory = job_directory
         path = job_directory.calculate_path(name, input_type)
         description = "Staging {} '{}' via {} to {}".format(input_type, name, action, path)
         log.debug(description)
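The preprocess() change mirrors the existing inject_object_store opt-in: an action class sets a flag and the Pulsar server attaches the job directory before running the staging action. A hypothetical action illustrating the pattern (this class is not part of the diff; it only relates to the TODO comment above about placing an archive):

    from pulsar.client.action_mapper import BaseAction

    class ArchiveTransferAction(BaseAction):
        # Hypothetical example: opting in causes preprocess() to set self.job_directory.
        inject_job_directory = True

        def write_to_path(self, path):
            # self.job_directory would be available here, e.g. to decide where an
            # archive should be unpacked relative to the job's working directory.
            pass
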
"dataset_1211231231231231231.dat" galaxy_output = os.path.join(galaxy_working, output_name) - pulsar_output = os.path.join(self.staging_directory, job_id, "outputs", output_name) + pulsar_output = os.path.join( + self.staging_directory, job_id, "outputs", output_name + ) pulsar_input = os.path.join(self.staging_directory, job_id, "inputs", "cow") with files_server("/") as test_files_server: files_endpoint = test_files_server.application_url - action = {"name": "cow", "type": "input", "action": {"action_type": "message", "contents": "cow file contents\n"}} + action = { + "name": "cow", + "type": "input", + "action": {"action_type": "message", "contents": "cow file contents\n"}, + } client_outputs = ClientOutputs( working_directory=galaxy_working, output_files=[os.path.join(galaxy_working, output_name)], @@ -39,10 +51,7 @@ def run_and_check_submission(self): setup=True, remote_staging={ "setup": [action], - "action_mapper": { - "default_action": "remote_transfer", - "files_endpoint": files_endpoint, - }, + "action_mapper": self.setup_action_mapper(files_endpoint), "client_outputs": client_outputs.to_dict(), }, ) @@ -74,7 +83,8 @@ def test(self): def _encode_application(self): app_conf = dict( staging_directory=self.staging_directory, - message_queue_url="memory://submittest" + message_queue_url="memory://submittest", + conda_auto_init=False, ) app_conf_path = os.path.join(self.config_directory, "app.yml") with open(app_conf_path, "w") as f: @@ -93,6 +103,29 @@ def test(self): def _encode_application(self): app_conf = dict( staging_directory=self.staging_directory, - message_queue_url="memory://submittest" + message_queue_url="memory://submittest", + conda_auto_init=False, + ) + return ["--app_conf_base64", to_base64_json(app_conf)] + + +class SequentialLocalCommandLineAppConfigTestCase(BaseCliTestCase): + + @skip_unless_module("kombu") + @integration_test + def test(self): + self.run_and_check_submission() + + def setup_action_mapper(self, files_endpoint): + return { + "default_action": "json_transfer", + "files_endpoint": files_endpoint, + } + + def _encode_application(self): + app_conf = dict( + staging_directory=self.staging_directory, + message_queue_url="memory://submittest", + conda_auto_init=False, ) return ["--app_conf_base64", to_base64_json(app_conf)] diff --git a/test/transfer_action_test.py b/test/transfer_action_test.py index 30b927f9..0cf3053d 100644 --- a/test/transfer_action_test.py +++ b/test/transfer_action_test.py @@ -1,7 +1,35 @@ import os from .test_utils import files_server -from pulsar.client.action_mapper import RemoteTransferAction +from pulsar.client.action_mapper import ( + JsonTransferAction, + RemoteTransferAction, +) + + +def test_write_to_path_json(): + with files_server() as (server, directory): + from_path = os.path.join(directory, "remote_get") + + to_path = os.path.join(directory, "local_get") + url = server.application_url + "?path=%s" % from_path + action = JsonTransferAction({"path": to_path}, url=url) + action.write_to_path(to_path) + assert action.path == to_path + assert action.url == url + assert action.finalize() == {"path": to_path, "url": url} + + +def test_write_from_file_json(): + with files_server() as (server, directory): + from_path = os.path.join(directory, "local_post") + to_path = os.path.join(directory, "remote_post") + url = server.application_url + "?path=%s" % to_path + action = JsonTransferAction({"path": to_path}, url=url) + action.write_from_path(from_path) + assert action.path == to_path + assert action.url == url + assert 
action.finalize() == {"path": to_path, "url": url} def test_write_to_file():