Skip to content

Commit

Permalink
Move tasks to appropriate flow modules
Browse files Browse the repository at this point in the history
HEDWG2-1211
  • Loading branch information
annshress committed Feb 14, 2024
1 parent bc2eda6 commit 5415a2e
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 330 deletions.
4 changes: 4 additions & 0 deletions em_workflows/brt/config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import os
from collections import namedtuple

from em_workflows.config import Config


class BRTConfig(Config):
binvol = os.environ.get("BINVOL_LOC", "/opt/rml/imod/bin/binvol")
clip_loc = os.environ.get("CLIP_LOC", "/opt/rml/imod/bin/clip")


BrtOutput = namedtuple("BrtOutput", ["ali_file", "rec_file"])
209 changes: 160 additions & 49 deletions em_workflows/brt/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@

from typing import Dict
import json
from jinja2 import Environment, FileSystemLoader
import glob
import os
import shutil
import subprocess
from typing import Optional
from pathlib import Path
Expand All @@ -51,7 +53,7 @@
from em_workflows.utils import neuroglancer as ng
from em_workflows.constants import AssetType
from em_workflows.file_path import FilePath
from em_workflows.brt.config import BRTConfig
from em_workflows.brt.config import BRTConfig, BrtOutput
from em_workflows.brt.constants import BRT_DEPTH, BRT_HEIGHT, BRT_WIDTH


Expand Down Expand Up @@ -133,6 +135,159 @@ def gen_mrc2tiff(fp_in: Path) -> None:
FilePath.run(cmd=cmd, log_file=log_file)


def copy_template(working_dir: Path, template_name: str) -> Path:
"""
:param working_dir: libpath.Path of temporary working directory
:param template_name: base str name of the ADOC template
:return: libpath.Path of the copied file
copies the template adoc file to the working_dir
"""
adoc_fp = f"{working_dir}/{template_name}.adoc"
template_fp = f"{BRTConfig.template_dir}/{template_name}.adoc"
utils.log(f"trying to copy {template_fp} to {adoc_fp}")
shutil.copyfile(template_fp, adoc_fp)
return Path(adoc_fp)


def update_adoc(
adoc_fp: Path,
tg_fp: Path,
montage: int,
gold: int,
focus: int,
fiducialless: int,
trackingMethod: int,
TwoSurfaces: int,
TargetNumberOfBeads: int,
LocalAlignments: int,
THICKNESS: int,
) -> Path:
"""
| Uses jinja templating to update the adoc file with input params.
| dual_p is calculated by inputs_paired() and is used to define `dual`
| Some of these parameters are derived programatically.
:todo: Remove references to ``dual_p`` in comments?
"""
file_loader = FileSystemLoader(str(adoc_fp.parent))
env = Environment(loader=file_loader)
template = env.get_template(adoc_fp.name)

name = tg_fp.stem
currentBStackExt = None
stackext = tg_fp.suffix[1:]
# if dual_p:
# dual = 1
# currentBStackExt = tg_fp.suffix[1:] # TODO - assumes both files are same ext
datasetDirectory = adoc_fp.parent
if int(TwoSurfaces) == 0:
SurfacesToAnalyze = 1
elif int(TwoSurfaces) == 1:
SurfacesToAnalyze = 2
else:
raise ValueError(
f"Unable to resolve SurfacesToAnalyze, TwoSurfaces \
is set to {TwoSurfaces}, and should be 0 or 1"
)
rpa_thickness = int(int(THICKNESS) * 1.5)

vals = {
"name": name,
"stackext": stackext,
"currentBStackExt": currentBStackExt,
"montage": montage,
"gold": gold,
"focus": focus,
"datasetDirectory": datasetDirectory,
"fiducialless": fiducialless,
"trackingMethod": trackingMethod,
"TwoSurfaces": TwoSurfaces,
"TargetNumberOfBeads": TargetNumberOfBeads,
"SurfacesToAnalyze": SurfacesToAnalyze,
"LocalAlignments": LocalAlignments,
"rpa_thickness": rpa_thickness,
"THICKNESS": THICKNESS,
}

output = template.render(vals)
adoc_loc = Path(f"{adoc_fp.parent}/{tg_fp.stem}.adoc")
utils.log("Created adoc: adoc_loc.as_posix()")
with open(adoc_loc, "w") as _file:
print(output, file=_file)
utils.log(f"generated {adoc_loc}")
return adoc_loc


@task(
name="Batchruntomo conversion",
on_failure=[utils.collect_exception_task_hook],
)
def run_brt(
file_path: FilePath,
adoc_template: str,
montage: int,
gold: int,
focus: int,
fiducialless: int,
trackingMethod: int,
TwoSurfaces: int,
TargetNumberOfBeads: int,
LocalAlignments: int,
THICKNESS: int,
) -> BrtOutput:
"""
The natural place for this function is within the brt flow.
The reason for this is to facilitate testing. In prefect 1, a
flow lives within a context. This causes problems if things are mocked
for testing. If the function is in utils, these problems go away.
TODO, this is ugly. This might vanish in Prefect 2, since flows are
no longer obligated to being context dependant.
"""

adoc_fp = copy_template(
working_dir=file_path.working_dir, template_name=adoc_template
)
updated_adoc = update_adoc(
adoc_fp=adoc_fp,
tg_fp=file_path.fp_in,
montage=montage,
gold=gold,
focus=focus,
fiducialless=fiducialless,
trackingMethod=trackingMethod,
TwoSurfaces=TwoSurfaces,
TargetNumberOfBeads=TargetNumberOfBeads,
LocalAlignments=LocalAlignments,
THICKNESS=THICKNESS,
)
# why do we need to copy these?
utils.copy_tg_to_working_dir(
fname=file_path.fp_in, working_dir=file_path.working_dir
)

# START BRT (Batchruntomo) - long running process.
cmd = [
BRTConfig.brt_binary,
"-di",
updated_adoc.as_posix(),
"-cp",
"1",
"-gpu",
"1",
]
log_file = f"{file_path.working_dir}/brt_run.log"
FilePath.run(cmd, log_file)
rec_file = Path(f"{file_path.working_dir}/{file_path.base}_rec.mrc")
ali_file = Path(f"{file_path.working_dir}/{file_path.base}_ali.mrc")
utils.log(f"checking that dir {file_path.working_dir} contains ok BRT run")

for _file in [rec_file, ali_file]:
if not _file.exists():
raise ValueError(f"File {_file} does not exist. BRT run failure.")
return BrtOutput(ali_file=ali_file, rec_file=rec_file)


@task(
name="Thumbnail generation",
on_failure=[utils.collect_exception_task_hook],
Expand Down Expand Up @@ -179,7 +334,7 @@ def find_middle_image(fp_in: Path) -> Path:
name="Tilt movie generation",
on_failure=[utils.collect_exception_task_hook],
)
def gen_tilt_movie(brt_output: utils.BrtOutput) -> Path:
def gen_tilt_movie(brt_output: BrtOutput) -> Path:
"""
generates the tilt movie, eg::
Expand Down Expand Up @@ -229,7 +384,7 @@ def gen_tilt_movie(brt_output: utils.BrtOutput) -> Path:
name="Average mrc generation",
on_failure=[utils.collect_exception_task_hook],
)
def gen_ave_mrc(brt_output: utils.BrtOutput) -> Path:
def gen_ave_mrc(brt_output: BrtOutput) -> Path:
rec_file = brt_output.rec_file
utils.log("gen recon dims")
rec_z_dim = gen_dimension_command(fp_in=rec_file)
Expand Down Expand Up @@ -392,55 +547,11 @@ def cleanup_files(file_path: Path, pattern=str, keep_file: Path = None):
print(files_to_rm)


# @task
# def list_paired_files(fnames: List[Path]) -> List[Path]:
# """
# **THIS IS AN OLD FUNC** : Keeping for now, they'll probably want this back.
#
# If the input is a paired/dual axis shot, we can trucate the ``a|b`` off
# the filename, and use that string from this point on.
# We need to ensure there are only paired inputs in this list.
# """
# maybes = list()
# pairs = list()
# # paired_fnames = [fname.stem for fname in fps]
# for fname in fnames:
# if fname.stem.endswith("a"):
# # remove the last char of fname (keep extension)
# fname_no_a = f"{fname.parent}/{fname.stem[:-1]}{fname.suffix}"
# maybes.append(fname_no_a)
# for fname in fnames:
# if fname.stem.endswith("b"):
# fname_no_b = f"{fname.parent}/{fname.stem[:-1]}{fname.suffix}"
# if fname_no_b in maybes:
# pairs.append(Path(fname_no_b))
# return pairs
#
#
# @task
# def check_inputs_paired(fps: List[Path]):
# """
# THIS IS AN OLD FUNC : Keeping for now, they'll probably want this back.
# asks if there are ANY paired inputs in a dir.
# If there are, will return True, else False
# """
# fnames = [fname.stem for fname in fps]
# inputs_paired = False
# for fname in fnames:
# if fname.endswith("a"):
# # remove the last char, cat on a 'b' and lookup.
# pair_name = fname[:-1] + "b"
# if pair_name in fnames:
# inputs_paired = True
# utils.log(f"Are inputs paired? {inputs_paired}.")
# return inputs_paired


@task(
name="Zarr generation",
on_failure=[utils.collect_exception_task_hook],
)
def gen_zarr(brt_output: utils.BrtOutput) -> Path:
def gen_zarr(brt_output: BrtOutput) -> Path:

if not brt_output.rec_file.is_file():
raise ValueError(f"{brt_output.rec_file} does not exist")
Expand Down Expand Up @@ -567,7 +678,7 @@ def brt_flow(
fps = utils.gen_fps.submit(
share_name=file_share, input_dir=input_dir_fp, fps_in=input_fps
)
brt_outputs = utils.run_brt.map(
brt_outputs = run_brt.map(
file_path=fps,
adoc_template=unmapped(adoc_template),
montage=unmapped(montage),
Expand Down
50 changes: 49 additions & 1 deletion em_workflows/sem_tomo/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,54 @@ def gen_zarr(fp_in: FilePath, **kwargs) -> FilePath:
return fp_in


@task(
name="mrc to movie generation",
on_failure=[utils.collect_exception_task_hook],
)
def mrc_to_movie(file_path: FilePath, root: str, asset_type: str, **kwargs):
"""
:param file_path: FilePath for the input
:param root: base name of the mrc file
:param asset_type: type of resulting output (movie)
:param kwargs: additional arguments to wait for before executing this func
- Uses the file_path to identify the working_dir which should have the "root" mrc
- Runs IMOD ``mrc2tif`` to convert the mrc to many jpgs named by z number
- Calls ``ffmpeg`` to create the mp4 movie from the jpgs and returns it as an asset
"""
mp4 = f"{file_path.working_dir}/{file_path.base}_mp4"
mrc = f"{file_path.working_dir}/{root}.mrc"
log_file = f"{file_path.working_dir}/recon_mrc2tiff.log"
cmd = [SEMConfig.mrc2tif_loc, "-j", "-C", "0,255", mrc, mp4]
FilePath.run(cmd=cmd, log_file=log_file)
mov = f"{file_path.working_dir}/{file_path.base}_{asset_type}.mp4"
test_p = Path(f"{file_path.working_dir}/{file_path.base}_mp4.1000.jpg")
mp4_input = f"{file_path.working_dir}/{file_path.base}_mp4.%03d.jpg"
if test_p.exists():
mp4_input = f"{file_path.working_dir}/{file_path.base}_mp4.%04d.jpg"
cmd = [
"ffmpeg",
"-f",
"image2",
"-framerate",
"8",
"-i",
mp4_input,
"-vcodec",
"libx264",
"-pix_fmt",
"yuv420p",
"-s",
"1024,1024",
mov,
]
log_file = f"{file_path.working_dir}/{file_path.base}_{asset_type}.log"
FilePath.run(cmd=cmd, log_file=log_file)
asset_fp = file_path.copy_to_assets_dir(fp_to_cp=Path(mov))
asset = file_path.gen_asset(asset_type=asset_type, asset_fp=asset_fp)
return asset


@task(
name="Neuroglancer metadata generation",
on_failure=[utils.collect_exception_task_hook],
Expand Down Expand Up @@ -374,7 +422,7 @@ def sem_tomo_flow(
base_mrcs = gen_newstack_combi.map(fp_in=align_xgs, stretch=stretchs)
# base_mrcs are passed in as kwargs to replace wait_for

corrected_movie_assets = utils.mrc_to_movie.map(
corrected_movie_assets = mrc_to_movie.map(
file_path=fps,
root=unmapped("adjusted"),
asset_type=unmapped(AssetType.REC_MOVIE),
Expand Down
Loading

0 comments on commit 5415a2e

Please sign in to comment.