From 5b91fefcc40dc65e11b0b6554bb86589cae9be09 Mon Sep 17 00:00:00 2001 From: Anish Date: Tue, 16 Jan 2024 14:41:55 -0500 Subject: [PATCH] dm pipeline; Remove intermediate function: Move all tasks to the @flow Appropriately alter wait_for for dm pipeline runs --- em_workflows/dm_conversion/flow.py | 53 +++++++++++++++--------------- em_workflows/lrg_2d_rgb/flow.py | 4 +-- em_workflows/sem_tomo/flow.py | 4 +-- 3 files changed, 29 insertions(+), 32 deletions(-) diff --git a/em_workflows/dm_conversion/flow.py b/em_workflows/dm_conversion/flow.py index d9634931..63b77f56 100644 --- a/em_workflows/dm_conversion/flow.py +++ b/em_workflows/dm_conversion/flow.py @@ -1,7 +1,7 @@ from pathlib import Path from typing import Optional -from prefect import flow, task, unmapped +from prefect import flow, task, unmapped, allow_failure from pytools.meta import is_int16 from pytools.convert import file_to_uint8 @@ -21,7 +21,9 @@ @task -def convert_dms_to_mrc(file_path: FilePath) -> None: +def convert_dms_to_mrc(file_path: FilePath) -> FilePath: + if file_path.fp_in.suffix.strip(".").lower() not in DMS_EXT: + return file_path dm_as_mrc = file_path.gen_output_fp(out_fname="dm_as_mrc.mrc") msg = f"Using dir: {file_path.fp_in}, : creating dm_as_mrc {dm_as_mrc}" utils.log(msg=msg) @@ -29,6 +31,7 @@ def convert_dms_to_mrc(file_path: FilePath) -> None: cmd = [DMConfig.dm2mrc_loc, file_path.fp_in.as_posix(), dm_as_mrc.as_posix()] # utils.log(f"Generated cmd {cmd}") FilePath.run(cmd=cmd, log_file=log_file) + return file_path @task @@ -39,6 +42,11 @@ def convert_if_int16_tiff(file_path: FilePath) -> None: if 16 bit convert (write to assets_dir) & return Path else return orig Path """ + if not ( + file_path.fp_in.suffix.strip(".").lower() in TIFS_EXT + and is_int16(file_path.fp_in) + ): + return tif_8_bit = file_path.gen_output_fp(out_fname="as_8_bit.tif") utils.log(f"{file_path.fp_in} is a 16 bit tiff, converting to {tif_8_bit}") file_to_uint8(in_file_path=file_path.fp_in, out_file_path=str(tif_8_bit)) @@ -60,6 +68,8 @@ def convert_2d_mrc_to_tiff(file_path: FilePath) -> None: # min_max_histo = neuroglancer.gen_min_max_histo(file_path) # utils.log(min_max_histo) # utils.log(f"+++++++++++++++++++++++++++++++++++++++++++++") + if file_path.fp_in.suffix.strip(".").lower() not in MRCS_EXT: + return dims = utils.lookup_dims(file_path.fp_in) if dims.z != 1: msg = f"mrc file {file_path.fp_in} is not 2 dimensional. Contains {dims.z} Z dims." @@ -175,26 +185,6 @@ def scale_jpegs(file_path: FilePath, size: str) -> Optional[dict]: return asset_elt -def convert_intermediate_files(fps): - tif_fps = [ - fp - for fp in fps - if (fp.fp_in.suffix.strip(".").lower() in TIFS_EXT and is_int16(fp.fp_in)) - ] - if tif_fps: - convert_if_int16_tiff.map(file_path=tif_fps) - - dm_fps = [fp for fp in fps if fp.fp_in.suffix.strip(".").lower() in DMS_EXT] - if dm_fps: - dm_to_mrc = convert_dms_to_mrc.map(dm_fps) - # mrc is intermed format, to jpeg conversion - convert_dm_mrc_to_jpeg.map(dm_fps, wait_for=[dm_to_mrc]) - - mrc_fps = [fp for fp in fps if fp.fp_in.suffix.strip(".").lower() in MRCS_EXT] - if mrc_fps: - convert_2d_mrc_to_tiff.map(file_path=mrc_fps) - - @flow( name="Small 2D", flow_run_name=utils.generate_flow_run_name, @@ -244,14 +234,23 @@ def dm_flow( fps = utils.gen_fps.submit( share_name=file_share, input_dir=input_dir_fp, fps_in=input_fps ) - # logs = utils.init_log.map(file_path=fps) - # subflow calls are blocking, so lower task runs auto waits always - convert_intermediate_files(fps.result()) + # TODO move the three conversion block into if-else based on filetypes + tiff_results = convert_if_int16_tiff.map(file_path=fps) + + # mrc is intermed format, to jpeg conversion + dm_to_mrc = convert_dms_to_mrc.map(file_path=fps) + mrc_results = convert_dm_mrc_to_jpeg.map(dm_to_mrc) + + mrc_tiff_results = convert_2d_mrc_to_tiff.map(file_path=fps, wait_for=[allow_failure(mrc_results)]) # Finally generate all valid suffixed results - keyimg_assets = scale_jpegs.map(fps, size=unmapped("l")) - thumb_assets = scale_jpegs.map(fps, size=unmapped("s")) + keyimg_assets = scale_jpegs.map(fps, size=unmapped("l"), + wait_for=[allow_failure(tiff_results), + allow_failure(mrc_tiff_results)]) + thumb_assets = scale_jpegs.map(fps, size=unmapped("s"), + wait_for=[allow_failure(tiff_results), + allow_failure(mrc_tiff_results)]) prim_fps = utils.gen_prim_fps.map(fp_in=fps) callback_with_thumbs = utils.add_asset.map(prim_fp=prim_fps, asset=thumb_assets) diff --git a/em_workflows/lrg_2d_rgb/flow.py b/em_workflows/lrg_2d_rgb/flow.py index 4fe97d99..97c01c17 100644 --- a/em_workflows/lrg_2d_rgb/flow.py +++ b/em_workflows/lrg_2d_rgb/flow.py @@ -21,7 +21,7 @@ @task -def convert_png_to_tiff(file_path: FilePath): +def convert_png_to_tiff(file_path: FilePath) -> FilePath: """ convert input.png -background white -alpha remove -alpha off ouput.tiff Adding argument: -define tiff:tile-geometry=128x128 @@ -65,7 +65,7 @@ def gen_zarr(file_path: FilePath) -> None: name="Zarr rechunk", on_failure=[utils.collect_exception_task_hook], ) -def rechunk_zarr(file_path: FilePath): +def rechunk_zarr(file_path: FilePath) -> FilePath: ng.rechunk_zarr(file_path=file_path) return file_path diff --git a/em_workflows/sem_tomo/flow.py b/em_workflows/sem_tomo/flow.py index c19a0c94..8eb9f19f 100644 --- a/em_workflows/sem_tomo/flow.py +++ b/em_workflows/sem_tomo/flow.py @@ -94,7 +94,7 @@ def gen_align_xg(fp_in: FilePath) -> FilePath: name="Newstack mrc generation", on_failure=[utils.collect_exception_task_hook], ) -def gen_newstack_combi(fp_in: FilePath, stretch: None) -> tuple: +def gen_newstack_combi(fp_in: FilePath, stretch: None) -> Dict: """ eg:: @@ -122,8 +122,6 @@ def gen_newstack_combi(fp_in: FilePath, stretch: None) -> tuple: utils.log(f"Created {cmd}") FilePath.run(cmd=cmd, log_file=log_file) assets_fp_adjusted_mrc = fp_in.copy_to_assets_dir(fp_to_cp=base_mrc) - # (to remove the need of wait_for in downstream) - # returning the asset as well as the input fp return fp_in.gen_asset( asset_type=AssetType.AVERAGED_VOLUME, asset_fp=assets_fp_adjusted_mrc )