diff --git a/em_workflows/brt/config.py b/em_workflows/brt/config.py index b4d78475..296c571b 100644 --- a/em_workflows/brt/config.py +++ b/em_workflows/brt/config.py @@ -1,4 +1,5 @@ import os +from collections import namedtuple from em_workflows.config import Config @@ -6,3 +7,6 @@ class BRTConfig(Config): binvol = os.environ.get("BINVOL_LOC", "/opt/rml/imod/bin/binvol") clip_loc = os.environ.get("CLIP_LOC", "/opt/rml/imod/bin/clip") + + +BrtOutput = namedtuple("BrtOutput", ["ali_file", "rec_file"]) diff --git a/em_workflows/brt/flow.py b/em_workflows/brt/flow.py index a21c4f00..4e3025ab 100644 --- a/em_workflows/brt/flow.py +++ b/em_workflows/brt/flow.py @@ -37,8 +37,10 @@ from typing import Dict import json +from jinja2 import Environment, FileSystemLoader import glob import os +import shutil import subprocess from typing import Optional from pathlib import Path @@ -51,7 +53,7 @@ from em_workflows.utils import neuroglancer as ng from em_workflows.constants import AssetType from em_workflows.file_path import FilePath -from em_workflows.brt.config import BRTConfig +from em_workflows.brt.config import BRTConfig, BrtOutput from em_workflows.brt.constants import BRT_DEPTH, BRT_HEIGHT, BRT_WIDTH @@ -133,6 +135,159 @@ def gen_mrc2tiff(fp_in: Path) -> None: FilePath.run(cmd=cmd, log_file=log_file) +def copy_template(working_dir: Path, template_name: str) -> Path: + """ + :param working_dir: libpath.Path of temporary working directory + :param template_name: base str name of the ADOC template + :return: libpath.Path of the copied file + + copies the template adoc file to the working_dir + """ + adoc_fp = f"{working_dir}/{template_name}.adoc" + template_fp = f"{BRTConfig.template_dir}/{template_name}.adoc" + utils.log(f"trying to copy {template_fp} to {adoc_fp}") + shutil.copyfile(template_fp, adoc_fp) + return Path(adoc_fp) + + +def update_adoc( + adoc_fp: Path, + tg_fp: Path, + montage: int, + gold: int, + focus: int, + fiducialless: int, + trackingMethod: int, + 
TwoSurfaces: int, + TargetNumberOfBeads: int, + LocalAlignments: int, + THICKNESS: int, +) -> Path: + """ + | Uses jinja templating to update the adoc file with input params. + | dual_p is calculated by inputs_paired() and is used to define `dual` + | Some of these parameters are derived programmatically. + + :todo: Remove references to ``dual_p`` in comments? + """ + file_loader = FileSystemLoader(str(adoc_fp.parent)) + env = Environment(loader=file_loader) + template = env.get_template(adoc_fp.name) + + name = tg_fp.stem + currentBStackExt = None + stackext = tg_fp.suffix[1:] + # if dual_p: + # dual = 1 + # currentBStackExt = tg_fp.suffix[1:] # TODO - assumes both files are same ext + datasetDirectory = adoc_fp.parent + if int(TwoSurfaces) == 0: + SurfacesToAnalyze = 1 + elif int(TwoSurfaces) == 1: + SurfacesToAnalyze = 2 + else: + raise ValueError( + f"Unable to resolve SurfacesToAnalyze, TwoSurfaces \ + is set to {TwoSurfaces}, and should be 0 or 1" + ) + rpa_thickness = int(int(THICKNESS) * 1.5) + + vals = { + "name": name, + "stackext": stackext, + "currentBStackExt": currentBStackExt, + "montage": montage, + "gold": gold, + "focus": focus, + "datasetDirectory": datasetDirectory, + "fiducialless": fiducialless, + "trackingMethod": trackingMethod, + "TwoSurfaces": TwoSurfaces, + "TargetNumberOfBeads": TargetNumberOfBeads, + "SurfacesToAnalyze": SurfacesToAnalyze, + "LocalAlignments": LocalAlignments, + "rpa_thickness": rpa_thickness, + "THICKNESS": THICKNESS, + } + + output = template.render(vals) + adoc_loc = Path(f"{adoc_fp.parent}/{tg_fp.stem}.adoc") + utils.log(f"Created adoc: {adoc_loc.as_posix()}") + with open(adoc_loc, "w") as _file: + print(output, file=_file) + utils.log(f"generated {adoc_loc}") + return adoc_loc + + +@task( + name="Batchruntomo conversion", + on_failure=[utils.collect_exception_task_hook], +) +def run_brt( + file_path: FilePath, + adoc_template: str, + montage: int, + gold: int, + focus: int, + fiducialless: int, + trackingMethod: int, + 
TwoSurfaces: int, + TargetNumberOfBeads: int, + LocalAlignments: int, + THICKNESS: int, +) -> BrtOutput: + """ + The natural place for this function is within the brt flow. + The reason for this is to facilitate testing. In prefect 1, a + flow lives within a context. This causes problems if things are mocked + for testing. If the function is in utils, these problems go away. + TODO, this is ugly. This might vanish in Prefect 2, since flows are + no longer obligated to being context dependent. + """ + + adoc_fp = copy_template( + working_dir=file_path.working_dir, template_name=adoc_template + ) + updated_adoc = update_adoc( + adoc_fp=adoc_fp, + tg_fp=file_path.fp_in, + montage=montage, + gold=gold, + focus=focus, + fiducialless=fiducialless, + trackingMethod=trackingMethod, + TwoSurfaces=TwoSurfaces, + TargetNumberOfBeads=TargetNumberOfBeads, + LocalAlignments=LocalAlignments, + THICKNESS=THICKNESS, + ) + # why do we need to copy these? + utils.copy_tg_to_working_dir( + fname=file_path.fp_in, working_dir=file_path.working_dir + ) + + # START BRT (Batchruntomo) - long running process. + cmd = [ + BRTConfig.brt_binary, + "-di", + updated_adoc.as_posix(), + "-cp", + "1", + "-gpu", + "1", + ] + log_file = f"{file_path.working_dir}/brt_run.log" + FilePath.run(cmd, log_file) + rec_file = Path(f"{file_path.working_dir}/{file_path.base}_rec.mrc") + ali_file = Path(f"{file_path.working_dir}/{file_path.base}_ali.mrc") + utils.log(f"checking that dir {file_path.working_dir} contains ok BRT run") + + for _file in [rec_file, ali_file]: + if not _file.exists(): + raise ValueError(f"File {_file} does not exist. 
BRT run failure.") + return BrtOutput(ali_file=ali_file, rec_file=rec_file) + + @task( name="Thumbnail generation", on_failure=[utils.collect_exception_task_hook], @@ -179,7 +334,7 @@ def find_middle_image(fp_in: Path) -> Path: name="Tilt movie generation", on_failure=[utils.collect_exception_task_hook], ) -def gen_tilt_movie(brt_output: utils.BrtOutput) -> Path: +def gen_tilt_movie(brt_output: BrtOutput) -> Path: """ generates the tilt movie, eg:: @@ -229,7 +384,7 @@ def gen_tilt_movie(brt_output: utils.BrtOutput) -> Path: name="Average mrc generation", on_failure=[utils.collect_exception_task_hook], ) -def gen_ave_mrc(brt_output: utils.BrtOutput) -> Path: +def gen_ave_mrc(brt_output: BrtOutput) -> Path: rec_file = brt_output.rec_file utils.log("gen recon dims") rec_z_dim = gen_dimension_command(fp_in=rec_file) @@ -392,55 +547,11 @@ def cleanup_files(file_path: Path, pattern=str, keep_file: Path = None): print(files_to_rm) -# @task -# def list_paired_files(fnames: List[Path]) -> List[Path]: -# """ -# **THIS IS AN OLD FUNC** : Keeping for now, they'll probably want this back. -# -# If the input is a paired/dual axis shot, we can trucate the ``a|b`` off -# the filename, and use that string from this point on. -# We need to ensure there are only paired inputs in this list. -# """ -# maybes = list() -# pairs = list() -# # paired_fnames = [fname.stem for fname in fps] -# for fname in fnames: -# if fname.stem.endswith("a"): -# # remove the last char of fname (keep extension) -# fname_no_a = f"{fname.parent}/{fname.stem[:-1]}{fname.suffix}" -# maybes.append(fname_no_a) -# for fname in fnames: -# if fname.stem.endswith("b"): -# fname_no_b = f"{fname.parent}/{fname.stem[:-1]}{fname.suffix}" -# if fname_no_b in maybes: -# pairs.append(Path(fname_no_b)) -# return pairs -# -# -# @task -# def check_inputs_paired(fps: List[Path]): -# """ -# THIS IS AN OLD FUNC : Keeping for now, they'll probably want this back. -# asks if there are ANY paired inputs in a dir. 
-# If there are, will return True, else False -# """ -# fnames = [fname.stem for fname in fps] -# inputs_paired = False -# for fname in fnames: -# if fname.endswith("a"): -# # remove the last char, cat on a 'b' and lookup. -# pair_name = fname[:-1] + "b" -# if pair_name in fnames: -# inputs_paired = True -# utils.log(f"Are inputs paired? {inputs_paired}.") -# return inputs_paired - - @task( name="Zarr generation", on_failure=[utils.collect_exception_task_hook], ) -def gen_zarr(brt_output: utils.BrtOutput) -> Path: +def gen_zarr(brt_output: BrtOutput) -> Path: if not brt_output.rec_file.is_file(): raise ValueError(f"{brt_output.rec_file} does not exist") @@ -567,7 +678,7 @@ def brt_flow( fps = utils.gen_fps.submit( share_name=file_share, input_dir=input_dir_fp, fps_in=input_fps ) - brt_outputs = utils.run_brt.map( + brt_outputs = run_brt.map( file_path=fps, adoc_template=unmapped(adoc_template), montage=unmapped(montage), diff --git a/em_workflows/sem_tomo/flow.py b/em_workflows/sem_tomo/flow.py index 8eb9f19f..a5ad6f9b 100644 --- a/em_workflows/sem_tomo/flow.py +++ b/em_workflows/sem_tomo/flow.py @@ -285,6 +285,54 @@ def gen_zarr(fp_in: FilePath, **kwargs) -> FilePath: return fp_in +@task( + name="mrc to movie generation", + on_failure=[utils.collect_exception_task_hook], +) +def mrc_to_movie(file_path: FilePath, root: str, asset_type: str, **kwargs): + """ + :param file_path: FilePath for the input + :param root: base name of the mrc file + :param asset_type: type of resulting output (movie) + :param kwargs: additional arguments to wait for before executing this func + + - Uses the file_path to identify the working_dir which should have the "root" mrc + - Runs IMOD ``mrc2tif`` to convert the mrc to many jpgs named by z number + - Calls ``ffmpeg`` to create the mp4 movie from the jpgs and returns it as an asset + """ + mp4 = f"{file_path.working_dir}/{file_path.base}_mp4" + mrc = f"{file_path.working_dir}/{root}.mrc" + log_file = 
f"{file_path.working_dir}/recon_mrc2tiff.log" + cmd = [SEMConfig.mrc2tif_loc, "-j", "-C", "0,255", mrc, mp4] + FilePath.run(cmd=cmd, log_file=log_file) + mov = f"{file_path.working_dir}/{file_path.base}_{asset_type}.mp4" + test_p = Path(f"{file_path.working_dir}/{file_path.base}_mp4.1000.jpg") + mp4_input = f"{file_path.working_dir}/{file_path.base}_mp4.%03d.jpg" + if test_p.exists(): + mp4_input = f"{file_path.working_dir}/{file_path.base}_mp4.%04d.jpg" + cmd = [ + "ffmpeg", + "-f", + "image2", + "-framerate", + "8", + "-i", + mp4_input, + "-vcodec", + "libx264", + "-pix_fmt", + "yuv420p", + "-s", + "1024,1024", + mov, + ] + log_file = f"{file_path.working_dir}/{file_path.base}_{asset_type}.log" + FilePath.run(cmd=cmd, log_file=log_file) + asset_fp = file_path.copy_to_assets_dir(fp_to_cp=Path(mov)) + asset = file_path.gen_asset(asset_type=asset_type, asset_fp=asset_fp) + return asset + + @task( name="Neuroglancer metadata generation", on_failure=[utils.collect_exception_task_hook], @@ -374,7 +422,7 @@ def sem_tomo_flow( base_mrcs = gen_newstack_combi.map(fp_in=align_xgs, stretch=stretchs) # base_mrcs are passed in as kwargs to replace wait_for - corrected_movie_assets = utils.mrc_to_movie.map( + corrected_movie_assets = mrc_to_movie.map( file_path=fps, root=unmapped("adjusted"), asset_type=unmapped(AssetType.REC_MOVIE), diff --git a/em_workflows/utils/utils.py b/em_workflows/utils/utils.py index ee159afd..c96706eb 100644 --- a/em_workflows/utils/utils.py +++ b/em_workflows/utils/utils.py @@ -7,8 +7,7 @@ from typing import List, Dict from pathlib import Path -from jinja2 import Environment, FileSystemLoader -from prefect import task, get_run_logger, allow_failure +from prefect import task, get_run_logger from prefect.exceptions import MissingContextError from prefect.states import State from prefect.flows import Flow, FlowRun @@ -20,7 +19,6 @@ # used for keeping outputs of imod's header command (dimensions of image). 
Header = namedtuple("Header", "x y z") -BrtOutput = namedtuple("BrtOutput", ["ali_file", "rec_file"]) def log(msg): @@ -82,54 +80,6 @@ def collect_exception_task_hook(task: Task, task_run: TaskRun, state: State): Config.local_storage.write_path(path, message.encode()) -@task( - name="mrc to movie generation", - on_failure=[collect_exception_task_hook], -) -def mrc_to_movie(file_path: FilePath, root: str, asset_type: str, **kwargs): - """ - :param file_path: FilePath for the input - :param root: base name of the mrc file - :param asset_type: type of resulting output (movie) - :param kwargs: additional arguments to wait for before executing this func - - - Uses the file_path to identify the working_dir which should have the "root" mrc - - Runs IMOD ``mrc2tif`` to convert the mrc to many jpgs named by z number - - Calls ``ffmpeg`` to create the mp4 movie from the jpgs and returns it as an asset - """ - mp4 = f"{file_path.working_dir}/{file_path.base}_mp4" - mrc = f"{file_path.working_dir}/{root}.mrc" - log_file = f"{file_path.working_dir}/recon_mrc2tiff.log" - cmd = [Config.mrc2tif_loc, "-j", "-C", "0,255", mrc, mp4] - FilePath.run(cmd=cmd, log_file=log_file) - mov = f"{file_path.working_dir}/{file_path.base}_{asset_type}.mp4" - test_p = Path(f"{file_path.working_dir}/{file_path.base}_mp4.1000.jpg") - mp4_input = f"{file_path.working_dir}/{file_path.base}_mp4.%03d.jpg" - if test_p.exists(): - mp4_input = f"{file_path.working_dir}/{file_path.base}_mp4.%04d.jpg" - cmd = [ - "ffmpeg", - "-f", - "image2", - "-framerate", - "8", - "-i", - mp4_input, - "-vcodec", - "libx264", - "-pix_fmt", - "yuv420p", - "-s", - "1024,1024", - mov, - ] - log_file = f"{file_path.working_dir}/{file_path.base}_{asset_type}.log" - FilePath.run(cmd=cmd, log_file=log_file) - asset_fp = file_path.copy_to_assets_dir(fp_to_cp=Path(mov)) - asset = file_path.gen_asset(asset_type=asset_type, asset_fp=asset_fp) - return asset - - @task def gen_prim_fps(fp_in: FilePath) -> Dict: """ @@ -197,9 +147,7 @@ 
def add_asset(prim_fp: dict, asset: dict, image_idx: int = None) -> dict: return prim_fp -# triggers like "always_run" are managed when calling the task itself -@task(retries=3, retry_delay_seconds=10) -def cleanup_workdir(fps: List[FilePath], x_keep_workdir: bool): +def cleanup_workdir(fp: FilePath, x_keep_workdir: bool): """ :param fp: a FilePath which has a working_dir to be removed @@ -210,78 +158,8 @@ def cleanup_workdir(fps: List[FilePath], x_keep_workdir: bool): if x_keep_workdir is True: log("x_keep_workdir is set to True, skipping removal.") else: - for fp in fps: - log(f"Trying to remove {fp.working_dir}") - fp.rm_workdir() - - -def update_adoc( - adoc_fp: Path, - tg_fp: Path, - montage: int, - gold: int, - focus: int, - fiducialless: int, - trackingMethod: int, - TwoSurfaces: int, - TargetNumberOfBeads: int, - LocalAlignments: int, - THICKNESS: int, -) -> Path: - """ - | Uses jinja templating to update the adoc file with input params. - | dual_p is calculated by inputs_paired() and is used to define `dual` - | Some of these parameters are derived programatically. - - :todo: Remove references to ``dual_p`` in comments? 
- """ - file_loader = FileSystemLoader(str(adoc_fp.parent)) - env = Environment(loader=file_loader) - template = env.get_template(adoc_fp.name) - - name = tg_fp.stem - currentBStackExt = None - stackext = tg_fp.suffix[1:] - # if dual_p: - # dual = 1 - # currentBStackExt = tg_fp.suffix[1:] # TODO - assumes both files are same ext - datasetDirectory = adoc_fp.parent - if int(TwoSurfaces) == 0: - SurfacesToAnalyze = 1 - elif int(TwoSurfaces) == 1: - SurfacesToAnalyze = 2 - else: - raise ValueError( - f"Unable to resolve SurfacesToAnalyze, TwoSurfaces \ - is set to {TwoSurfaces}, and should be 0 or 1" - ) - rpa_thickness = int(int(THICKNESS) * 1.5) - - vals = { - "name": name, - "stackext": stackext, - "currentBStackExt": currentBStackExt, - "montage": montage, - "gold": gold, - "focus": focus, - "datasetDirectory": datasetDirectory, - "fiducialless": fiducialless, - "trackingMethod": trackingMethod, - "TwoSurfaces": TwoSurfaces, - "TargetNumberOfBeads": TargetNumberOfBeads, - "SurfacesToAnalyze": SurfacesToAnalyze, - "LocalAlignments": LocalAlignments, - "rpa_thickness": rpa_thickness, - "THICKNESS": THICKNESS, - } - - output = template.render(vals) - adoc_loc = Path(f"{adoc_fp.parent}/{tg_fp.stem}.adoc") - log("Created adoc: adoc_loc.as_posix()") - with open(adoc_loc, "w") as _file: - print(output, file=_file) - log(f"generated {adoc_loc}") - return adoc_loc + log(f"Trying to remove {fp.working_dir}") + fp.rm_workdir() def copy_tg_to_working_dir(fname: Path, working_dir: Path) -> Path: @@ -305,80 +183,6 @@ def copy_tg_to_working_dir(fname: Path, working_dir: Path) -> Path: return new_loc -def copy_template(working_dir: Path, template_name: str) -> Path: - """ - :param working_dir: libpath.Path of temporary working directory - :param template_name: base str name of the ADOC template - :return: libpath.Path of the copied file - - copies the template adoc file to the working_dir - """ - adoc_fp = f"{working_dir}/{template_name}.adoc" - template_fp = 
f"{Config.template_dir}/{template_name}.adoc" - log(f"trying to copy {template_fp} to {adoc_fp}") - shutil.copyfile(template_fp, adoc_fp) - return Path(adoc_fp) - - -@task( - name="Batchruntomo conversion", - on_failure=[collect_exception_task_hook], -) -def run_brt( - file_path: FilePath, - adoc_template: str, - montage: int, - gold: int, - focus: int, - fiducialless: int, - trackingMethod: int, - TwoSurfaces: int, - TargetNumberOfBeads: int, - LocalAlignments: int, - THICKNESS: int, -) -> BrtOutput: - """ - The natural place for this function is within the brt flow. - The reason for this is to facilitate testing. In prefect 1, a - flow lives within a context. This causes problems if things are mocked - for testing. If the function is in utils, these problems go away. - TODO, this is ugly. This might vanish in Prefect 2, since flows are - no longer obligated to being context dependant. - """ - - adoc_fp = copy_template( - working_dir=file_path.working_dir, template_name=adoc_template - ) - updated_adoc = update_adoc( - adoc_fp=adoc_fp, - tg_fp=file_path.fp_in, - montage=montage, - gold=gold, - focus=focus, - fiducialless=fiducialless, - trackingMethod=trackingMethod, - TwoSurfaces=TwoSurfaces, - TargetNumberOfBeads=TargetNumberOfBeads, - LocalAlignments=LocalAlignments, - THICKNESS=THICKNESS, - ) - # why do we need to copy these? - copy_tg_to_working_dir(fname=file_path.fp_in, working_dir=file_path.working_dir) - - # START BRT (Batchruntomo) - long running process. - cmd = [Config.brt_binary, "-di", updated_adoc.as_posix(), "-cp", "1", "-gpu", "1"] - log_file = f"{file_path.working_dir}/brt_run.log" - FilePath.run(cmd, log_file) - rec_file = Path(f"{file_path.working_dir}/{file_path.base}_rec.mrc") - ali_file = Path(f"{file_path.working_dir}/{file_path.base}_ali.mrc") - log(f"checking that dir {file_path.working_dir} contains ok BRT run") - - for _file in [rec_file, ali_file]: - if not _file.exists(): - raise ValueError(f"File {_file} does not exist. 
BRT run failure.") - return BrtOutput(ali_file=ali_file, rec_file=rec_file) - - # TODO replace "trigger=always_run" @task(retries=1, retry_delay_seconds=10) def copy_workdirs(file_path: FilePath) -> Path: @@ -490,52 +294,13 @@ def notify_api_running( return response.ok -# def custom_terminal_state_handler( -# ) -> Optional[State]: -# """ -# we define any success at all to be a success -# """ -# success = False -# # iterate through reference task states looking for successes -# for task_state in reference_task_states: -# if task_state.is_successful(): -# success = True -# if success: -# message = "success" -# ns = Success( -# message=message, -# result=state.result, -# context=state.context, -# cached_inputs=state.cached_inputs, -# ) -# else: -# message = "error" -# ns = state -# if prefect.context.parameters.get("x_no_api"): -# log(f"x_no_api flag used, terminal: success is {message}") -# else: -# callback_url = prefect.context.parameters.get("callback_url") -# token = prefect.context.parameters.get("token") -# headers = { -# "Authorization": "Bearer " + token, -# "Content-Type": "application/json", -# } -# response = requests.post( -# callback_url, headers=headers, data=json.dumps({"status": message}) -# ) -# log(f"Pipeline status is:{message}, {response.text}") -# log(response.headers) -# if not response.ok: -# msg = f"Bad response code on callback: {response}" -# log(msg=msg) -# raise signals.FAIL(msg) -# return ns - - -def notify_api_completion(flow: Flow, flow_run: FlowRun, state: State): - """ - https://docs.prefect.io/core/concepts/states.html#overview. - https://docs.prefect.io/core/concepts/notifications.html#state-handlers +def notify_api_completion(flow: Flow, flow_run: FlowRun, state: State) -> bool: + """ + When the state changes for a workflow, this hook calls the backend api to update status + of the workflow in its db. 
+ + The params for state change hooks can be found here: + https://docs.prefect.io/latest/guides/state-change-hooks/ """ status = "success" if state.is_completed() else "error" x_no_api = flow_run.parameters.get("x_no_api", False) @@ -546,7 +311,7 @@ if x_no_api: log(f"x_no_api flag used\nCompletion status: {status}") - return + return True hooks_log = open(f"slurm-log/{flowrun_id}-notify-api-completion.txt", "w") hooks_log.write(f"Trying to notify: {x_no_api=}, {token=}, {callback_url=}\n") @@ -637,7 +402,6 @@ def gen_fps(share_name: str, input_dir: Path, fps_in: List[Path]) -> List[FilePa return fps -# TODO handle "trigger=any_successful" @task(retries=3, retry_delay_seconds=60) def send_callback_body( x_no_api: bool, @@ -681,6 +445,9 @@ def send_callback_body( def copy_workdirs_and_cleanup_hook(flow, flow_run, state): + """ + A flow state change hook called to copy workdir log files and also cleanup workdir after copying. 
+ """ stored_result = Config.local_storage.read_path(f"{flow_run.id}__gen_fps") fps: List[FilePath] = Config.pickle_serializer.loads( json.loads(stored_result)["data"].encode() @@ -690,34 +457,15 @@ def copy_workdirs_and_cleanup_hook(flow, flow_run, state): for fp in fps: copy_workdir_logs.fn(file_path=fp) - - cleanup_workdir.fn(fps, x_keep_workdir) - - -def callback_with_cleanup( - fps: List[FilePath], - callback_result: List, - x_no_api: bool = False, - callback_url: str = None, - token: str = None, - x_keep_workdir: bool = False, -): - cp_wd_logs_to_assets = copy_workdir_logs.map(fps, wait_for=[callback_result]) - - cb = send_callback_body.submit( - x_no_api=x_no_api, - token=token, - callback_url=callback_url, - files_elts=callback_result, - ) - cleanup_workdir.submit( - fps, - x_keep_workdir, - wait_for=[cb, allow_failure(cp_wd_logs_to_assets)], - ) + cleanup_workdir(fp, x_keep_workdir) def generate_flow_run_name(): + """ + Custom flow run names generator to replace default behavior, which is it simply uses flow function name + + https://docs.prefect.io/latest/concepts/flows/#flow-settings + """ parameters = flow_run.parameters name = Path(parameters["input_dir"]) share_name = parameters["file_share"] diff --git a/test/test_utils.py b/test/test_utils.py index 0a53f0d0..20aa5d1f 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -173,6 +173,8 @@ def test_update_adoc(mock_nfs_mount): Test successful modification of adoc based on a template :todo: consider parameterizing this to test many values """ + from em_workflows.brt import flow + adoc_file = "plastic_brt" montage = 0 gold = 15 @@ -193,7 +195,7 @@ def test_update_adoc(mock_nfs_mount): mrc_image = "test/input_files/brt_inputs/2013-1220-dA30_5-BSC-1_10.mrc" mrc_file = Path(os.path.join(Config.proj_dir(env), mrc_image)) - updated_adoc = utils.update_adoc( + updated_adoc = flow.update_adoc( adoc_fp=copied_tmplt, tg_fp=mrc_file, montage=montage, @@ -212,6 +214,8 @@ def test_update_adoc(mock_nfs_mount): 
def test_update_adoc_bad_surfaces(mock_nfs_mount): + from em_workflows.brt import flow + adoc_file = "plastic_brt" montage = 0 gold = 15 @@ -234,7 +238,7 @@ def test_update_adoc_bad_surfaces(mock_nfs_mount): mrc_file = Path(os.path.join(Config.proj_dir(env), mrc_image)) with pytest.raises(ValueError) as fail_msg: - utils.update_adoc( + flow.update_adoc( adoc_fp=copied_tmplt, tg_fp=mrc_file, montage=montage, @@ -261,6 +265,8 @@ def test_mrc_to_movie(mock_nfs_mount): :todo: Determine method for storing test data; smaller test images would be helpful as current mrc is 1.5 GB. """ + from em_workflows.sem_tomo import flow as sem_flow + proj_dir = Config.proj_dir("test") input_dir = "test/input_files/sem_inputs/Projects/mrc_movie_test" input_path = Path(os.path.join(proj_dir, input_dir)) @@ -277,8 +283,8 @@ def test_mrc_to_movie(mock_nfs_mount): shutil.copy(image_path, mrc_filepath.working_dir) # mrc_list = utils.gen_fps.__wrapped__(input_path, [image_path]) - asset = utils.mrc_to_movie.__wrapped__(mrc_filepath, "adjusted", "recMovie") - assert type(asset) == dict + asset = sem_flow.mrc_to_movie.__wrapped__(mrc_filepath, "adjusted", "recMovie") + assert type(asset) is dict assert "adjusted_recMovie.mp4" in asset["path"] @@ -286,9 +292,11 @@ def test_copy_template(mock_nfs_mount): """ Tests that adoc template get copied to working directory """ + from em_workflows.brt import flow as brt_flow + with tempfile.TemporaryDirectory() as tmp_dir: - utils.copy_template(working_dir=tmp_dir, template_name="plastic_brt") - utils.copy_template(working_dir=tmp_dir, template_name="cryo_brt") + brt_flow.copy_template(working_dir=tmp_dir, template_name="plastic_brt") + brt_flow.copy_template(working_dir=tmp_dir, template_name="cryo_brt") tmp_path = Path(tmp_dir) assert tmp_path.exists() assert Path(tmp_path / "plastic_brt.adoc").exists() @@ -299,9 +307,11 @@ def test_copy_template_missing(mock_nfs_mount): """ Tests that adoc template get copied to working directory """ + from 
em_workflows.brt import flow as brt_flow + with tempfile.TemporaryDirectory() as tmp_dir: with pytest.raises(FileNotFoundError) as fnfe: - utils.copy_template(working_dir=tmp_dir, template_name="no_such_tmplt") + brt_flow.copy_template(working_dir=tmp_dir, template_name="no_such_tmplt") assert "no_such_tmplt" in str(fnfe.value)