diff --git a/em_workflows/brt/flow.py b/em_workflows/brt/flow.py
index 159598de..5624bf4b 100644
--- a/em_workflows/brt/flow.py
+++ b/em_workflows/brt/flow.py
@@ -39,7 +39,6 @@
 import os
 from em_workflows.file_path import FilePath
 import subprocess
-import re
 import math
 from pathlib import Path
 
@@ -127,10 +126,10 @@ def gen_dimension_command(file_path: FilePath, ali_or_rec: str) -> str:
     stderr = sp.stderr.decode("utf-8")
     msg = f"Command ok : {stderr} -- {stdout}"
     utils.log(msg)
-    xyz_dim = re.split(" +(\d+)", stdout)
-    z_dim = xyz_dim[5]
+    xyz_dim = [int(x) for x in stdout.split()]
+    z_dim = xyz_dim[2]
     utils.log(f"z_dim: {z_dim:}")
-    return z_dim
+    return str(z_dim)
 
 
 @task
@@ -531,12 +530,12 @@ def cleanup_files(file_path: FilePath, pattern=str):
     tilt_movie_assets = gen_tilt_movie.map(
         file_path=fps, upstream_tasks=[keyimg_assets]
     )
-    cleanup_files.map(
+    clean_align_mrc = cleanup_files.map(
         file_path=fps,
         pattern=unmapped("*_align_*.mrc"),
         upstream_tasks=[tilt_movie_assets, thumb_assets, keyimg_assets],
     )
-    cleanup_files.map(
+    clean_ali_jpg = cleanup_files.map(
         file_path=fps,
         pattern=unmapped("*ali*.jpg"),
         upstream_tasks=[tilt_movie_assets, thumb_assets, keyimg_assets],
@@ -557,12 +556,12 @@ def cleanup_files(file_path: FilePath, pattern=str):
         file_path=fps, upstream_tasks=[averagedVolume_assets]
     )
     recon_movie_assets = gen_recon_movie.map(file_path=fps, upstream_tasks=[ave_jpgs])
-    cleanup_files.map(
+    clean_mp4 = cleanup_files.map(
         file_path=fps,
         pattern=unmapped("*_mp4.*.jpg"),
         upstream_tasks=[recon_movie_assets, ave_jpgs],
     )
-    cleanup_files.map(
+    clean_ave_mrc = cleanup_files.map(
         file_path=fps,
         pattern=unmapped("*_ave*.mrc"),
         upstream_tasks=[recon_movie_assets, ave_jpgs],
@@ -615,13 +614,23 @@ def cleanup_files(file_path: FilePath, pattern=str):
     cp_wd_to_assets = utils.copy_workdirs.map(
         fps, upstream_tasks=[callback_with_tilt_mov]
     )
-    rm_workdirs = utils.cleanup_workdir.map(fps, upstream_tasks=[cp_wd_to_assets])
 
     # finally filter error states, and convert to JSON and send.
     filtered_callback = utils.filter_results(callback_with_tilt_mov)
     cb = utils.send_callback_body(
         token=token, callback_url=callback_url, files_elts=filtered_callback
     )
+    rm_workdirs = utils.cleanup_workdir(
+        fps,
+        upstream_tasks=[
+            cb,
+            cp_wd_to_assets,
+            clean_align_mrc,
+            clean_ali_jpg,
+            clean_ave_mrc,
+            clean_mp4,
+        ],
+    )
 
     # the other tasks might be always run or something,
     # this is far enough along to get an idea of success.
diff --git a/em_workflows/config.py b/em_workflows/config.py
index 2b477793..08faf608 100644
--- a/em_workflows/config.py
+++ b/em_workflows/config.py
@@ -15,14 +15,15 @@ def SLURM_exec():
         name="dask-worker",
         cores=60,
         memory="32G",
-        # processes=1,
+        processes=1,
         death_timeout=121,
         local_directory="/gs1/home/macmenaminpe/tmp/",
         queue="gpu",
         walltime="4:00:00",
         job_extra_directives=["--gres=gpu:1"],
     )
-    cluster.adapt(minimum=1, maximum=6)
+    cluster.scale(1)
+    # cluster.adapt(minimum=1, maximum=6)
     logging = prefect.context.get("logger")
     logging.debug("Dask cluster started")
     logging.debug(f"see dashboard {cluster.dashboard_link}")
diff --git a/em_workflows/dm_conversion/flow.py b/em_workflows/dm_conversion/flow.py
index 723812d6..e1e99f8c 100644
--- a/em_workflows/dm_conversion/flow.py
+++ b/em_workflows/dm_conversion/flow.py
@@ -266,9 +266,7 @@ def scale_jpegs(file_path: FilePath, size: str) -> Optional[dict]:
         prim_fp=callback_with_thumbs, asset=keyimg_assets
     )
     # finally filter error states, and convert to JSON and send.
-    rm_workdirs = utils.cleanup_workdir.map(
-        fp=fps, upstream_tasks=[callback_with_keyimgs]
-    )
+    rm_workdirs = utils.cleanup_workdir(fps, upstream_tasks=[callback_with_keyimgs])
 
     filtered_callback = utils.filter_results(callback_with_keyimgs)
     callback_sent = utils.send_callback_body(
diff --git a/em_workflows/file_path.py b/em_workflows/file_path.py
index b820a5e2..dfc9fff6 100644
--- a/em_workflows/file_path.py
+++ b/em_workflows/file_path.py
@@ -1,5 +1,4 @@
 import datetime
-import glob
 import shutil
 import os
 from typing import List, Dict
@@ -142,10 +141,10 @@ def copy_to_assets_dir(self, fp_to_cp: Path) -> Path:
         if fp_to_cp.is_dir():
             if dest.exists():
                 shutil.rmtree(dest)
-            shutil.copytree(fp_to_cp, dest)
+            d = shutil.copytree(fp_to_cp, dest)
         else:
-            shutil.copyfile(fp_to_cp, dest)
-        return dest
+            d = shutil.copyfile(fp_to_cp, dest)
+        return Path(d)
 
     # def add_assets_entry(
     #     self, asset_path: Path, asset_type: str, metadata: Dict[str, str] = None
@@ -261,10 +260,6 @@ def copy_workdir_to_assets(self) -> Path:
         dest = Path(
             f"{self.assets_dir.as_posix()}/{dir_name_as_date}/{self.fp_in.stem}"
         )
-        existing_workdirs = glob.glob(f"{self.assets_dir.as_posix()}/work_dir_*")
-        for _dir in existing_workdirs:
-            log(f"Trying to remove old workdir {_dir}")
-            shutil.rmtree(_dir)
         if dest.exists():
             log(f"Output assets directory already exists! removing: {dest}")
             shutil.rmtree(dest)
@@ -274,7 +269,7 @@ def copy_workdir_to_assets(self) -> Path:
     def rm_workdir(self):
         """Removes the the entire working directory"""
         log(f"Removing working dir: {self.working_dir}")
-        shutil.rmtree(self.working_dir)
+        shutil.rmtree(self.working_dir, ignore_errors=True)
 
     @staticmethod
     def run(cmd: List[str], log_file: str) -> int:
diff --git a/em_workflows/sem_tomo/flow.py b/em_workflows/sem_tomo/flow.py
index 7b1185bc..2ca35c77 100644
--- a/em_workflows/sem_tomo/flow.py
+++ b/em_workflows/sem_tomo/flow.py
@@ -289,9 +289,7 @@ def gen_keyimg_small(fp_in: FilePath) -> Dict:
     callback_with_corr_movies = utils.add_asset.map(
         prim_fp=callback_with_corr_mrcs, asset=corrected_movie_assets
     )
-    rm_workdirs = utils.cleanup_workdir.map(
-        fp=fps, upstream_tasks=[callback_with_corr_movies]
-    )
+    rm_workdirs = utils.cleanup_workdir(fps, upstream_tasks=[callback_with_corr_movies])
 
     # finally filter error states, and convert to JSON and send.
     filtered_callback = utils.filter_results(callback_with_corr_movies)
diff --git a/em_workflows/utils/utils.py b/em_workflows/utils/utils.py
index 404cba39..0be103dd 100644
--- a/em_workflows/utils/utils.py
+++ b/em_workflows/utils/utils.py
@@ -150,7 +150,7 @@ def add_asset(prim_fp: dict, asset: dict) -> dict:
 
 
 @task(max_retries=3, retry_delay=datetime.timedelta(seconds=10), trigger=always_run)
-def cleanup_workdir(fp: FilePath):
+def cleanup_workdir(fps: List[FilePath]):
     """
     :param fp: a FilePath which has a working_dir to be removed
 
@@ -158,11 +158,12 @@
 
     | task wrapper on the FilePath rm_workdir method.
     """
-    if context.parameters["keep_workdir"] is not True:
-        log(f"Trying to remove {fp.working_dir}")
-        fp.rm_workdir()
-    else:
+    if prefect.context.parameters.get("keep_workdir") is True:
         log("keep_workdir is set to True, skipping removal.")
+    else:
+        for fp in fps:
+            log(f"Trying to remove {fp.working_dir}")
+            fp.rm_workdir()
 
 
 # @task