Skip to content

Commit

Permalink
BRT flow refactor; use cleanup hooks over tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
annshress committed Jan 10, 2024
1 parent e413d32 commit 0fe2c45
Showing 1 changed file with 9 additions and 11 deletions.
20 changes: 9 additions & 11 deletions em_workflows/brt/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from typing import Optional
from pathlib import Path
from natsort import os_sorted
from prefect import task, flow, unmapped, allow_failure
from prefect import task, flow, unmapped
from prefect.states import Completed, Failed
from pytools.HedwigZarrImages import HedwigZarrImages

Expand Down Expand Up @@ -608,8 +608,14 @@ def get_callback_result(callback_data: list) -> list:
flow_run_name=utils.generate_flow_run_name,
log_prints=True,
task_runner=BRTConfig.HIGH_SLURM_EXECUTOR,
on_completion=[utils.notify_api_completion],
on_failure=[utils.notify_api_completion],
on_completion=[
utils.notify_api_completion,
utils.copy_workdirs_and_cleanup_hook,
],
on_failure=[
utils.notify_api_completion,
utils.copy_workdirs_and_cleanup_hook,
],
)
def brt_flow(
# This block of params map are for adoc file specfication.
Expand Down Expand Up @@ -746,14 +752,6 @@ def brt_flow(
files_elts=callback_result,
)

copy_task = utils.copy_workdirs.map(file_path=fps)

utils.cleanup_workdir.submit(
fps,
x_keep_workdir,
wait_for=[allow_failure(copy_task)],
)

if callback_result:
return Completed(message="At least one callback is correct!")
return Failed(message="None of the files succeeded!")
Expand Down

0 comments on commit 0fe2c45

Please sign in to comment.