Draft of JSON manifest collector
This should be the general strategy for collecting input and output
files for ARC, DIRAC, AWS Batch, etc.
mvdbeek committed Nov 11, 2024
1 parent 198dc8a commit 71085e2
Showing 9 changed files with 168 additions and 13 deletions.
41 changes: 41 additions & 0 deletions pulsar/client/action_mapper.py
@@ -190,6 +190,9 @@ def __init__(self, client=None, config=None):
self.ssh_port = config.get("ssh_port", None)
self.mappers = mappers_from_dicts(config.get("paths", []))
self.files_endpoint = config.get("files_endpoint", None)
self.actions = []
# Might want to make the working directory available here so that we know where to place the archive
# for the archive action

def action(self, source, type, mapper=None):
path = source.get("path", None)
@@ -202,8 +205,12 @@ def action(self, source, type, mapper=None):
action_kwds = mapper.action_kwds
action = action_class(source, file_lister=file_lister, **action_kwds)
self.__process_action(action, type)
self.actions.append(action)
return action

def finalize(self):
return [_ for _ in (action.finalize() for action in self.actions) if _]

def unstructured_mappers(self):
""" Return mappers that will map 'unstructured' files (i.e. go beyond
mapping inputs, outputs, and config files).
@@ -342,6 +349,9 @@ def _extend_base_dict(self, **kwds):
base_dict.update(**kwds)
return base_dict

def finalize(self):
pass

def to_dict(self):
return self._extend_base_dict()

@@ -513,6 +523,35 @@ def write_from_path(self, pulsar_path):
tus_upload_file(self.url, pulsar_path)


class JsonTransferAction(BaseAction):
"""
This action indicates that the Pulsar server should write a JSON manifest that an external system can use to
stage files in and out of the compute environment.
"""
inject_url = True
whole_directory_transfer_supported = True
action_type = "json_transfer"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None):
super().__init__(source, file_lister)
self.url = url
self._path = None

@classmethod
def from_dict(cls, action_dict):
return JsonTransferAction(source=action_dict["source"], url=action_dict["url"])

def write_to_path(self, path):
self._path = path

def write_from_path(self, pulsar_path: str):
self._path = pulsar_path

def finalize(self):
return {"url": self.url, "path": self.path}


class RemoteObjectStoreCopyAction(BaseAction):
"""
"""
@@ -664,6 +703,7 @@ def write_to_path(self, path):


DICTIFIABLE_ACTION_CLASSES = [
JsonTransferAction,
RemoteCopyAction,
RemoteTransferAction,
RemoteTransferTusAction,
@@ -844,6 +884,7 @@ def unstructured_map(self, path):

ACTION_CLASSES: List[Type[BaseAction]] = [
NoneAction,
JsonTransferAction,
RewriteAction,
TransferAction,
CopyAction,
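Taken together, JsonTransferAction.finalize() above emits a {"url", "path"} dict per file and FileActionMapper.finalize() collects those into a list, so the manifest handed to an external stager would presumably look something like the following (URLs and paths are purely illustrative):

[
    {"url": "https://galaxy.example.org/api/jobs/files?path=/data/input1.dat", "path": "/pulsar/staging/42/inputs/input1.dat"},
    {"url": "https://galaxy.example.org/api/jobs/files?path=/data/output1.dat", "path": "/pulsar/staging/42/outputs/output1.dat"}
]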
33 changes: 29 additions & 4 deletions pulsar/client/client.py
@@ -168,7 +168,7 @@ def __init__(self, destination_params, job_id, job_manager_interface):
self.job_manager_interface = job_manager_interface

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
dynamic_file_sources=None, token_endpoint=None, staging_manifest=None):
"""
Queue up the execution of the supplied `command_line` on the remote
server. Called launch for historical reasons, should be renamed to
@@ -405,7 +405,7 @@ def _build_status_request_message(self):
class MessageJobClient(BaseMessageJobClient):

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
dynamic_file_sources=None, token_endpoint=None, staging_manifest=None):
"""
"""
launch_params = self._build_setup_message(
@@ -439,7 +439,7 @@ def __init__(self, destination_params, job_id, client_manager, shell):
self.shell = shell

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
dynamic_file_sources=None, token_endpoint=None, staging_manifest=None):
"""
"""
launch_params = self._build_setup_message(
@@ -477,6 +477,24 @@ class ExecutionType(str, Enum):
PARALLEL = "parallel"


class LocalSequentialLaunchMixin(BaseRemoteConfiguredJobClient):

def launch(
self,
command_line,
dependencies_description=None,
env=None,
remote_staging=None,
job_config=None,
dynamic_file_sources=None,
container_info=None,
token_endpoint=None,
pulsar_app_config=None,
staging_manifest=None
) -> Optional[ExternalId]:
pass


class CoexecutionLaunchMixin(BaseRemoteConfiguredJobClient):
execution_type: ExecutionType
pulsar_container_image: str
@@ -491,7 +509,8 @@ def launch(
dynamic_file_sources=None,
container_info=None,
token_endpoint=None,
pulsar_app_config=None
pulsar_app_config=None,
staging_manifest=None
) -> Optional[ExternalId]:
"""
"""
@@ -756,6 +775,12 @@ def raw_check_complete(self) -> Dict[str, Any]:
}


class LocalSequentialClient(BaseMessageCoexecutionJobClient, LocalSequentialLaunchMixin):

def __init__(self, destination_params, job_id, client_manager):
super().__init__(destination_params, job_id, client_manager)


class TesPollingCoexecutionJobClient(BasePollingCoexecutionJobClient, LaunchesTesContainersMixin):
"""A client that co-executes pods via GA4GH TES and depends on amqp for status updates."""

6 changes: 6 additions & 0 deletions pulsar/client/staging/down.py
@@ -72,6 +72,9 @@ def collect(self):
self.__collect_other_working_directory_files()
self.__collect_metadata_directory_files()
self.__collect_job_directory_files()
# Give actions that require a final step, like those that write a manifest, a chance to write out their content
self.__finalize_action_mapper()
# finalize collection here for executors that need this?
return self.exception_tracker.collection_failure_exceptions

def __collect_working_directory_outputs(self):
@@ -134,6 +137,9 @@ def __collect_job_directory_files(self):
'output_jobdir',
)

def __finalize_action_mapper(self):
self.action_mapper.finalize()

def __realized_dynamic_file_source_references(self):
references = {"filename": [], "extra_files": []}

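__finalize_action_mapper() above only invokes action_mapper.finalize() and drops the result; a minimal sketch of what persisting the collected manifest might look like (the helper name and the choice of writing into the working directory are assumptions, echoing the "where to place archive" note in action_mapper.py):

import json
import os

def write_staging_manifest(action_mapper, working_directory):
    # Hypothetical helper: gather the per-action finalize() dicts and write the
    # aggregate manifest where an external stager can pick it up.
    manifest = action_mapper.finalize()
    manifest_path = os.path.join(working_directory, "staging_manifest.json")
    with open(manifest_path, "w") as fh:
        json.dump(manifest, fh, indent=2)
    return manifest_path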
5 changes: 5 additions & 0 deletions pulsar/client/staging/up.py
@@ -71,6 +71,11 @@ def submit_job(client, client_job_description, job_config=None):
# it needs to be in the response to Pulsar even if Pulsar is initiating staging actions
launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources
launch_kwds["token_endpoint"] = client.token_endpoint

staging_manifest = file_stager.action_mapper.finalize()
if staging_manifest:
launch_kwds["staging_manifest"] = staging_manifest

# for pulsar modalities that skip the explicit "setup" step, give them a chance to set an external
# id from the submission process (e.g. to TES).
launch_response = client.launch(**launch_kwds)
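On the compute side, an external system (ARC, DIRAC, AWS Batch, etc.) could consume the staging_manifest passed to launch(); a hypothetical stage-in sketch that relies only on the url/path fields emitted by JsonTransferAction.finalize():

import json
import os
import urllib.request

def stage_in(manifest_path):
    # Download every manifest entry to its designated path on the compute node.
    with open(manifest_path) as fh:
        manifest = json.load(fh)
    for entry in manifest:
        os.makedirs(os.path.dirname(entry["path"]), exist_ok=True)
        urllib.request.urlretrieve(entry["url"], entry["path"])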
2 changes: 2 additions & 0 deletions pulsar/managers/staging/pre.py
@@ -14,6 +14,8 @@ def preprocess(job_directory, setup_actions, action_executor, object_store=None)
action = from_dict(setup_action["action"])
if getattr(action, "inject_object_store", False):
action.object_store = object_store
if getattr(action, "inject_job_directory", False):
action.job_directory = job_directory
path = job_directory.calculate_path(name, input_type)
description = "Staging {} '{}' via {} to {}".format(input_type, name, action, path)
log.debug(description)
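The inject_job_directory check mirrors the existing inject_object_store hook; an action class opting in would presumably look like this hypothetical sketch (not part of this commit):

from pulsar.client.action_mapper import BaseAction

class ArchiveTransferAction(BaseAction):
    # Hypothetical action illustrating the new hook.
    action_type = "archive_transfer"
    inject_job_directory = True  # preprocess() above assigns self.job_directory

    def write_to_path(self, path):
        # With the job directory injected, the action can decide where to place
        # an archive relative to the remote job layout.
        self.archive_path = self.job_directory.calculate_path("archive.tar.gz", "input")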
11 changes: 11 additions & 0 deletions test/action_mapper_test.py
@@ -4,6 +4,17 @@
)


def test_action_mapper_finalization():
client = _client("json_transfer")
mapper = FileActionMapper(client)
mapper.action({'path': '/opt/galaxy/tools/filters/catWrapper.py'}, 'input')
mapper.action({'path': '/the_file'}, 'input')
mapper_summary = mapper.finalize()
assert len(mapper_summary) == 2
assert mapper_summary[0]["path"] == '/opt/galaxy/tools/filters/catWrapper.py'
assert mapper_summary[1]["path"] == '/the_file'


def test_endpoint_validation():
client = _min_client("remote_transfer")
mapper = FileActionMapper(client)
4 changes: 4 additions & 0 deletions test/client_test.py
@@ -125,6 +125,10 @@ def test_launch():
request_checker.assert_called()


def test_sequential_local():
pass


def __test_upload(upload_type):
client = TestClient()
(temp_fileno, temp_file_path) = tempfile.mkstemp()
49 changes: 41 additions & 8 deletions test/integration_test_cli_submit.py → test/test_cli_submit.py
@@ -16,16 +16,28 @@

class BaseCliTestCase(TempDirectoryTestCase):

def setup_action_mapper(self, files_endpoint):
return {
"default_action": "remote_transfer",
"files_endpoint": files_endpoint,
}

def run_and_check_submission(self):
job_id = "0"
galaxy_working = temp_directory_persist()
output_name = "dataset_1211231231231231231.dat"
galaxy_output = os.path.join(galaxy_working, output_name)
pulsar_output = os.path.join(self.staging_directory, job_id, "outputs", output_name)
pulsar_output = os.path.join(
self.staging_directory, job_id, "outputs", output_name
)
pulsar_input = os.path.join(self.staging_directory, job_id, "inputs", "cow")
with files_server("/") as test_files_server:
files_endpoint = test_files_server.application_url
action = {"name": "cow", "type": "input", "action": {"action_type": "message", "contents": "cow file contents\n"}}
action = {
"name": "cow",
"type": "input",
"action": {"action_type": "message", "contents": "cow file contents\n"},
}
client_outputs = ClientOutputs(
working_directory=galaxy_working,
output_files=[os.path.join(galaxy_working, output_name)],
@@ -39,10 +51,7 @@ def run_and_check_submission(self):
setup=True,
remote_staging={
"setup": [action],
"action_mapper": {
"default_action": "remote_transfer",
"files_endpoint": files_endpoint,
},
"action_mapper": self.setup_action_mapper(files_endpoint),
"client_outputs": client_outputs.to_dict(),
},
)
@@ -74,7 +83,8 @@ def test(self):
def _encode_application(self):
app_conf = dict(
staging_directory=self.staging_directory,
message_queue_url="memory://submittest"
message_queue_url="memory://submittest",
conda_auto_init=False,
)
app_conf_path = os.path.join(self.config_directory, "app.yml")
with open(app_conf_path, "w") as f:
@@ -93,6 +103,29 @@ def test(self):
def _encode_application(self):
app_conf = dict(
staging_directory=self.staging_directory,
message_queue_url="memory://submittest"
message_queue_url="memory://submittest",
conda_auto_init=False,
)
return ["--app_conf_base64", to_base64_json(app_conf)]


class SequentialLocalCommandLineAppConfigTestCase(BaseCliTestCase):

@skip_unless_module("kombu")
@integration_test
def test(self):
self.run_and_check_submission()

def setup_action_mapper(self, files_endpoint):
return {
"default_action": "json_transfer",
"files_endpoint": files_endpoint,
}

def _encode_application(self):
app_conf = dict(
staging_directory=self.staging_directory,
message_queue_url="memory://submittest",
conda_auto_init=False,
)
return ["--app_conf_base64", to_base64_json(app_conf)]
30 changes: 29 additions & 1 deletion test/transfer_action_test.py
@@ -1,7 +1,35 @@
import os

from .test_utils import files_server
from pulsar.client.action_mapper import RemoteTransferAction
from pulsar.client.action_mapper import (
JsonTransferAction,
RemoteTransferAction,
)


def test_write_to_path_json():
with files_server() as (server, directory):
from_path = os.path.join(directory, "remote_get")

to_path = os.path.join(directory, "local_get")
url = server.application_url + "?path=%s" % from_path
action = JsonTransferAction({"path": to_path}, url=url)
action.write_to_path(to_path)
assert action.path == to_path
assert action.url == url
assert action.finalize() == {"path": to_path, "url": url}


def test_write_from_file_json():
with files_server() as (server, directory):
from_path = os.path.join(directory, "local_post")
to_path = os.path.join(directory, "remote_post")
url = server.application_url + "?path=%s" % to_path
action = JsonTransferAction({"path": to_path}, url=url)
action.write_from_path(from_path)
assert action.path == to_path
assert action.url == url
assert action.finalize() == {"path": to_path, "url": url}


def test_write_to_file():
