diff --git a/pulsar/manager_endpoint_util.py b/pulsar/manager_endpoint_util.py
index bf96dd10..b8135bcf 100644
--- a/pulsar/manager_endpoint_util.py
+++ b/pulsar/manager_endpoint_util.py
@@ -24,6 +24,11 @@ def status_dict(manager, job_id):
 def full_status(manager, job_status, job_id):
     if status.is_job_done(job_status):
         full_status = __job_complete_dict(job_status, manager, job_id)
+        if manager.is_live_stdout_update():
+            # Stdout has already been sent, no need to send again.
+            full_status["stdout"] = None
+            full_status["stderr"] = None
+
     else:
         full_status = {"complete": "false", "status": job_status, "job_id": job_id}
     return full_status
@@ -129,4 +134,4 @@ def setup_job(manager, job_id, tool_id, tool_version):
         system_properties=manager.system_properties(),
         tool_id=tool_id,
         tool_version=tool_version
-    )
+    )
\ No newline at end of file
diff --git a/pulsar/manager_factory.py b/pulsar/manager_factory.py
index 82950b26..0ab6ea24 100644
--- a/pulsar/manager_factory.py
+++ b/pulsar/manager_factory.py
@@ -76,6 +76,12 @@ def _get_default_options(conf):
     options["job_directory_mode"] = None
     if job_directory_mode is not None:
         options["job_directory_mode"] = int(job_directory_mode, 8)
+    stdout_update = conf.get("send_stdout_update", None)
+    if stdout_update is not None:
+        options["send_stdout_update"] = stdout_update
+    stdout_update_interval = conf.get("stdout_update_interval", None)
+    if stdout_update_interval is not None:
+        options["stdout_update_interval"] = stdout_update_interval
     return options
diff --git a/pulsar/managers/__init__.py b/pulsar/managers/__init__.py
index 317e38b9..23518541 100644
--- a/pulsar/managers/__init__.py
+++ b/pulsar/managers/__init__.py
@@ -123,6 +123,17 @@ def job_stdout_contents(self, *args, **kwargs):
         return self._proxied_manager.job_stdout_contents(*args, **kwargs)
 
     def job_stderr_contents(self, *args, **kwargs):
         return self._proxied_manager.job_stderr_contents(*args, **kwargs)
 
+    def is_live_stdout_update(self):
+        """ Optional.
+        Whether this manager is sending Stdout while the job is running (true if so)
+        """
+        try:
+            # only present in stateful manager currently
+            stdout_live_update = self._proxied_manager.is_live_stdout_update()
+            return stdout_live_update
+        except AttributeError:
+            return False
+
     def kill(self, *args, **kwargs):
         return self._proxied_manager.kill(*args, **kwargs)
diff --git a/pulsar/managers/stateful.py b/pulsar/managers/stateful.py
index 32b843d4..2de01b09 100644
--- a/pulsar/managers/stateful.py
+++ b/pulsar/managers/stateful.py
@@ -11,6 +11,9 @@
 # If galaxy-tool-util or Galaxy 19.09 present.
 from galaxy.tool_util.deps.dependencies import DependenciesDescription
+from pathlib import Path
+import requests
+
 import logging
 
 from pulsar.client.util import filter_destination_params
@@ -18,6 +21,7 @@
     ManagerProxy,
     status,
 )
+from pulsar.managers.base.directory import TOOL_FILE_STANDARD_ERROR, TOOL_FILE_STANDARD_OUTPUT
 from pulsar.managers.util.retry import RetryActionExecutor
 from .staging import (
     postprocess,
@@ -41,7 +45,8 @@
 ACTIVE_STATUS_LAUNCHED = "launched"
 
 DEFAULT_MIN_POLLING_INTERVAL = 0.5
-
+DEFAULT_SEND_STDOUT = False
+DEFAULT_STDOUT_INTERVAL = 3.0
 
 class StatefulManagerProxy(ManagerProxy):
@@ -52,12 +57,16 @@ def __init__(self, manager, **manager_options):
         min_polling_interval = float(manager_options.get("min_polling_interval", DEFAULT_MIN_POLLING_INTERVAL))
         preprocess_retry_action_kwds = filter_destination_params(manager_options, "preprocess_action_")
         postprocess_retry_action_kwds = filter_destination_params(manager_options, "postprocess_action_")
+        self.send_stdout = bool(manager_options.get("send_stdout_update", DEFAULT_SEND_STDOUT))
+        self.stdout_update_interval = datetime.timedelta(0, float(manager_options.get("stdout_update_interval", DEFAULT_STDOUT_INTERVAL)))
         self.__preprocess_action_executor = RetryActionExecutor(**preprocess_retry_action_kwds)
         self.__postprocess_action_executor = RetryActionExecutor(**postprocess_retry_action_kwds)
         self.min_polling_interval = datetime.timedelta(0, min_polling_interval)
         self.active_jobs = ActiveJobs.from_manager(manager)
         self.__state_change_callback = self._default_status_change_callback
         self.__monitor = None
+        self.__stdout_file_pointer_map = dict()
+        self.__stderr_file_pointer_map = dict()
 
     def set_state_change_callback(self, state_change_callback):
         self.__state_change_callback = state_change_callback
@@ -75,6 +84,12 @@ def trigger_state_change_callback(self, job_id):
     def name(self):
         return self._proxied_manager.name
 
+    def is_live_stdout_update(self):
+        """
+        Whether this manager is sending Stdout while the job is running (true if so)
+        """
+        return self.send_stdout
+
     def setup_job(self, *args, **kwargs):
         job_id = self._proxied_manager.setup_job(*args, **kwargs)
         return job_id
@@ -136,6 +151,8 @@ def _handling_of_preprocessing_state(self, job_id, launch_config):
             )
             with job_directory.lock("status"):
                 job_directory.store_metadata(JOB_FILE_PREPROCESSED, True)
+            self.__stdout_file_pointer_map[job_id] = 0
+            self.__stderr_file_pointer_map[job_id] = 0
             self.active_jobs.activate_job(job_id)
         except Exception as e:
             with job_directory.lock("status"):
@@ -148,6 +165,58 @@ def _handling_of_preprocessing_state(self, job_id, launch_config):
     def handle_failure_before_launch(self, job_id):
         self.__state_change_callback(status.FAILED, job_id)
 
+    def post_remote_output(self, job_id, force_empty=False):
+        """
+        Send output file back to Galaxy server via Galaxy API
+        """
+        job_directory = self._proxied_manager.job_directory(job_id)
+        try:
+            files_endpoint = job_directory.load_metadata("launch_config")["remote_staging"]["action_mapper"][
+                "files_endpoint"]
+            galaxy_file_dir = Path(job_directory.load_metadata("launch_config")["remote_staging"]["client_outputs"][
+                "working_directory"]).parent / "outputs"
+            for filename in [TOOL_FILE_STANDARD_OUTPUT, TOOL_FILE_STANDARD_ERROR]:
+                file_contents = self._prepare_file_output(job_id, job_directory, filename)
+                if file_contents != "" or force_empty:
+                    self._post_file(file_contents, galaxy_file_dir / (Path(filename).name), files_endpoint)
+        except Exception as e:
+            log.error("Error sending output to Galaxy server while job is running. Error: %s", e)
+
+    def _prepare_file_output(self, job_id, job_directory, filename):
+        file_output = job_directory.open_file(filename, mode="rb")
+        if filename == TOOL_FILE_STANDARD_ERROR:
+            file_output.seek(self.__stderr_file_pointer_map.get(job_id, 0))
+            diff = file_output.read()
+            self.__stderr_file_pointer_map[job_id] = self.__stderr_file_pointer_map[job_id] + len(diff)
+            return diff.decode("utf-8")
+        else:
+            file_output.seek(self.__stdout_file_pointer_map.get(job_id, 0))
+            diff = file_output.read()
+            self.__stdout_file_pointer_map[job_id] = self.__stdout_file_pointer_map[job_id] + len(diff)
+            return diff.decode("utf-8")
+
+    def _post_file(self, remote_file, path, endpoint):
+        file = {"file": (path.name, (remote_file))}
+        values = {"path": path, "file_type": "output"}
+        r = requests.post(url=endpoint, files=file, data=values)
+        log.debug("Successfully posted file to %s. Status Code: %d", path, r.status_code)
+
+    def stdout_update(self, job_id):
+        def do_stdout_update():
+            while self._proxied_manager.get_status(job_id) == status.RUNNING:
+                try:
+                    microseconds = self.stdout_update_interval.microseconds \
+                        + (self.stdout_update_interval.seconds + self.stdout_update_interval.days * 24 * 3600) * (
+                            10 ** 6)
+                    total_seconds = microseconds / (10 ** 6)
+                    time.sleep(total_seconds)
+                    self.post_remote_output(job_id)
+                except Exception as e:
+                    log.error("Error doing stdout update for job id: %s. Error: %s", job_id, e)
+                    break
+        if self.send_stdout:
+            new_thread_for_job(self, "stdout_update", job_id, do_stdout_update, daemon=False)
+
     def get_status(self, job_id):
         """ Compute status used proxied manager and handle state transitions and track additional state information needed.
         """
@@ -163,7 +232,7 @@ def get_status(self, job_id):
             self.__deactivate(job_id, proxy_status)
         elif state_change == "to_running":
             self.__state_change_callback(status.RUNNING, job_id)
-
+            self.stdout_update(job_id)
         return self.__status(job_directory, proxy_status)
 
     def __proxy_status(self, job_directory, job_id):
@@ -220,6 +289,7 @@ def do_postprocess():
             postprocess_success = False
             job_directory = self._proxied_manager.job_directory(job_id)
             try:
+                self.post_remote_output(job_id, True)
                 postprocess_success = postprocess(job_directory, self.__postprocess_action_executor)
             except Exception:
                 log.exception("Failed to postprocess results for job id %s" % job_id)
@@ -227,6 +297,9 @@
             if job_directory.has_metadata(JOB_FILE_PREPROCESSING_FAILED):
                 final_status = status.FAILED
             self.__state_change_callback(final_status, job_id)
+            self.__stdout_file_pointer_map.pop(job_id)
+            self.__stderr_file_pointer_map.pop(job_id)
+
         new_thread_for_job(self, "postprocess", job_id, do_postprocess, daemon=False)
 
     def shutdown(self, timeout=None):
@@ -370,11 +443,12 @@ def _monitor_active_jobs(self):
         iteration_start = datetime.datetime.now()
         for active_job_id in active_job_ids:
             try:
-                self._check_active_job_status(active_job_id)
+                job_status = self._check_active_job_status(active_job_id)
             except Exception:
                 log.exception("Failed checking active job status for job_id %s" % active_job_id)
         iteration_end = datetime.datetime.now()
         iteration_length = iteration_end - iteration_start
+
         if iteration_length < self.stateful_manager.min_polling_interval:
             to_sleep = (self.stateful_manager.min_polling_interval - iteration_length)
             microseconds = to_sleep.microseconds + (to_sleep.seconds + to_sleep.days * 24 * 3600) * (10 ** 6)
@@ -384,7 +458,7 @@
     def _check_active_job_status(self, active_job_id):
         # Manager itself will handle state transitions when status changes,
        # just need to poll get_status
-        self.stateful_manager.get_status(active_job_id)
+        return self.stateful_manager.get_status(active_job_id)
 
 
 def new_thread_for_job(manager, action, job_id, target, daemon):
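
Review note, not part of the patch: the live streaming above hinges on the per-job byte offsets kept in __stdout_file_pointer_map / __stderr_file_pointer_map. _prepare_file_output seeks to the last recorded offset, reads only the bytes appended since the previous poll, and advances the offset. A minimal standalone sketch of that technique follows; the names offsets and tail_new_output are illustrative only and do not appear in the patch.

# Standalone sketch of the incremental-read technique (illustrative only).
offsets = {}  # job_id -> number of bytes already sent


def tail_new_output(job_id, path):
    """Return only the text appended to `path` since the previous call for this job."""
    with open(path, "rb") as handle:
        handle.seek(offsets.get(job_id, 0))  # skip everything already sent
        delta = handle.read()                # bytes written since the last poll
    offsets[job_id] = offsets.get(job_id, 0) + len(delta)
    return delta.decode("utf-8")


if __name__ == "__main__":
    import os
    import tempfile

    path = os.path.join(tempfile.mkdtemp(), "tool_stdout")
    with open(path, "a") as f:
        f.write("line 1\n")
    print(repr(tail_new_output("job-1", path)))  # 'line 1\n'
    with open(path, "a") as f:
        f.write("line 2\n")
    print(repr(tail_new_output("job-1", path)))  # 'line 2\n' -- only the new part

Because only the delta is sent on each interval, repeated posts stay cheap even for chatty tools, and popping the offsets in do_postprocess (as the patch does) keeps the maps from growing across jobs.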
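
A second review note: the manual microsecond arithmetic in do_stdout_update (copied from the existing sleep logic in _monitor_active_jobs) is equivalent to datetime.timedelta.total_seconds(), which could simplify the new code. A quick self-contained check:

import datetime

# Mirrors stdout_update_interval with the default of 3.0 seconds.
interval = datetime.timedelta(0, 3.0)

# Conversion as written in do_stdout_update ...
microseconds = interval.microseconds + (interval.seconds + interval.days * 24 * 3600) * (10 ** 6)
manual_seconds = microseconds / (10 ** 6)

# ... matches the stdlib helper.
assert manual_seconds == interval.total_seconds() == 3.0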