Skip to content

Commit

Permalink
dm pipeline; Remove intermediate function: Move all tasks to the @flow
Browse files Browse the repository at this point in the history
Appropriately alter wait_for for dm pipeline runs
  • Loading branch information
annshress authored and philipmac committed Jan 25, 2024
1 parent 201f534 commit 5b91fef
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 32 deletions.
53 changes: 26 additions & 27 deletions em_workflows/dm_conversion/flow.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from pathlib import Path
from typing import Optional

from prefect import flow, task, unmapped
from prefect import flow, task, unmapped, allow_failure
from pytools.meta import is_int16
from pytools.convert import file_to_uint8

Expand All @@ -21,14 +21,17 @@


@task
def convert_dms_to_mrc(file_path: FilePath) -> None:
def convert_dms_to_mrc(file_path: FilePath) -> FilePath:
if file_path.fp_in.suffix.strip(".").lower() not in DMS_EXT:
return file_path
dm_as_mrc = file_path.gen_output_fp(out_fname="dm_as_mrc.mrc")
msg = f"Using dir: {file_path.fp_in}, : creating dm_as_mrc {dm_as_mrc}"
utils.log(msg=msg)
log_file = f"{dm_as_mrc.parent}/dm2mrc.log"
cmd = [DMConfig.dm2mrc_loc, file_path.fp_in.as_posix(), dm_as_mrc.as_posix()]
# utils.log(f"Generated cmd {cmd}")
FilePath.run(cmd=cmd, log_file=log_file)
return file_path


@task
Expand All @@ -39,6 +42,11 @@ def convert_if_int16_tiff(file_path: FilePath) -> None:
if 16 bit convert (write to assets_dir) & return Path
else return orig Path
"""
if not (
file_path.fp_in.suffix.strip(".").lower() in TIFS_EXT
and is_int16(file_path.fp_in)
):
return
tif_8_bit = file_path.gen_output_fp(out_fname="as_8_bit.tif")
utils.log(f"{file_path.fp_in} is a 16 bit tiff, converting to {tif_8_bit}")
file_to_uint8(in_file_path=file_path.fp_in, out_file_path=str(tif_8_bit))
Expand All @@ -60,6 +68,8 @@ def convert_2d_mrc_to_tiff(file_path: FilePath) -> None:
# min_max_histo = neuroglancer.gen_min_max_histo(file_path)
# utils.log(min_max_histo)
# utils.log(f"+++++++++++++++++++++++++++++++++++++++++++++")
if file_path.fp_in.suffix.strip(".").lower() not in MRCS_EXT:
return
dims = utils.lookup_dims(file_path.fp_in)
if dims.z != 1:
msg = f"mrc file {file_path.fp_in} is not 2 dimensional. Contains {dims.z} Z dims."
Expand Down Expand Up @@ -175,26 +185,6 @@ def scale_jpegs(file_path: FilePath, size: str) -> Optional[dict]:
return asset_elt


def convert_intermediate_files(fps):
tif_fps = [
fp
for fp in fps
if (fp.fp_in.suffix.strip(".").lower() in TIFS_EXT and is_int16(fp.fp_in))
]
if tif_fps:
convert_if_int16_tiff.map(file_path=tif_fps)

dm_fps = [fp for fp in fps if fp.fp_in.suffix.strip(".").lower() in DMS_EXT]
if dm_fps:
dm_to_mrc = convert_dms_to_mrc.map(dm_fps)
# mrc is intermed format, to jpeg conversion
convert_dm_mrc_to_jpeg.map(dm_fps, wait_for=[dm_to_mrc])

mrc_fps = [fp for fp in fps if fp.fp_in.suffix.strip(".").lower() in MRCS_EXT]
if mrc_fps:
convert_2d_mrc_to_tiff.map(file_path=mrc_fps)


@flow(
name="Small 2D",
flow_run_name=utils.generate_flow_run_name,
Expand Down Expand Up @@ -244,14 +234,23 @@ def dm_flow(
fps = utils.gen_fps.submit(
share_name=file_share, input_dir=input_dir_fp, fps_in=input_fps
)
# logs = utils.init_log.map(file_path=fps)

# subflow calls are blocking, so lower task runs auto waits always
convert_intermediate_files(fps.result())
# TODO move the three conversion block into if-else based on filetypes
tiff_results = convert_if_int16_tiff.map(file_path=fps)

# mrc is intermed format, to jpeg conversion
dm_to_mrc = convert_dms_to_mrc.map(file_path=fps)
mrc_results = convert_dm_mrc_to_jpeg.map(dm_to_mrc)

mrc_tiff_results = convert_2d_mrc_to_tiff.map(file_path=fps, wait_for=[allow_failure(mrc_results)])

# Finally generate all valid suffixed results
keyimg_assets = scale_jpegs.map(fps, size=unmapped("l"))
thumb_assets = scale_jpegs.map(fps, size=unmapped("s"))
keyimg_assets = scale_jpegs.map(fps, size=unmapped("l"),
wait_for=[allow_failure(tiff_results),
allow_failure(mrc_tiff_results)])
thumb_assets = scale_jpegs.map(fps, size=unmapped("s"),
wait_for=[allow_failure(tiff_results),
allow_failure(mrc_tiff_results)])

prim_fps = utils.gen_prim_fps.map(fp_in=fps)
callback_with_thumbs = utils.add_asset.map(prim_fp=prim_fps, asset=thumb_assets)
Expand Down
4 changes: 2 additions & 2 deletions em_workflows/lrg_2d_rgb/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@


@task
def convert_png_to_tiff(file_path: FilePath):
def convert_png_to_tiff(file_path: FilePath) -> FilePath:
"""
convert input.png -background white -alpha remove -alpha off ouput.tiff
Adding argument: -define tiff:tile-geometry=128x128
Expand Down Expand Up @@ -65,7 +65,7 @@ def gen_zarr(file_path: FilePath) -> None:
name="Zarr rechunk",
on_failure=[utils.collect_exception_task_hook],
)
def rechunk_zarr(file_path: FilePath):
def rechunk_zarr(file_path: FilePath) -> FilePath:
ng.rechunk_zarr(file_path=file_path)
return file_path

Expand Down
4 changes: 1 addition & 3 deletions em_workflows/sem_tomo/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def gen_align_xg(fp_in: FilePath) -> FilePath:
name="Newstack mrc generation",
on_failure=[utils.collect_exception_task_hook],
)
def gen_newstack_combi(fp_in: FilePath, stretch: None) -> tuple:
def gen_newstack_combi(fp_in: FilePath, stretch: None) -> Dict:
"""
eg::
Expand Down Expand Up @@ -122,8 +122,6 @@ def gen_newstack_combi(fp_in: FilePath, stretch: None) -> tuple:
utils.log(f"Created {cmd}")
FilePath.run(cmd=cmd, log_file=log_file)
assets_fp_adjusted_mrc = fp_in.copy_to_assets_dir(fp_to_cp=base_mrc)
# (to remove the need of wait_for in downstream)
# returning the asset as well as the input fp
return fp_in.gen_asset(
asset_type=AssetType.AVERAGED_VOLUME, asset_fp=assets_fp_adjusted_mrc
)
Expand Down

0 comments on commit 5b91fef

Please sign in to comment.