Skip to content

Commit

Permalink
Refactor czi workflow for prefect2
Browse files Browse the repository at this point in the history
  • Loading branch information
annshress committed Oct 12, 2023
1 parent ed4562e commit 68a2744
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 23 deletions.
37 changes: 20 additions & 17 deletions em_workflows/czi/flow.txt → em_workflows/czi/flow.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from pathlib import Path
from typing import List, Dict

import SimpleITK as sitk
from prefect import Flow, Parameter, task
from prefect import flow, task, allow_failure
from pytools.HedwigZarrImage import HedwigZarrImage
from pytools.HedwigZarrImages import HedwigZarrImages

from em_workflows.file_path import FilePath
from em_workflows.utils import utils
from em_workflows.utils import neuroglancer as ng
from prefect.run_configs import LocalRun
from em_workflows.czi.constants import (
VALID_CZI_INPUTS,
THUMB_X_DIM,
Expand Down Expand Up @@ -107,20 +108,22 @@ def find_thumb_idx(callback: List[Dict]) -> List[Dict]:
return callback


with Flow(
"czi_to_zarr",
state_handlers=[utils.notify_api_completion, utils.notify_api_running],
executor=CZIConfig.SLURM_EXECUTOR,
run_config=LocalRun(labels=[utils.get_environment()]),
) as flow:
input_dir = Parameter("input_dir")
file_share = Parameter("file_share")
file_name = Parameter("file_name", default=None)
callback_url = Parameter("callback_url", default=None)()
token = Parameter("token", default=None)()
no_api = Parameter("no_api", default=None)()
# keep workdir if set true, useful to look at outputs
keep_workdir = Parameter("keep_workdir", default=False)()
@flow(
name="Flow: IF czi",
log_prints=True,
task_runner=CZIConfig.SLURM_EXECUTOR,
# on_completion=utils.notify_api_completion,
# on_failure=utils.notify_api_completion,
)
def czi_flow(
file_share: str,
input_dir: str,
file_name: str = None,
callback_url: str = None,
token: str = None,
no_api: bool = False,
keep_workdir: bool = False,
):
input_dir_fp = utils.get_input_dir(share_name=file_share, input_dir=input_dir)

input_fps = utils.list_files(
Expand All @@ -137,4 +140,4 @@ def find_thumb_idx(callback: List[Dict]) -> List[Dict]:
cb = utils.send_callback_body(
token=token, callback_url=callback_url, files_elts=filtered_callback
)
rm_workdirs = utils.cleanup_workdir(fps, upstream_tasks=[cb])
utils.cleanup_workdir(fps, wait_for=[allow_failure(cb)])
12 changes: 6 additions & 6 deletions test/test_czi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
@pytest.mark.slow
@pytest.mark.localdata
def test_input_fname(mock_nfs_mount, caplog, mock_reuse_zarr):
from em_workflows.czi.flow import flow
from em_workflows.czi.flow import czi_flow

state = flow.run(
state = czi_flow(
file_share="test",
input_dir="test/input_files/IF_czi/Projects/smaller",
no_api=True,
Expand Down Expand Up @@ -41,14 +41,14 @@ def test_no_mount_point_flow_fails(mock_binaries, monkeypatch, caplog):
If mounted path doesn't exist should fail the flow immediately
"""
from em_workflows import config
from em_workflows.czi.flow import flow
from em_workflows.czi.flow import czi_flow

share_name = "INVALID"
_mock_NFS_MOUNT = {share_name: "/tmp/non-existent-path"}

monkeypatch.setattr(config, "NFS_MOUNT", _mock_NFS_MOUNT)

state = flow.run(
state = czi_flow(
file_share=share_name,
input_dir="test/input_files/IF_czi/Projects/smaller",
no_api=True,
Expand All @@ -64,13 +64,13 @@ def test_czi_workflow_callback_structure(
Tests that appropriate structure exists in the callback output
Tests that there is no duplication in the callback output
"""
from em_workflows.czi.flow import flow
from em_workflows.czi.flow import czi_flow

input_dir = "test/input_files/IF_czi/Projects/smaller"
if not Path(input_dir).exists():
pytest.skip("Missing input files")

state = flow.run(
state = czi_flow(
file_share="test",
input_dir=input_dir,
no_api=True,
Expand Down

0 comments on commit 68a2744

Please sign in to comment.