Skip to content

Commit

Permalink
Add generic error message for failed inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
annshress committed Jan 11, 2024
1 parent 0fe2c45 commit 18b60b9
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 70 deletions.
31 changes: 14 additions & 17 deletions em_workflows/brt/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,31 +734,28 @@ def brt_flow(

# Ref: https://github.com/PrefectHQ/prefect/blob/98d33187ecce032defb8ec7a263de32564e7f7f6/src/prefect/futures.py#L43
callback_result = list()
for cb in callback_with_tilt_mov:
failed = 0
total = len(prim_fps)
for fp, cb in zip(fps.result(), callback_with_tilt_mov):
# Wait for the task to complete
# It does not mean that future state will be a terminal state
state = cb.wait()
try:
if state.is_completed():
json.dumps(cb.result())
callback_result.append(cb.result())
except TypeError: # can't serialize the item
utils.log(f"Following item cannot be added to callback:\n\n{cb.result()}")

cb = utils.send_callback_body.submit(
if state.is_completed():
callback_result.append(cb.result())
else:
callback_result.append(fp.gen_prim_fp_elt("Something went wrong!"))
utils.log(
f"Following item cannot be added to callback:\n\n{state.result(raise_on_failure=False)}"
)
failed += 1

utils.send_callback_body.submit(
x_no_api=x_no_api,
token=token,
callback_url=callback_url,
files_elts=callback_result,
)

if callback_result:
if failed < total:
return Completed(message="At least one callback is correct!")
return Failed(message="None of the files succeeded!")
"""
# if the callback is not empty (that is one of the files passed), final=success
final_state = bool(callback_with_tilt_mov)
# Previously, this was done by `set_reference_tasks`
# flow.set_reference_tasks([callback_with_tilt_mov])
return Completed(message="Success") if final_state else Failed(message="Failed")
"""
23 changes: 9 additions & 14 deletions em_workflows/czi/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@
- Create thumbnail image from zarr label sub-image
"""
import asyncio
import json
from pathlib import Path
from typing import List, Dict, Optional

import SimpleITK as sitk
from prefect import flow, task
from prefect.states import Completed, Failed
from pytools.HedwigZarrImages import HedwigZarrImage, HedwigZarrImages

from em_workflows.file_path import FilePath
Expand Down Expand Up @@ -253,14 +251,15 @@ async def czi_flow(
)

callback_result = list()
for cb in callback_with_zarrs:
for fp, cb in zip(fps.result(), callback_with_zarrs):
state = cb.wait()
try:
if state.is_completed():
json.dumps(cb.result())
callback_result.append(cb.result())
except TypeError: # can't serialize the item
utils.log(f"Following item cannot be added to callback:\n\n{cb.result()}")
if state.is_completed():
callback_result.append(cb.result())
else:
callback_result.append(fp.gen_prim_fp_elt("Something went wrong!"))
utils.log(
f"Following item cannot be added to callback:\n\n{state.result(raise_on_failure=False)}"
)

# we have to filter out incomplete mapped runs before this reduce step
callback_result = find_thumb_idx.submit(callback=callback_result)
Expand All @@ -269,9 +268,5 @@ async def czi_flow(
x_no_api=x_no_api,
token=token,
callback_url=callback_url,
files_elts=callback_with_zarrs,
files_elts=callback_result,
)

if callback_with_zarrs:
return Completed(message="At least one callback is correct!")
return Failed(message="None of the files succeeded!")
21 changes: 8 additions & 13 deletions em_workflows/dm_conversion/flow.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import json
from pathlib import Path
from typing import Optional

from prefect import flow, task, unmapped
from prefect.states import Completed, Failed
from pytools.meta import is_int16
from pytools.convert import file_to_uint8

Expand Down Expand Up @@ -262,22 +260,19 @@ def dm_flow(
)

callback_result = list()
for cb in callback_with_keyimgs:
for fp, cb in zip(fps.result(), callback_with_keyimgs):
state = cb.wait()
try:
if state.is_completed():
json.dumps(cb.result())
callback_result.append(cb.result())
except TypeError: # can't serialize the item
utils.log(f"Following item cannot be added to callback:\n\n{cb.result()}")
if state.is_completed():
callback_result.append(cb.result())
else:
callback_result.append(fp.gen_prim_fp_elt("Something went wrong!"))
utils.log(
f"Following item cannot be added to callback:\n\n{state.result(raise_on_failure=False)}"
)

utils.send_callback_body.submit(
x_no_api=x_no_api,
token=token,
callback_url=callback_url,
files_elts=callback_result,
)

if callback_result:
return Completed(message="At least one callback is correct!")
return Failed(message="None of the files succeeded!")
21 changes: 8 additions & 13 deletions em_workflows/lrg_2d_rgb/flow.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import json
from typing import Dict, Optional
from pathlib import Path

import SimpleITK as sitk
from pytools import HedwigZarrImage, HedwigZarrImages
from prefect import flow, task
from prefect.states import Completed, Failed

from em_workflows.utils import utils
from em_workflows.utils import neuroglancer as ng
Expand Down Expand Up @@ -192,22 +190,19 @@ def lrg_2d_flow(
)

callback_result = list()
for cb in callback_with_pyramids:
for fp, cb in zip(fps.result(), callback_with_pyramids):
state = cb.wait()
try:
if state.is_completed():
json.dumps(cb.result())
callback_result.append(cb.result())
except TypeError: # can't serialize the item
utils.log(f"Following item cannot be added to callback:\n\n{cb.result()}")
if state.is_completed():
callback_result.append(cb.result())
else:
callback_result.append(fp.gen_prim_fp_elt("Something went wrong!"))
utils.log(
f"Following item cannot be added to callback:\n\n{state.result(raise_on_failure=False)}"
)

utils.send_callback_body.submit(
x_no_api=x_no_api,
token=token,
callback_url=callback_url,
files_elts=callback_result,
)

if callback_result:
return Completed(message="At least one callback is correct!")
return Failed(message="None of the files succeeded!")
21 changes: 8 additions & 13 deletions em_workflows/sem_tomo/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@

import glob
import math
import json
from pathlib import Path
from typing import Dict, Optional
from natsort import os_sorted

from prefect import flow, task, unmapped
from prefect.states import Completed, Failed
from pytools.HedwigZarrImages import HedwigZarrImages

from em_workflows.utils import utils
Expand Down Expand Up @@ -386,22 +384,19 @@ def sem_tomo_flow(
)

callback_result = list()
for cb in callback_with_corr_movies:
for fp, cb in zip(fps.result(), callback_with_corr_movies):
state = cb.wait()
try:
if state.is_completed():
json.dumps(cb.result())
callback_result.append(cb.result())
except TypeError: # can't serialize the item
utils.log(f"Following item cannot be added to callback:\n\n{cb.result()}")
if state.is_completed():
callback_result.append(cb.result())
else:
callback_result.append(fp.gen_prim_fp_elt("Something went wrong!"))
utils.log(
f"Following item cannot be added to callback:\n\n{state.result(raise_on_failure=False)}"
)

utils.send_callback_body.submit(
x_no_api=x_no_api,
token=token,
callback_url=callback_url,
files_elts=callback_result,
)

if callback_result:
return Completed(message="At least one callback is correct!")
return Failed(message="None of the files succeeded!")

0 comments on commit 18b60b9

Please sign in to comment.