diff --git a/em_workflows/brt/flow.py b/em_workflows/brt/flow.py index e2f0e172..e2c6246d 100644 --- a/em_workflows/brt/flow.py +++ b/em_workflows/brt/flow.py @@ -734,31 +734,28 @@ def brt_flow( # Ref: https://github.com/PrefectHQ/prefect/blob/98d33187ecce032defb8ec7a263de32564e7f7f6/src/prefect/futures.py#L43 callback_result = list() - for cb in callback_with_tilt_mov: + failed = 0 + total = len(prim_fps) + for fp, cb in zip(fps.result(), callback_with_tilt_mov): # Wait for the task to complete # It does not mean that future state will be a terminal state state = cb.wait() - try: - if state.is_completed(): - json.dumps(cb.result()) - callback_result.append(cb.result()) - except TypeError: # can't serialize the item - utils.log(f"Following item cannot be added to callback:\n\n{cb.result()}") - - cb = utils.send_callback_body.submit( + if state.is_completed(): + callback_result.append(cb.result()) + else: + callback_result.append(fp.gen_prim_fp_elt("Something went wrong!")) + utils.log( + f"Following item cannot be added to callback:\n\n{state.result(raise_on_failure=False)}" + ) + failed += 1 + + utils.send_callback_body.submit( x_no_api=x_no_api, token=token, callback_url=callback_url, files_elts=callback_result, ) - if callback_result: + if failed < total: return Completed(message="At least one callback is correct!") return Failed(message="None of the files succeeded!") - """ - # if the callback is not empty (that is one of the files passed), final=success - final_state = bool(callback_with_tilt_mov) - # Previously, this was done by `set_reference_tasks` - # flow.set_reference_tasks([callback_with_tilt_mov]) - return Completed(message="Success") if final_state else Failed(message="Failed") - """ diff --git a/em_workflows/czi/flow.py b/em_workflows/czi/flow.py index 739b8e87..024f0f60 100644 --- a/em_workflows/czi/flow.py +++ b/em_workflows/czi/flow.py @@ -16,13 +16,11 @@ - Create thumbnail image from zarr label sub-image """ import asyncio -import json from pathlib import Path from typing import List, Dict, Optional import SimpleITK as sitk from prefect import flow, task -from prefect.states import Completed, Failed from pytools.HedwigZarrImages import HedwigZarrImage, HedwigZarrImages from em_workflows.file_path import FilePath @@ -253,14 +251,15 @@ async def czi_flow( ) callback_result = list() - for cb in callback_with_zarrs: + for fp, cb in zip(fps.result(), callback_with_zarrs): state = cb.wait() - try: - if state.is_completed(): - json.dumps(cb.result()) - callback_result.append(cb.result()) - except TypeError: # can't serialize the item - utils.log(f"Following item cannot be added to callback:\n\n{cb.result()}") + if state.is_completed(): + callback_result.append(cb.result()) + else: + callback_result.append(fp.gen_prim_fp_elt("Something went wrong!")) + utils.log( + f"Following item cannot be added to callback:\n\n{state.result(raise_on_failure=False)}" + ) # we have to filter out incomplete mapped runs before this reduce step callback_result = find_thumb_idx.submit(callback=callback_result) @@ -269,9 +268,5 @@ async def czi_flow( x_no_api=x_no_api, token=token, callback_url=callback_url, - files_elts=callback_with_zarrs, + files_elts=callback_result, ) - - if callback_with_zarrs: - return Completed(message="At least one callback is correct!") - return Failed(message="None of the files succeeded!") diff --git a/em_workflows/dm_conversion/flow.py b/em_workflows/dm_conversion/flow.py index 92073f65..ef868ab8 100644 --- a/em_workflows/dm_conversion/flow.py +++ b/em_workflows/dm_conversion/flow.py @@ -1,9 +1,7 @@ -import json from pathlib import Path from typing import Optional from prefect import flow, task, unmapped -from prefect.states import Completed, Failed from pytools.meta import is_int16 from pytools.convert import file_to_uint8 @@ -262,14 +260,15 @@ def dm_flow( ) callback_result = list() - for cb in callback_with_keyimgs: + for fp, cb in zip(fps.result(), callback_with_keyimgs): state = cb.wait() - try: - if state.is_completed(): - json.dumps(cb.result()) - callback_result.append(cb.result()) - except TypeError: # can't serialize the item - utils.log(f"Following item cannot be added to callback:\n\n{cb.result()}") + if state.is_completed(): + callback_result.append(cb.result()) + else: + callback_result.append(fp.gen_prim_fp_elt("Something went wrong!")) + utils.log( + f"Following item cannot be added to callback:\n\n{state.result(raise_on_failure=False)}" + ) utils.send_callback_body.submit( x_no_api=x_no_api, @@ -277,7 +276,3 @@ def dm_flow( callback_url=callback_url, files_elts=callback_result, ) - - if callback_result: - return Completed(message="At least one callback is correct!") - return Failed(message="None of the files succeeded!") diff --git a/em_workflows/lrg_2d_rgb/flow.py b/em_workflows/lrg_2d_rgb/flow.py index fba8bf60..2637257a 100644 --- a/em_workflows/lrg_2d_rgb/flow.py +++ b/em_workflows/lrg_2d_rgb/flow.py @@ -1,11 +1,9 @@ -import json from typing import Dict, Optional from pathlib import Path import SimpleITK as sitk from pytools import HedwigZarrImage, HedwigZarrImages from prefect import flow, task -from prefect.states import Completed, Failed from em_workflows.utils import utils from em_workflows.utils import neuroglancer as ng @@ -192,14 +190,15 @@ def lrg_2d_flow( ) callback_result = list() - for cb in callback_with_pyramids: + for fp, cb in zip(fps.result(), callback_with_pyramids): state = cb.wait() - try: - if state.is_completed(): - json.dumps(cb.result()) - callback_result.append(cb.result()) - except TypeError: # can't serialize the item - utils.log(f"Following item cannot be added to callback:\n\n{cb.result()}") + if state.is_completed(): + callback_result.append(cb.result()) + else: + callback_result.append(fp.gen_prim_fp_elt("Something went wrong!")) + utils.log( + f"Following item cannot be added to callback:\n\n{state.result(raise_on_failure=False)}" + ) utils.send_callback_body.submit( x_no_api=x_no_api, @@ -207,7 +206,3 @@ def lrg_2d_flow( callback_url=callback_url, files_elts=callback_result, ) - - if callback_result: - return Completed(message="At least one callback is correct!") - return Failed(message="None of the files succeeded!") diff --git a/em_workflows/sem_tomo/flow.py b/em_workflows/sem_tomo/flow.py index a6c6a374..e19ae378 100644 --- a/em_workflows/sem_tomo/flow.py +++ b/em_workflows/sem_tomo/flow.py @@ -21,13 +21,11 @@ import glob import math -import json from pathlib import Path from typing import Dict, Optional from natsort import os_sorted from prefect import flow, task, unmapped -from prefect.states import Completed, Failed from pytools.HedwigZarrImages import HedwigZarrImages from em_workflows.utils import utils @@ -386,14 +384,15 @@ def sem_tomo_flow( ) callback_result = list() - for cb in callback_with_corr_movies: + for fp, cb in zip(fps.result(), callback_with_corr_movies): state = cb.wait() - try: - if state.is_completed(): - json.dumps(cb.result()) - callback_result.append(cb.result()) - except TypeError: # can't serialize the item - utils.log(f"Following item cannot be added to callback:\n\n{cb.result()}") + if state.is_completed(): + callback_result.append(cb.result()) + else: + callback_result.append(fp.gen_prim_fp_elt("Something went wrong!")) + utils.log( + f"Following item cannot be added to callback:\n\n{state.result(raise_on_failure=False)}" + ) utils.send_callback_body.submit( x_no_api=x_no_api, @@ -401,7 +400,3 @@ def sem_tomo_flow( callback_url=callback_url, files_elts=callback_result, ) - - if callback_result: - return Completed(message="At least one callback is correct!") - return Failed(message="None of the files succeeded!")