From 0fe2c45702b510b3d2f602f804630553bc499bd0 Mon Sep 17 00:00:00 2001 From: Anish Date: Wed, 10 Jan 2024 10:56:21 -0500 Subject: [PATCH] BRT flow refactor; use cleanup hooks over tasks --- em_workflows/brt/flow.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/em_workflows/brt/flow.py b/em_workflows/brt/flow.py index 830a3740..e2f0e172 100644 --- a/em_workflows/brt/flow.py +++ b/em_workflows/brt/flow.py @@ -44,7 +44,7 @@ from typing import Optional from pathlib import Path from natsort import os_sorted -from prefect import task, flow, unmapped, allow_failure +from prefect import task, flow, unmapped from prefect.states import Completed, Failed from pytools.HedwigZarrImages import HedwigZarrImages @@ -608,8 +608,14 @@ def get_callback_result(callback_data: list) -> list: flow_run_name=utils.generate_flow_run_name, log_prints=True, task_runner=BRTConfig.HIGH_SLURM_EXECUTOR, - on_completion=[utils.notify_api_completion], - on_failure=[utils.notify_api_completion], + on_completion=[ + utils.notify_api_completion, + utils.copy_workdirs_and_cleanup_hook, + ], + on_failure=[ + utils.notify_api_completion, + utils.copy_workdirs_and_cleanup_hook, + ], ) def brt_flow( # This block of params map are for adoc file specfication. @@ -746,14 +752,6 @@ def brt_flow( files_elts=callback_result, ) - copy_task = utils.copy_workdirs.map(file_path=fps) - - utils.cleanup_workdir.submit( - fps, - x_keep_workdir, - wait_for=[allow_failure(copy_task)], - ) - if callback_result: return Completed(message="At least one callback is correct!") return Failed(message="None of the files succeeded!")