[WIP] Feature: Send stdout and stderr to Galaxy while job is running #345

Open. Wants to merge 2 commits into base: master
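In short: the pull request adds two manager options, send_stdout_update and stdout_update_interval, which _get_default_options in pulsar/manager_factory.py reads from the app config and passes through to StatefulManagerProxy; the proxy then posts incremental stdout/stderr diffs to the Galaxy files endpoint while a job is running. A minimal sketch of the option plumbing (the values below are illustrative, not recommended settings):

# Sketch only: the two new options as they flow from the Pulsar app config
# (manager_factory._get_default_options) into StatefulManagerProxy.
manager_options = {
    "send_stdout_update": True,     # default False (DEFAULT_SEND_STDOUT)
    "stdout_update_interval": 5.0,  # seconds between posts; default 3.0 (DEFAULT_STDOUT_INTERVAL)
}
# StatefulManagerProxy(manager, **manager_options) then reports
# is_live_stdout_update() == True and starts a per-job update thread once
# the job transitions to RUNNING.

With send_stdout_update left at its default, nothing changes: is_live_stdout_update() returns False and no update thread is started.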
7 changes: 6 additions & 1 deletion pulsar/manager_endpoint_util.py
@@ -24,6 +24,11 @@ def status_dict(manager, job_id):
def full_status(manager, job_status, job_id):
    if status.is_job_done(job_status):
        full_status = __job_complete_dict(job_status, manager, job_id)
        if manager.is_live_stdout_update():
            # Stdout has already been sent, no need to send again.
            full_status["stdout"] = None
            full_status["stderr"] = None

    else:
        full_status = {"complete": "false", "status": job_status, "job_id": job_id}
    return full_status
@@ -129,4 +134,4 @@ def setup_job(manager, job_id, tool_id, tool_version):
        system_properties=manager.system_properties(),
        tool_id=tool_id,
        tool_version=tool_version
    )
    )
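For reference, once live updates are enabled the completion payload returned by full_status simply omits the streams, roughly like this (an illustrative sketch; the job_id is hypothetical and the remaining keys from __job_complete_dict are abbreviated):

# Illustrative only: shape of the dict full_status() returns for a finished
# job when manager.is_live_stdout_update() is True.
{
    "complete": "true",
    "status": "complete",
    "job_id": "42",     # hypothetical id
    "stdout": None,     # already streamed to Galaxy while the job ran
    "stderr": None,
    # ... remaining keys from __job_complete_dict (return code, etc.)
}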
6 changes: 6 additions & 0 deletions pulsar/manager_factory.py
@@ -76,6 +76,12 @@ def _get_default_options(conf):
options["job_directory_mode"] = None
if job_directory_mode is not None:
options["job_directory_mode"] = int(job_directory_mode, 8)
stdout_update = conf.get("send_stdout_update", None)
if stdout_update is not None:
options["send_stdout_update"] = stdout_update
stdout_update_interval = conf.get("stdout_update_interval", None)
if stdout_update_interval is not None:
options["stdout_update_interval"] = stdout_update_interval
return options


11 changes: 11 additions & 0 deletions pulsar/managers/__init__.py
@@ -123,6 +123,17 @@ def job_stdout_contents(self, *args, **kwargs):
    def job_stderr_contents(self, *args, **kwargs):
        return self._proxied_manager.job_stderr_contents(*args, **kwargs)

    def is_live_stdout_update(self):
        """Optional.

        Whether this manager sends stdout to Galaxy while the job is running (True if so).
        """
        try:
            # Currently only implemented by the stateful manager proxy.
            return self._proxied_manager.is_live_stdout_update()
        except AttributeError:
            return False

    def kill(self, *args, **kwargs):
        return self._proxied_manager.kill(*args, **kwargs)

82 changes: 78 additions & 4 deletions pulsar/managers/stateful.py
@@ -11,13 +11,17 @@
    # If galaxy-tool-util or Galaxy 19.09 present.
    from galaxy.tool_util.deps.dependencies import DependenciesDescription

from pathlib import Path
import requests

import logging

from pulsar.client.util import filter_destination_params
from pulsar.managers import (
    ManagerProxy,
    status,
)
from pulsar.managers.base.directory import TOOL_FILE_STANDARD_ERROR, TOOL_FILE_STANDARD_OUTPUT
from pulsar.managers.util.retry import RetryActionExecutor
from .staging import (
    postprocess,
@@ -41,7 +45,8 @@
ACTIVE_STATUS_LAUNCHED = "launched"

DEFAULT_MIN_POLLING_INTERVAL = 0.5

DEFAULT_SEND_STDOUT = False
DEFAULT_STDOUT_INTERVAL = 3.0

class StatefulManagerProxy(ManagerProxy):
"""
Expand All @@ -52,12 +57,16 @@ def __init__(self, manager, **manager_options):
        min_polling_interval = float(manager_options.get("min_polling_interval", DEFAULT_MIN_POLLING_INTERVAL))
        preprocess_retry_action_kwds = filter_destination_params(manager_options, "preprocess_action_")
        postprocess_retry_action_kwds = filter_destination_params(manager_options, "postprocess_action_")
        self.send_stdout = bool(manager_options.get("send_stdout_update", DEFAULT_SEND_STDOUT))
        self.stdout_update_interval = datetime.timedelta(0, float(manager_options.get("stdout_update_interval", DEFAULT_STDOUT_INTERVAL)))
        self.__preprocess_action_executor = RetryActionExecutor(**preprocess_retry_action_kwds)
        self.__postprocess_action_executor = RetryActionExecutor(**postprocess_retry_action_kwds)
        self.min_polling_interval = datetime.timedelta(0, min_polling_interval)
        self.active_jobs = ActiveJobs.from_manager(manager)
        self.__state_change_callback = self._default_status_change_callback
        self.__monitor = None
        self.__stdout_file_pointer_map = dict()
        self.__stderr_file_pointer_map = dict()

    def set_state_change_callback(self, state_change_callback):
        self.__state_change_callback = state_change_callback
@@ -75,6 +84,12 @@ def trigger_state_change_callback(self, job_id):
    def name(self):
        return self._proxied_manager.name

    def is_live_stdout_update(self):
        """
        Whether this manager sends stdout to Galaxy while the job is running (True if so).
        """
        return self.send_stdout

    def setup_job(self, *args, **kwargs):
        job_id = self._proxied_manager.setup_job(*args, **kwargs)
        return job_id
@@ -136,6 +151,8 @@ def _handling_of_preprocessing_state(self, job_id, launch_config):
            )
            with job_directory.lock("status"):
                job_directory.store_metadata(JOB_FILE_PREPROCESSED, True)
            self.__stdout_file_pointer_map[job_id] = 0
            self.__stderr_file_pointer_map[job_id] = 0
            self.active_jobs.activate_job(job_id)
        except Exception as e:
            with job_directory.lock("status"):
@@ -148,6 +165,58 @@ def handle_failure_before_launch(self, job_id):
    def handle_failure_before_launch(self, job_id):
        self.__state_change_callback(status.FAILED, job_id)

    def post_remote_output(self, job_id, force_empty=False):
        """
        Send stdout/stderr contents back to the Galaxy server via its job files endpoint.
        """
        job_directory = self._proxied_manager.job_directory(job_id)
        try:
            remote_staging = job_directory.load_metadata("launch_config")["remote_staging"]
            files_endpoint = remote_staging["action_mapper"]["files_endpoint"]
            galaxy_file_dir = Path(remote_staging["client_outputs"]["working_directory"]).parent / "outputs"
            for filename in [TOOL_FILE_STANDARD_OUTPUT, TOOL_FILE_STANDARD_ERROR]:
                file_contents = self._prepare_file_output(job_id, job_directory, filename)
                if file_contents != "" or force_empty:
                    self._post_file(file_contents, galaxy_file_dir / Path(filename).name, files_endpoint)
        except Exception:
            log.exception("Error sending output to Galaxy server while job %s is running", job_id)

    def _prepare_file_output(self, job_id, job_directory, filename):
        # Read only the bytes appended since the last post and advance the per-job offset.
        pointer_map = self.__stderr_file_pointer_map if filename == TOOL_FILE_STANDARD_ERROR else self.__stdout_file_pointer_map
        with job_directory.open_file(filename, mode="rb") as file_output:
            file_output.seek(pointer_map.get(job_id, 0))
            diff = file_output.read()
        pointer_map[job_id] = pointer_map.get(job_id, 0) + len(diff)
        return diff.decode("utf-8")

    def _post_file(self, remote_file, path, endpoint):
        files = {"file": (path.name, remote_file)}
        values = {"path": str(path), "file_type": "output"}
        response = requests.post(url=endpoint, files=files, data=values)
        log.debug("Posted %s to %s. Status code: %d", path, endpoint, response.status_code)

    def stdout_update(self, job_id):
        def do_stdout_update():
            while self._proxied_manager.get_status(job_id) == status.RUNNING:
                try:
                    time.sleep(self.stdout_update_interval.total_seconds())
                    self.post_remote_output(job_id)
                except Exception:
                    log.exception("Error doing stdout update for job id: %s", job_id)
                    break

        if self.send_stdout:
            new_thread_for_job(self, "stdout_update", job_id, do_stdout_update, daemon=False)

    def get_status(self, job_id):
        """ Compute status using proxied manager and handle state transitions
        and track additional state information needed.
@@ -163,7 +232,7 @@ def get_status(self, job_id):
            self.__deactivate(job_id, proxy_status)
        elif state_change == "to_running":
            self.__state_change_callback(status.RUNNING, job_id)
            self.stdout_update(job_id)
        return self.__status(job_directory, proxy_status)

    def __proxy_status(self, job_directory, job_id):
@@ -220,13 +289,17 @@ def do_postprocess():
            postprocess_success = False
            job_directory = self._proxied_manager.job_directory(job_id)
            try:
                self.post_remote_output(job_id, force_empty=True)
                postprocess_success = postprocess(job_directory, self.__postprocess_action_executor)
            except Exception:
                log.exception("Failed to postprocess results for job id %s" % job_id)
            final_status = status.COMPLETE if postprocess_success else status.FAILED
            if job_directory.has_metadata(JOB_FILE_PREPROCESSING_FAILED):
                final_status = status.FAILED
            self.__state_change_callback(final_status, job_id)
            self.__stdout_file_pointer_map.pop(job_id, None)
            self.__stderr_file_pointer_map.pop(job_id, None)
Review comment from a project member on the file-pointer cleanup above:

Do we need to persist this somehow for this to work properly? If Pulsar restarts it is going to lose the current position right? Is that a problem?
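One possible approach, sketched only, would be to persist the offsets with the job-directory metadata helpers already used elsewhere in this file (store_metadata, load_metadata, has_metadata) so a restarted Pulsar can resume from the last posted position; the stdio_offsets key and method names below are hypothetical:

# Hypothetical sketch: these would be methods on StatefulManagerProxy and
# the "stdio_offsets" metadata key is invented for illustration.
def _save_stdio_offsets(self, job_id, stdout_offset, stderr_offset):
    job_directory = self._proxied_manager.job_directory(job_id)
    job_directory.store_metadata("stdio_offsets", {"stdout": stdout_offset, "stderr": stderr_offset})

def _load_stdio_offsets(self, job_id):
    job_directory = self._proxied_manager.job_directory(job_id)
    if job_directory.has_metadata("stdio_offsets"):
        return job_directory.load_metadata("stdio_offsets")
    return {"stdout": 0, "stderr": 0}

post_remote_output could then call _save_stdio_offsets after each successful post, and the pointer maps could be seeded from _load_stdio_offsets when active jobs are re-activated after a restart.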


        new_thread_for_job(self, "postprocess", job_id, do_postprocess, daemon=False)

    def shutdown(self, timeout=None):
@@ -370,11 +443,12 @@ def _monitor_active_jobs(self):
        iteration_start = datetime.datetime.now()
        for active_job_id in active_job_ids:
            try:
                self._check_active_job_status(active_job_id)
                job_status = self._check_active_job_status(active_job_id)
            except Exception:
                log.exception("Failed checking active job status for job_id %s" % active_job_id)
        iteration_end = datetime.datetime.now()
        iteration_length = iteration_end - iteration_start

        if iteration_length < self.stateful_manager.min_polling_interval:
            to_sleep = (self.stateful_manager.min_polling_interval - iteration_length)
            microseconds = to_sleep.microseconds + (to_sleep.seconds + to_sleep.days * 24 * 3600) * (10 ** 6)
@@ -384,7 +458,7 @@ def _check_active_job_status(self, active_job_id):
    def _check_active_job_status(self, active_job_id):
        # Manager itself will handle state transitions when status changes,
        # just need to poll get_status
        self.stateful_manager.get_status(active_job_id)
        return self.stateful_manager.get_status(active_job_id)


def new_thread_for_job(manager, action, job_id, target, daemon):