Skip to content

Commit

Permalink
WIP: work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
kysrpex committed Nov 12, 2024
1 parent d7bb304 commit 0525bb9
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 8 deletions.
16 changes: 14 additions & 2 deletions pulsar/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,17 @@ def __init__(self, destination_params, job_id, job_manager_interface):
super().__init__(destination_params, job_id)
self.job_manager_interface = job_manager_interface

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None, staging_manifest=None):
def launch(
self,
command_line,
dependencies_description=None,
env=None,
remote_staging=None,
job_config=None,
dynamic_file_sources=None,
token_endpoint=None,
staging_manifest=None
):
"""
Queue up the execution of the supplied `command_line` on the remote
server. Called launch for historical reasons, should be renamed to
Expand Down Expand Up @@ -780,6 +789,9 @@ class LocalSequentialClient(BaseMessageCoexecutionJobClient, LocalSequentialLaun
def __init__(self, destination_params, job_id, client_manager):
super().__init__(destination_params, job_id, client_manager)

def put_file(self):
...


class TesPollingCoexecutionJobClient(BasePollingCoexecutionJobClient, LaunchesTesContainersMixin):
"""A client that co-executes pods via GA4GH TES and depends on amqp for status updates."""
Expand Down
4 changes: 4 additions & 0 deletions pulsar/client/test/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,8 @@ def extract_client_options(options):
client_options["tes_url"] = options.tes_url
if hasattr(options, "container"):
client_options["container"] = options.container
if hasattr(options, "arc_url"):
client_options["arc_url"] = options.arc_url
return client_options


Expand Down Expand Up @@ -532,6 +534,8 @@ def client_manager_from_args(options):
manager_args['tes_url'] = options.tes_url
if getattr(options, "k8s_enabled", None):
manager_args['k8s_enabled'] = options.k8s_enabled
if getattr(options, 'arc_enabled'):
manager_args['arc_enabled'] = options.arc_enabled
cm = build_client_manager(**manager_args)
return cm

Expand Down
1 change: 1 addition & 0 deletions pulsar/manager_endpoint_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def submit_job(manager, job_config):
"token_endpoint": token_endpoint,
}
manager.preprocess_and_launch(job_id, launch_config)
return launch_config
except Exception:
manager.handle_failure_before_launch(job_id)
raise
Expand Down
14 changes: 13 additions & 1 deletion pulsar/scripts/run.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
""" CLI related utilities for submitting Pulsar jobs.
"""
import fnmatch
from typing import Optional
import sys
from dataclasses import dataclass, field
import uuid

from pulsar.client import (
Expand Down Expand Up @@ -32,11 +34,15 @@
HELP_AMQP_URL = "Communicate with Pulsar listining on a message queue at this URL."
HELP_SERVER = "Run a Pulsar server locally instead of contacting a remote one."
HELP_COMMAND = "Shell command to execute on Pulsar server."
HELP_JOBS_DIRECTORY = "Local jobs directory"
HELP_WORKING_DIRECTORY = "Local working directory (will be translated to a new directory)."
HELP_OUTPUT = "Output glob to collect from job (relative to remote working directory)."
HELP_OUTPUT_PATTERN = "Output pattern to collect from job (relative to remote working directory)."
HELP_ARC = "Submit job to an ARC endpoint."
HELP_ARC_URL = "Use this URL as ARC endpoint."

DEFAULT_CLIENT_URL = 'http://localhost:8913/'
DEFAULT_ARC_URL = 'http://localhost:8082'


def main(argv=None):
Expand All @@ -55,10 +61,14 @@ def main(argv=None):
arg_parser.add_argument('--server', default=False, action="store_true", help=HELP_SERVER)
arg_parser.add_argument('--job_id', default=None, help=HELP_JOB_ID)
arg_parser.add_argument('--command', help=HELP_COMMAND)
# arg_parser.add_argument('--jobs_directory', default=".", help=HELP_JOBS_DIRECTORY)
arg_parser.add_argument('--working_directory', default=".", help=HELP_WORKING_DIRECTORY)
arg_parser.add_argument('--result_json', default=None)
arg_parser.add_argument('--output', default=[], action="append", help=HELP_OUTPUT)
arg_parser.add_argument('--output_pattern', default=[], action="append", help=HELP_OUTPUT_PATTERN)
# arg_parser.add_argument('--arc_enabled', default=False, action="store_true", help=HELP_ARC)
# arg_parser.add_argument('--arc_url', default=DEFAULT_ARC_URL, help=HELP_ARC_URL)


args = arg_parser.parse_args(argv)
if args.server:
Expand All @@ -71,7 +81,9 @@ def main(argv=None):
return 0


def _run_client_for_job(args):
def _run_client_for_job(
args: JobDescription
):
if args.job_id is None:
args.job_id = str(uuid.uuid4())
output_patterns = []
Expand Down
46 changes: 42 additions & 4 deletions pulsar/scripts/submit_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import json
import logging
import time
from dataclasses import dataclass, field
from typing import Optional

from pulsar.client.util import from_base64_json
from pulsar.client.manager import build_client_manager
Expand All @@ -18,6 +20,40 @@
DEFAULT_POLL_TIME = 2


HELP_AMQP_URL = "Communicate with Pulsar listining on a message queue at this URL."
HELP_SERVER = "Run a Pulsar server locally instead of contacting a remote one."
HELP_COMMAND = "Shell command to execute on Pulsar server."
HELP_JOBS_DIRECTORY = "Local jobs directory"
HELP_WORKING_DIRECTORY = "Local working directory (will be translated to a new directory)."
HELP_OUTPUT = "Output glob to collect from job (relative to remote working directory)."
HELP_OUTPUT_PATTERN = "Output pattern to collect from job (relative to remote working directory)."
HELP_ARC = "Submit job to an ARC endpoint."
HELP_ARC_URL = "Use this URL as ARC endpoint."

DEFAULT_CLIENT_URL = 'http://localhost:8913/'
DEFAULT_ARC_URL = 'http://localhost:8082'


@dataclass
class JobDescription:

command: str
working_directory: str = "."
result_json: Optional[str] = None
output: list[str] = field(default_factory=list)
output_pattern: list[str] = field(default_factory=list)
private_token: Optional[str] = None
url: str = DEFAULT_CLIENT_URL
amqp_url: str = DEFAULT_CLIENT_URL
default_file_action: str = "none"
file_action_config: Optional[str] = None
transport: Optional[str] = None
suppress_output: bool = False
cleanup: bool = True
server: bool = False
job_id: Optional[str] = None


def add_common_submit_args(arg_parser):
arg_parser.add_argument("--file", default=None)
arg_parser.add_argument("--base64", default=None)
Expand All @@ -31,13 +67,15 @@ def run_server_for_job(args):
manager, app = manager_from_args(config_builder)
try:
job_config = _load_job_config(args)
submit_job(manager, job_config)
launch_config = submit_job(manager, job_config)
if wait:
log.info("Co-execution job setup, now waiting for job completion and postprocessing.")
if args.build_client_manager:
client_manager = build_client_manager(arc_enabled=True)
client = client_manager.get_client({"arc_url": "http://localhost:8082", "jobs_directory": "/works"}, job_id=job_config["job_id"])
client.launch()
job_description = JobDescription(**)
# client_manager = build_client_manager(arc_enabled=True)
# client = client_manager.get_client({"arc_url": "http://localhost:8082", "jobs_directory": "/works"}, job_id=job_config["job_id"])
# launch_config = {key: value for key, value in launch_config.items() if key not in {"submit_params", "setup_params"}}
client.launch(**launch_config)
wait_for_job(manager, job_config)
log.info("Leaving finish_execution and shutting down app")
except BaseException:
Expand Down
21 changes: 20 additions & 1 deletion test/test_cli_submit.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@

import os
import threading
import yaml

from .test_utils import (
Expand All @@ -11,7 +13,7 @@

from pulsar.client import ClientOutputs
from pulsar.client.util import to_base64_json
from pulsar.scripts import submit
from pulsar.scripts import run, submit


class BaseCliTestCase(TempDirectoryTestCase):
Expand Down Expand Up @@ -57,7 +59,24 @@ def run_and_check_submission(self):
)
base64 = to_base64_json(launch_params)
assert not os.path.exists(galaxy_output)

submit.main(["--build_client_manager", "true", "--base64", base64] + self._encode_application())

# run server
# server = threading.Thread(
# target=lambda: submit.main(["--base64", base64] + self._encode_application()),
# name="server"
# )
# server.start()

# run client
# client = threading.Thread(
# target=lambda: run.main(["--base64", base64, "--job_id", job_id, "--arc_enabled", "--command", launch_params["command_line"], "--working_directory", "empty"] + self._encode_application()),
# name="client",
# )
# client.start()

# server.join(), client.join()
assert os.path.exists(galaxy_output)
out_contents = open(galaxy_output).read()
assert out_contents == "cow file contents\n", out_contents
Expand Down

0 comments on commit 0525bb9

Please sign in to comment.