Skip to content

Commit

Permalink
CZI flow refactor; use cleanup hooks over tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
annshress committed Jan 10, 2024
1 parent ba48003 commit e413d32
Showing 1 changed file with 31 additions and 10 deletions.
41 changes: 31 additions & 10 deletions em_workflows/czi/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
- Create thumbnail image from zarr label sub-image
"""
import asyncio
import json
from pathlib import Path
from typing import List, Dict, Optional

import SimpleITK as sitk
from prefect import flow, task
from pytools.HedwigZarrImage import HedwigZarrImage
from pytools.HedwigZarrImages import HedwigZarrImages
from prefect.states import Completed, Failed
from pytools.HedwigZarrImages import HedwigZarrImage, HedwigZarrImages

from em_workflows.file_path import FilePath
from em_workflows.utils import utils
Expand Down Expand Up @@ -209,8 +210,14 @@ def update_file_metadata(file_path: FilePath, callback_with_zarr: Dict) -> Dict:
flow_run_name=utils.generate_flow_run_name,
log_prints=True,
task_runner=CZIConfig.SLURM_EXECUTOR,
on_completion=[utils.notify_api_completion],
on_failure=[utils.notify_api_completion],
on_completion=[
utils.notify_api_completion,
utils.copy_workdirs_and_cleanup_hook,
],
on_failure=[
utils.notify_api_completion,
utils.copy_workdirs_and_cleanup_hook,
],
)
async def czi_flow(
file_share: str,
Expand Down Expand Up @@ -244,13 +251,27 @@ async def czi_flow(
callback_with_zarrs = update_file_metadata.map(
file_path=fps, callback_with_zarr=callback_with_zarrs
)
callback_with_zarrs = find_thumb_idx(callback=callback_with_zarrs)

utils.callback_with_cleanup(
fps=fps,
callback_result=callback_with_zarrs,
callback_result = list()
for cb in callback_with_zarrs:
state = cb.wait()
try:
if state.is_completed():
json.dumps(cb.result())
callback_result.append(cb.result())
except TypeError: # can't serialize the item
utils.log(f"Following item cannot be added to callback:\n\n{cb.result()}")

# we have to filter out incomplete mapped runs before this reduce step
callback_result = find_thumb_idx.submit(callback=callback_result)

utils.send_callback_body.submit(
x_no_api=x_no_api,
callback_url=callback_url,
token=token,
x_keep_workdir=x_keep_workdir,
callback_url=callback_url,
files_elts=callback_with_zarrs,
)

if callback_with_zarrs:
return Completed(message="At least one callback is correct!")
return Failed(message="None of the files succeeded!")

0 comments on commit e413d32

Please sign in to comment.