WIP: pulsar file collector #370

Draft
wants to merge 14 commits into master
2 changes: 1 addition & 1 deletion dev-requirements.txt
@@ -21,7 +21,7 @@ flake8

mypy<=1.0.1 # https://github.com/pydantic/pydantic/issues/5192
types-paramiko
types-pkg-resources
types-setuptools
types-PyYAML
types-pycurl
types-requests
76 changes: 57 additions & 19 deletions docker/coexecutor/Dockerfile
@@ -1,41 +1,79 @@
FROM conda/miniconda3
# use the root of the repository as context, i.e. `docker build . -f ./docker/coexecutor/Dockerfile`

FROM python:3.12-bookworm as build_wheel

ENV PIP_ROOT_USER_ACTION=ignore

WORKDIR /build

# install requirements
COPY requirements.txt .
COPY dev-requirements.txt .
RUN pip install --no-cache-dir --upgrade pip \
&& pip install --no-cache-dir setuptools -r requirements.txt -r dev-requirements.txt

# build Pulsar wheel
COPY . .
RUN python setup.py sdist bdist_wheel


FROM python:3.12-bookworm

ENV PYTHONUNBUFFERED 1
ENV PIP_ROOT_USER_ACTION=ignore
ENV DEBIAN_FRONTEND noninteractive
ENV PULSAR_CONFIG_CONDA_PREFIX /usr/local

# set up Galaxy Depot repository (provides SLURM DRMAA packages for Debian Buster and newer releases)
RUN apt-get update \
&& apt-get install -y --no-install-recommends ca-certificates curl gnupg \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
&& curl -fsSL "http://keyserver.ubuntu.com/pks/lookup?op=get&search=0x18381AC8832160AF" | gpg --dearmor -o /etc/apt/trusted.gpg.d/galaxy-depot.gpg \
&& echo "deb https://depot.galaxyproject.org/apt/ $(bash -c '. /etc/os-release; echo ${VERSION_CODENAME:-bookworm}') main" | tee /etc/apt/sources.list.d/galaxy-depot.list

# set up Debian Bullseye repository (and use it only for libslurm36, needed by slurm-drmaa1, and slurm)
RUN echo "deb http://deb.debian.org/debian/ bullseye main" > /etc/apt/sources.list.d/bullseye.list && \
cat <<EOF > /etc/apt/preferences.d/bullseye.pref
Package: *
Pin: release n=bullseye
Pin-Priority: -1

Package: libslurm36, slurm
Pin: release n=bullseye
Pin-Priority: 100
EOF

# set up CVMFS repository
RUN apt-get update \
&& apt-get install -y --no-install-recommends lsb-release wget \
&& wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \
&& dpkg -i cvmfs-release-latest_all.deb && rm -f cvmfs-release-latest_all.deb

# wget, gcc, pip - to build and install Pulsar.
# bzip2 for Miniconda.
# TODO: pycurl stuff...
RUN apt-get update \
&& apt-get install -y --no-install-recommends apt-transport-https \
RUN apt-get update && apt-get install -y --no-install-recommends \
# Install CVMFS client
&& apt-get install -y --no-install-recommends lsb-release wget \
&& wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \
&& dpkg -i cvmfs-release-latest_all.deb \
&& rm -f cvmfs-release-latest_all.deb \
cvmfs cvmfs-config-default \
# Install packages
&& apt-get update \
&& apt-get install -y --no-install-recommends gcc \
libcurl4-openssl-dev \
cvmfs cvmfs-config-default \
slurm-llnl slurm-drmaa-dev \
bzip2 \
gcc libcurl4-openssl-dev \
munge libmunge-dev slurm slurm-drmaa-dev \
bzip2 \
# Install Pulsar Python requirements
&& pip install --no-cache-dir -U pip \
&& pip install --no-cache-dir drmaa wheel kombu pykube pycurl \
webob psutil PasteDeploy pyyaml paramiko \
# Remove build deps and cleanup
&& apt-get -y remove gcc wget lsb-release \
&& apt-get -y autoremove \
&& apt-get autoclean \
&& rm -rf /var/lib/apt/lists/* /var/log/dpkg.log \
&& /usr/sbin/create-munge-key
&& apt-get clean && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log

COPY --from=build_wheel /build/dist/pulsar_app-*-py2.py3-none-any.whl /

ADD pulsar_app-*-py2.py3-none-any.whl /pulsar_app-*-py2.py3-none-any.whl
SHELL ["/bin/bash", "-c"]

RUN pip install --upgrade setuptools && pip install pyOpenSSL --upgrade && pip install cryptography --upgrade
RUN pip install --no-cache-dir /pulsar_app-*-py2.py3-none-any.whl[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl
RUN pip install --no-cache-dir --upgrade setuptools pyOpenSSL cryptography
RUN pip install --no-cache-dir "$(echo /pulsar_app-*-py2.py3-none-any.whl)"[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl
RUN pip install --upgrade 'importlib-metadata<5.0'
RUN _pulsar-configure-galaxy-cvmfs
RUN _pulsar-conda-init --conda_prefix=/pulsar_dependencies/conda
68 changes: 60 additions & 8 deletions pulsar/client/action_mapper.py
@@ -21,6 +21,7 @@
Any,
Dict,
List,
Optional,
Type,
)
from urllib.parse import urlencode
@@ -190,6 +191,9 @@ def __init__(self, client=None, config=None):
self.ssh_port = config.get("ssh_port", None)
self.mappers = mappers_from_dicts(config.get("paths", []))
self.files_endpoint = config.get("files_endpoint", None)
self.actions = []
# Might want to make the working directory available here so that we know where to place archive
# for archive action

def action(self, source, type, mapper=None):
path = source.get("path", None)
@@ -200,10 +204,14 @@ def action(self, source, type, mapper=None):
if mapper:
file_lister = mapper.file_lister
action_kwds = mapper.action_kwds
action = action_class(source, file_lister=file_lister, **action_kwds)
action = action_class(source, file_lister=file_lister, file_type=type, **action_kwds)
self.__process_action(action, type)
self.actions.append(action)
return action

def finalize(self):
return [_ for _ in (action.finalize() for action in self.actions) if _]

def unstructured_mappers(self):
""" Return mappers that will map 'unstructured' files (i.e. go beyond
mapping inputs, outputs, and config files).
@@ -265,6 +273,7 @@ def __process_action(self, action, file_type):
""" Extension point to populate extra action information after an
action has been created.
"""
action.file_type = file_type
if getattr(action, "inject_url", False):
self.__inject_url(action, file_type)
if getattr(action, "inject_ssh_properties", False):
@@ -300,10 +309,12 @@ class BaseAction:
whole_directory_transfer_supported = False
action_spec: Dict[str, Any] = {}
action_type: str
file_type: Optional[str] = None

def __init__(self, source, file_lister=None):
def __init__(self, source, file_lister=None, file_type=None):
self.source = source
self.file_lister = file_lister or DEFAULT_FILE_LISTER
self.file_type = file_type

@property
def path(self):
@@ -342,6 +353,9 @@ def _extend_base_dict(self, **kwds):
base_dict.update(**kwds)
return base_dict

def finalize(self):
pass

def to_dict(self):
return self._extend_base_dict()

@@ -390,8 +404,8 @@ class RewriteAction(BaseAction):
action_type = "rewrite"
staging = STAGING_ACTION_NONE

def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None):
super().__init__(source, file_lister=file_lister)
def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None, file_type=None):
super().__init__(source, file_lister=file_lister, file_type=file_type)
self.source_directory = source_directory
self.destination_directory = destination_directory

@@ -467,8 +481,8 @@ class RemoteTransferAction(BaseAction):
action_type = "remote_transfer"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None):
super().__init__(source, file_lister=file_lister)
def __init__(self, source, file_lister=None, url=None, file_type=None):
super().__init__(source, file_lister=file_lister, file_type=file_type)
self.url = url

def to_dict(self):
Expand All @@ -495,8 +509,8 @@ class RemoteTransferTusAction(BaseAction):
action_type = "remote_transfer_tus"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None):
super().__init__(source, file_lister=file_lister)
def __init__(self, source, file_lister=None, url=None, file_type=None):
super().__init__(source, file_lister=file_lister, file_type=file_type)
self.url = url

def to_dict(self):
@@ -513,6 +527,42 @@ def write_from_path(self, pulsar_path):
tus_upload_file(self.url, pulsar_path)


class JsonTransferAction(BaseAction):
"""
This action indicates that the Pulsar server should create a JSON manifest that an external
system can use to stage files in and out of the compute environment.
"""
inject_url = True
whole_directory_transfer_supported = True
action_type = "json_transfer"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None, file_type=None):
super().__init__(source, file_lister, file_type)
self.url = url
self._from_path = None
self._to_path = None

@classmethod
def from_dict(cls, action_dict):
return JsonTransferAction(source=action_dict["source"], url=action_dict["url"])

def to_dict(self):
return self._extend_base_dict(url=self.url)

def write_to_path(self, path):
self._to_path = path

def write_from_path(self, pulsar_path: str):
self._from_path = pulsar_path

def finalize(self):
if self._to_path:
return {"url": self.url, "to_path": self._to_path}
else:
return {"url": self.url, "from_path": self._from_path}


class RemoteObjectStoreCopyAction(BaseAction):
"""
"""
@@ -664,6 +714,7 @@ def write_to_path(self, path):


DICTIFIABLE_ACTION_CLASSES = [
JsonTransferAction,
RemoteCopyAction,
RemoteTransferAction,
RemoteTransferTusAction,
@@ -844,6 +895,7 @@ def unstructured_map(self, path):

ACTION_CLASSES: List[Type[BaseAction]] = [
NoneAction,
JsonTransferAction,
RewriteAction,
TransferAction,
CopyAction,
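A minimal sketch of how the new JsonTransferAction and FileActionMapper.finalize() are meant to work together, assuming a config that routes paths to the json_transfer action; the default_action key, the endpoint URL, and all paths below are illustrative assumptions, not part of this diff.

from pulsar.client.action_mapper import FileActionMapper

# "default_action" routing to json_transfer is assumed; endpoint and paths are invented.
mapper = FileActionMapper(config={
    "default_action": "json_transfer",
    "files_endpoint": "https://galaxy.example.org/api/jobs/files",
    "paths": [],
})

# Mapping an input yields a JsonTransferAction; write_to_path() only records the
# destination path instead of transferring anything.
action = mapper.action({"path": "/galaxy/data/input.fastq"}, "input")
action.write_to_path("/pulsar/staging/42/inputs/input.fastq")

# finalize() gathers one manifest entry per action that recorded a path, e.g.
# [{"url": "<files_endpoint URL injected by the mapper>", "to_path": "/pulsar/staging/42/inputs/input.fastq"}]
manifest = mapper.finalize()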
64 changes: 60 additions & 4 deletions pulsar/client/client.py
@@ -168,7 +168,7 @@ def __init__(self, destination_params, job_id, job_manager_interface):
self.job_manager_interface = job_manager_interface

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
dynamic_file_sources=None, token_endpoint=None, staging_manifest=None):
"""
Queue up the execution of the supplied `command_line` on the remote
server. Called launch for historical reasons, should be renamed to
@@ -405,7 +405,7 @@ def _build_status_request_message(self):
class MessageJobClient(BaseMessageJobClient):

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
dynamic_file_sources=None, token_endpoint=None, staging_manifest=None):
"""
"""
launch_params = self._build_setup_message(
@@ -439,7 +439,7 @@ def __init__(self, destination_params, job_id, client_manager, shell):
self.shell = shell

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
dynamic_file_sources=None, token_endpoint=None, staging_manifest=None):
"""
"""
launch_params = self._build_setup_message(
@@ -477,6 +477,55 @@ class ExecutionType(str, Enum):
PARALLEL = "parallel"


class LocalSequentialLaunchMixin(BaseRemoteConfiguredJobClient):

def launch(
self,
command_line,
dependencies_description=None,
env=None,
remote_staging=None,
job_config=None,
dynamic_file_sources=None,
container_info=None,
token_endpoint=None,
pulsar_app_config=None,
staging_manifest=None
) -> Optional[ExternalId]:
# 1. call staging script with staging manifest [handled by ARC]
# 2. call actual command_line
# 3. call script that does output collection (similar to __collect_outputs) and outputs staging manifest
# 4. stage outputs back using manifest [handled by ARC]
import importlib.resources
import tempfile
import subprocess
import sys
from pulsar import scripts
STAGING_SCRIPT = importlib.resources.path(scripts, "staging_arc.py")
MANIFEST_SCRIPT = importlib.resources.path(scripts, "collect_output_manifest.py")

with tempfile.NamedTemporaryFile(mode="w") as temp_fh:
temp_fh.write(json_dumps(staging_manifest))
temp_fh.flush()
staging_process = subprocess.run([sys.executable, STAGING_SCRIPT, "--json", temp_fh.name], capture_output=True)
assert staging_process.returncode == 0, staging_process.stderr.decode()
job_process = subprocess.run(command_line, shell=True, capture_output=True)
assert job_process.returncode == 0, job_process.stderr.decode()

job_directory = self.job_directory.job_directory

output_manifest_path = os.path.join(job_directory, "output_manifest.json")

with tempfile.NamedTemporaryFile(mode="w") as staging_config_fh:
staging_config_fh.write(json_dumps(remote_staging))
staging_config_fh.flush()

p = subprocess.run([sys.executable, MANIFEST_SCRIPT, "--job-directory", job_directory, "--staging-config-path", staging_config_fh.name, "--output-manifest-path", output_manifest_path])
assert p.returncode == 0

stage_out_process = subprocess.run([sys.executable, STAGING_SCRIPT, "--json", output_manifest_path], capture_output=True)
assert stage_out_process.returncode == 0, stage_out_process.stderr.decode()

class CoexecutionLaunchMixin(BaseRemoteConfiguredJobClient):
execution_type: ExecutionType
pulsar_container_image: str
@@ -491,7 +540,8 @@ def launch(
dynamic_file_sources=None,
container_info=None,
token_endpoint=None,
pulsar_app_config=None
pulsar_app_config=None,
staging_manifest=None
) -> Optional[ExternalId]:
"""
"""
@@ -756,6 +806,12 @@ def raw_check_complete(self) -> Dict[str, Any]:
}


class LocalSequentialClient(BaseMessageCoexecutionJobClient, LocalSequentialLaunchMixin):

def __init__(self, destination_params, job_id, client_manager):
super().__init__(destination_params, job_id, client_manager)


class TesPollingCoexecutionJobClient(BasePollingCoexecutionJobClient, LaunchesTesContainersMixin):
"""A client that co-executes pods via GA4GH TES and depends on amqp for status updates."""

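A minimal sketch of the manifest hand-off performed by LocalSequentialLaunchMixin.launch(): the staging manifest is serialised to a temporary JSON file and passed to the new staging_arc.py script via --json, and the same pattern is repeated for stage-out. The entry shape mirrors JsonTransferAction.finalize(); the URL and paths below are invented.

import json
import subprocess
import sys
import tempfile

# Invented example; real entries are produced by FileActionMapper.finalize().
staging_manifest = [
    {"url": "https://galaxy.example.org/api/jobs/files?path=input.fastq",
     "to_path": "/pulsar/staging/42/inputs/input.fastq"},
]

with tempfile.NamedTemporaryFile(mode="w", suffix=".json") as fh:
    fh.write(json.dumps(staging_manifest))
    fh.flush()
    # --json is the flag the mixin uses; any further CLI behaviour of
    # staging_arc.py is an assumption here.
    subprocess.run(
        [sys.executable, "pulsar/scripts/staging_arc.py", "--json", fh.name],
        check=True,
    )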
5 changes: 4 additions & 1 deletion pulsar/client/manager.py
@@ -29,6 +29,7 @@
MessageJobClient,
TesMessageCoexecutionJobClient,
TesPollingCoexecutionJobClient,
LocalSequentialClient,
)
from .destination import url_to_destination_params
from .object_client import ObjectStoreClient
@@ -256,6 +257,8 @@ def get_client(self, destination_params, job_id, **kwargs):
return K8sPollingCoexecutionJobClient(destination_params, job_id, self)
elif destination_params.get("tes_url", False):
return TesPollingCoexecutionJobClient(destination_params, job_id, self)
elif destination_params.get("arc_url", False):
return LocalSequentialClient(destination_params, job_id, self)
else:
raise Exception("Unknown client configuration")

@@ -268,7 +271,7 @@ def build_client_manager(**kwargs: Dict[str, Any]) -> ClientManagerInterface:
return ClientManager(**kwargs) # TODO: Consider more separation here.
elif kwargs.get('amqp_url', None):
return MessageQueueClientManager(**kwargs)
elif kwargs.get("k8s_enabled") or kwargs.get("tes_url"):
elif kwargs.get("k8s_enabled") or kwargs.get("tes_url") or kwargs.get("arc_enabled"):
return PollingJobClientManager(**kwargs)
else:
return ClientManager(**kwargs)
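A minimal sketch of how the new ARC path is selected, assuming only the keys visible in this diff (arc_enabled for manager construction, arc_url in the destination params); the URL, jobs_directory, and job id are placeholders, and any additional constructor arguments the managers require are omitted.

from pulsar.client.manager import build_client_manager

# arc_enabled routes construction to PollingJobClientManager (per this diff); other
# keyword arguments the manager may need are omitted from this sketch.
manager = build_client_manager(arc_enabled=True)

# A destination carrying arc_url is handed to the new LocalSequentialClient, whose
# launch() runs the stage-in, command, and stage-out steps as local subprocesses.
client = manager.get_client(
    {"arc_url": "https://arc.example.org", "jobs_directory": "/tmp/pulsar/jobs"},
    job_id="42",
)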