diff --git a/core/lls_core/lattice_data.py b/core/lls_core/lattice_data.py index f568462d..3ec8bbb8 100644 --- a/core/lls_core/lattice_data.py +++ b/core/lls_core/lattice_data.py @@ -27,6 +27,7 @@ from lls_core.types import ArrayLike from napari_workflows import Workflow from napari.types import ShapesData +from xarray import DataArray if TYPE_CHECKING: import pyclesperanto_prototype as cle @@ -75,7 +76,7 @@ def save_image(self): for roi, roi_results in groupby(self.slices, key=lambda it: it.roi_index): if self.lattice_data.save_type == SaveFileType.h5: bdv_writer = npy2bdv.BdvWriter( - make_filename_prefix(prefix=self.lattice_data.save_name, roi_index=roi), + make_filename_prefix(prefix=self.lattice_data.save_name, roi_index=roi) + ".h5", compression='gzip', nchannels=len(self.lattice_data.channel_range), subsamp=((1, 1, 1), (1, 2, 2), (2, 4, 4)), @@ -180,7 +181,7 @@ def default_time_range(cls, v: Any, values: dict) -> range: Sets the default time range if undefined """ if v is None: - return range(values["dims"].T + 1) + return range(values["data"].sizes["T"] + 1) return v @validator("channel_range") @@ -189,20 +190,12 @@ def default_channel_range(cls, v: Any, values: dict) -> range: Sets the default channel range if undefined """ if v is None: - return range(values["dims"].C + 1) + return range(values["data"].sizes["C"] + 1) return v class DeskewParams(DefaultMixin, arbitrary_types_allowed=True): #: A 3-5D array containing the image data - data: ArrayLike - - #: Dimensions of `data` - dims: Dimensions - - #: Dimensions of the deskewed output - deskew_vol_shape: Tuple[int, ...] = Field(init_var=False) - - deskew_affine_transform: cle.AffineTransform3D = Field(init_var=False) + data: DataArray #: Geometry of the light path skew: DeskewDirection = DeskewDirection.Y @@ -211,17 +204,42 @@ class DeskewParams(DefaultMixin, arbitrary_types_allowed=True): #: Pixel size in microns physical_pixel_sizes: DefinedPixelSizes = Field(default_factory=DefinedPixelSizes) + #: Dimensions of the deskewed output + deskew_vol_shape: Tuple[int, ...] = Field(init_var=False, default=None) + + deskew_affine_transform: cle.AffineTransform3D = Field(init_var=False, default=None) + + @property + def dims(self): + return self.data.dims + + @validator("data", pre=True) + def reshaping(cls, v: Any): + # This allows a user to pass in any array-like object and have it + # converted and reshaped appropriately + array = DataArray(v) + if not set(array.dims).issuperset({"X", "Y", "Z"}): + raise ValueError("The input array must at least have XYZ coordinates") + if "T" not in array.dims: + array = array.expand_dims("T") + if "C" not in array.dims: + array = array.expand_dims("C") + return array.transpose("T", "C", "Z", "Y", "X") + + def get_3d_slice(self) -> DataArray: + return self.data.sel(C=0, T=0) + @root_validator(pre=True) def set_deskew(cls, values: dict) -> dict: """ Sets the default deskew shape values if the user has not provided them """ # process the file to get shape of final deskewed image - data = values["data"] + data: DataArray = cls.reshaping(values["data"]) if values.get('deskew_vol_shape') is None: if values.get('deskew_affine_transform') is None: # If neither has been set, calculate them ourselves - values["deskew_vol_shape"], values["deskew_affine_transform"] = get_deskewed_shape(values["data"], values["angle"], values["physical_pixel_sizes"].X, values["physical_pixel_sizes"].Y, values["physical_pixel_sizes"].Z, values["skew"]) + values["deskew_vol_shape"], values["deskew_affine_transform"] = get_deskewed_shape(data.sel(C=0, T=0).to_numpy(), values["angle"], values["physical_pixel_sizes"].X, values["physical_pixel_sizes"].Y, values["physical_pixel_sizes"].Z, values["skew"]) else: raise ValueError("deskew_vol_shape and deskew_affine_transform must be either both specified or neither specified") return values @@ -249,7 +267,7 @@ def disjoint_time_range(cls, v: range, values: dict): """ Validates that the time range is within the range of channels in our array """ - max_time = values["dims"].T + max_time = values["data"].sizes["T"] if v.start < 0: raise ValueError("The lowest valid start value is 0") if v.stop > max_time: @@ -261,7 +279,7 @@ def disjoint_channel_range(cls, v: range, values: dict): """ Validates that the channel range is within the range of channels in our array """ - max_channel = values["dims"].T + max_channel = values["data"].sizes["C"] if v.start < 0: raise ValueError("The lowest valid start value is 0") if v.stop > max_channel: @@ -270,13 +288,15 @@ def disjoint_channel_range(cls, v: range, values: dict): @validator("channel_range") def channel_range_subset(cls, v: range, values: dict): - if min(v) < 0 or max(v) > values["dims"].C: + if min(v) < 0 or max(v) > values["data"].sizes["C"]: raise ValueError("The output channel range must be a subset of the total available channels") + return v @validator("time_range") def time_range_subset(cls, v: range, values: dict): - if min(v) < 0 or max(v) > values["dims"].T: - raise ValueError("The output time range must be a subset of the total available time points") + if min(v) < 0 or max(v) > values["data"].sizes["T"]: + raise ValueError("The output time range must be a subset of the total available time points") + return v # Hack to ensure that .skew_dir behaves identically to .skew @property @@ -343,12 +363,12 @@ def deconv_enabled(self) -> bool: @property def time(self) -> int: """Number of time points""" - return self.dims.T + return self.data.sizes["T"] @property def channels(self) -> int: """Number of channels""" - return self.dims.C + return self.data.sizes["C"] @property def new_dz(self): @@ -358,17 +378,19 @@ def __post_init__(self): logger.info(f"Channels: {self.channels}, Time: {self.time}") logger.info("If channel and time need to be swapped, you can enforce this by choosing 'Last dimension is channel' when initialising the plugin") - def slice_data(self, time: int, channel: int) -> ArrayLike: + def slice_data(self, time: int, channel: int) -> DataArray: if time > self.time: raise ValueError("time is out of range") if channel > self.channels: raise ValueError("channel is out of range") - if len(self.dims.shape) == 3: + return self.data.sel(T=time, C=channel) + + if len(self.data.shape) == 3: return self.data - elif len(self.dims.shape) == 4: + elif len(self.data.shape) == 4: return self.data[time, :, :, :] - elif len(self.dims.shape) == 5: + elif len(self.data.shape) == 5: return self.data[time, channel, :, :, :] raise Exception("Lattice data must be 3-5 dimensions") @@ -511,8 +533,7 @@ def process(self) -> ProcessedSlices: ) class AicsLatticeParams(TypedDict): - data: DaskArray - dims: Dimensions + data: DataArray physical_pixel_sizes: DefinedPixelSizes def lattice_params_from_aics(img: AICSImage, physical_pixel_sizes: PhysicalPixelSizes = PhysicalPixelSizes(None, None, None)) -> AicsLatticeParams: diff --git a/plugin/napari_lattice/dock_widget.py b/plugin/napari_lattice/dock_widget.py index 4bba9e39..398827f9 100644 --- a/plugin/napari_lattice/dock_widget.py +++ b/plugin/napari_lattice/dock_widget.py @@ -1,80 +1,32 @@ -import os -import sys -import yaml -import numpy as np +# Enable Logging +import logging from pathlib import Path -import dask.array as da -import pandas as pd -from typing import Any, Callable, Iterable, Literal, Optional, Sequence, Tuple, Union, List, TypeVar, Type, cast -from enum import Enum, auto +from typing import Union +import numpy as np +from lls_core.lattice_data import LatticeData +from lls_core.workflow import import_workflow_modules +from magicclass import MagicTemplate, field, magicclass, set_options from magicclass.wrappers import set_design -from magicgui.widgets import Widget -from magicclass import magicclass, field, vfield, set_options, MagicTemplate, FieldGroup, Icon -from magicclass.fields import MagicValueField, MagicField -from magicclass.widgets import CollapsibleContainer, CheckBox, RangeSlider, ComboBox -from magicclass.widgets.containers import ContainerWidget, _VCollapsibleContainer, wrap_container -from magicclass.utils import click -from qtpy.QtCore import Qt -from qtpy.QtGui import QIcon -from qtpy.QtWidgets import QTabWidget - -from napari.layers import Layer, Shapes -from napari.types import ImageData from napari import Viewer -from napari_lattice.icons import RED, GREEN, GREY - -from tqdm import tqdm - +from napari.layers import Shapes +from napari_lattice.fields import ( + CroppingFields, + DeconvolutionFields, + DeskewFields, + OutputFields, + WorkflowFields, +) +from napari_lattice.icons import GREY from napari_workflows import Workflow, WorkflowManager from napari_workflows._io_yaml_v1 import load_workflow - -from lls_core import DeconvolutionChoice, SaveFileType, Log_Levels, DeskewDirection -from lls_core.lattice_data import CropParams, DeconvolutionParams, DefinedPixelSizes, LatticeData - -from pydantic import ValidationError -from napari_lattice.reader import lattice_params_from_napari -from napari_lattice.fields import DeskewFields, CroppingFields, DeconvolutionFields, WorkflowFields, OutputFields, exception_to_html -from napari_lattice.icons import GREY - -from strenum import StrEnum - -from lls_core.workflow import import_workflow_modules -# Enable Logging -import logging +from qtpy.QtCore import Qt +from qtpy.QtGui import QIcon +from qtpy.QtWidgets import QTabWidget logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) -# MagicClassType = TypeVar("MagicClassType") -# def magicclass(*args, **kwargs) -> Callable[[MagicClassType], MagicClassType]: -# return original_magicclass(*args, **kwargs) - -# class HideableContents(MagicTemplate): -# # fields_enabled = vfield(False, label="Enabled") - -# # @fields_enabled.connect -# def _enabled_changed(self) -> None: -# if self.fields_enabled: -# logger.info(f"{self.__class__} Activated") -# for child in self.__magicclass_children__: -# child.visible = True -# child.enabled = True -# else: -# logger.info(f"{self.__class__} Deactivated") -# for child in self.__magicclass_children__: -# child.visible = False -# child.enabled = False - - -# def validate_tab(parent: MagicTemplate, fields: FieldGroup, index: int): -# tab_widget: QTabWidget = parent._widget._tab_widget -# try: -# fields._make_model() -# tab_widget.setTabIcon(index, QIcon(GREEN)) -# except ValidationError: -# tab_widget.setTabIcon(index, QIcon(RED)) - class LlszTemplate(MagicTemplate): @property def llsz_parent(self) -> "LLSZWidget": @@ -86,7 +38,6 @@ def parent_viewer(mc: MagicTemplate) -> Viewer: raise Exception("This function can only be used when inside of a Napari viewer") return mc.parent_viewer - @magicclass(widget_type="split") class LLSZWidget(LlszTemplate): open_file: bool = False @@ -103,16 +54,19 @@ def _check_validity(self) -> bool: return False def _make_model(self) -> LatticeData: - # deskew = self.LlszMenu.WidgetContainer.DeskewWidget - # output = self.LlszMenu.WidgetContainer.OutputWidget - # deconv = self.LlszMenu.WidgetContainer.DeconvolutionWidget - # crop = self.LlszMenu.WidgetContainer.CroppingWidget - # workflow = self.LlszMenu.WidgetContainer.WorkflowWidget - - # TODO: fix + deskew_args = self.LlszMenu.WidgetContainer.deskew_fields._get_kwargs() + output_args = self.LlszMenu.WidgetContainer.output_fields._make_model() return LatticeData( - **self.LlszMenu.WidgetContainer.deskew_fields._make_model().dict(), - **self.LlszMenu.WidgetContainer.output_fields._make_model().dict(), + data=deskew_args["data"], + angle=deskew_args["angle"], + channel_range=output_args.channel_range, + time_range=output_args.time_range, + save_dir=output_args.save_dir, + # We let the user specify a prefix, but if they don't, we can use the default + save_name=output_args.save_name or deskew_args["save_name"] , + save_type=output_args.save_type, + physical_pixel_sizes=deskew_args["physical_pixel_sizes"], + skew=deskew_args["skew"], workflow=self.LlszMenu.WidgetContainer.workflow_fields._make_model(), deconvolution=self.LlszMenu.WidgetContainer.deconv_fields._make_model(), crop=self.LlszMenu.WidgetContainer.cropping_fields._make_model() @@ -123,9 +77,6 @@ class LlszMenu(LlszTemplate): main_heading = field("

Napari Lattice: Visualization & Analysis

", widget_type="Label") heading1 = field("Drag and drop an image file onto napari.", widget_type="Label") - # Pycudadecon library for deconvolution - # options={"enabled": True}, - # Tabbed Widget container to house all the widgets @magicclass(widget_type="tabbed", name="Functions", labels=False) @@ -139,255 +90,10 @@ def __post_init__(self): field._validate() deskew_fields = DeskewFields(name = "1. Deskew") - # @deskew_fields.connect - # def _deskew_changed(self): - # validate_tab(self, self.deskew_fields, 0) - deconv_fields = DeconvolutionFields(name = "2. Deconvolution") - # @deconv_fields.connect - # def _deconv_changed(self): - # validate_tab(self, self.deconv_fields, 1) - cropping_fields = CroppingFields(name = "3. Crop") - # @cropping_fields.connect - # def _cropping_changed(self): - # validate_tab(self, 2) - workflow_fields = WorkflowFields(name = "4. Workflow") - # @workflow_fields.connect - # def _workflow_changed(self): - # validate_tab(self, 3) - output_fields = OutputFields(name = "5. Output") - # @output_fields.connect - # def _output_changed(self): - # validate_tab(self, 4) - - # @magicclass(name="1. Deskew") - # class DeskewWidget(MagicTemplate): - # img_layer = field(List[Layer]).with_options(label = "Image Layer", layout="vertical", value=[]) - # pixel_sizes = field(Tuple[float, float, float]).with_options( - # value=(DefinedPixelSizes.get_default("X"), DefinedPixelSizes.get_default("Y"), DefinedPixelSizes.get_default("Z")), - # label="Pixel Sizes (XYZ)" - # ) - # angle = field(LatticeData.get_default("angle")).with_options(value=LatticeData.get_default("angle"), label="Skew Angle") - # device = field(str).with_choices(cle.available_device_names()).with_options(label="Graphics Device") - # merge_all_channels = field(False).with_options(label="Merge all Channels") - # dimension_order = field(str).with_options(value=LastDimensionOptions.Metadata.value).with_choices([it.value for it in LastDimensionOptions]).with_options(label="Dimension Order") - # skew_dir = field(DeskewDirection.Y).with_options(label = "Skew Direction") - - # def _img_changed(self): - # deskew = get_child(self, self.DeskewWidget) - # output = get_child(self, self.OutputWidget) - # output.channel_range.options["max"] = deskew.img_layer.value._dims_displayed - ### - # Deconvolution - ### - # deconvolution = vfield(bool, name="Use Deconvolution").with_options(value=False) - # deconv_widget = DeconvolutionWidget() - # deconv_widget.enabled = False - # deconv_widget.visible = False - - # @deconvolution.connect - # def _set_decon(self) -> None: - # if self.deconvolution: - # logger.info("Deconvolution Activated") - # # Enable deconvolutio by using the saved parameters - # # self.llsz_parent.lattice.deconvolution = self.llsz_parent.deconv - # self.deconv_widget.enabled = True - # self.deconv_widget.visible = True - # else: - # logger.info("Deconvolution Disabled") - # # self.llsz_parent.lattice.deconvolution = None - # self.deconv_widget.enabled = False - # self.deconv_widget.visible = False - - # ### - # # Cropping - # ### - - # cropping_enabled = vfield(bool, name="Use Cropping").with_options(value=False) - # crop_widget = CroppingWidget() - # crop_widget.enabled = False - # crop_widget.visible = False - - # @cropping_enabled.connect - # def _set_crop(self) -> None: - # if self.cropping_enabled: - # logger.info("Cropping Activated") - # # Enable deconvolutio by using the saved parameters - # # self.llsz_parent.lattice.deconvolution = self.llsz_parent.deconv - # self.crop_widget.enabled = True - # self.crop_widget.visible = True - # else: - # logger.info("Deconvolution Disabled") - # # self.llsz_parent.lattice.deconvolution = None - # self.crop_widget.enabled = False - # self.crop_widget.visible = False - - # ### - # # Workflow - # ### - # workflow_enabled = vfield(bool, name="Use Workflow").with_options(value=False) - # workflow_widget = WorkflowWidget() - # workflow_widget.enabled = False - # workflow_widget.visible = False - - # @workflow_enabled.connect - # def _set_workflow(self) -> None: - # if self.workflow_enabled: - # logger.info("Workflow Activated") - # # Enable deconvolutio by using the saved parameters - # # self.llsz_parent.lattice.deconvolution = self.llsz_parent.deconv - # self.workflow_widget.enabled = True - # self.workflow_widget.visible = True - # else: - # logger.info("Deconvolution Disabled") - # # self.llsz_parent.lattice.deconvolution = None - # self.workflow_widget.enabled = False - # self.workflow_widget.visible = False - - # @set_design(background_color="magenta", font_family="Consolas", visible=True, text="Initialize Plugin", max_height=75, font_size=13) - # @set_options(pixel_size_dx={"widget_type": "FloatSpinBox", "value": 0.1449922, "step": 0.000000001}, - # pixel_size_dy={"widget_type": "FloatSpinBox", - # "value": 0.1449922, "step": 0.000000001}, - # pixel_size_dz={"widget_type": "FloatSpinBox", - # "value": 0.3, "step": 0.000000001}, - # angle={"widget_type": "FloatSpinBox", - # "value": 30, "step": 0.1}, - # select_device={"widget_type": "ComboBox", "choices": cle.available_device_names( - # ), "value": cle.available_device_names()[0]}, - # last_dimension_channel={"widget_type": "ComboBox", - # "label": "Set Last dimension (channel/time)", "tooltip": "If the last dimension is initialised incorrectly, you can assign it as either channel/time"}, - # merge_all_channel_layers={"widget_type": "CheckBox", "value": True, "label": "Merge all napari layers as channels", - # "tooltip": "Use this option if the channels are in separate layers. napari-lattice requires all channels to be in same layer"}, - # skew_dir={"widget_type": "ComboBox", "choices": DeskewDirection, "value": DeskewDirection.Y, - # "label": "Direction of skew (Y or X)", "tooltip": "Skew direction when image is acquired. Ask your microscopist for details"}, - # set_logging={"widget_type": "ComboBox", "choices": Log_Levels, "value": Log_Levels.INFO, - # "label": "Log Level", "tooltip": "Only use for debugging. Leave it as INFO for regular operation"} - # ) - # def Choose_Image_Layer(self, - # img_layer: Layer, - # pixel_size_dx: float = 0.1449922, - # pixel_size_dy: float = 0.1449922, - # pixel_size_dz: float = 0.3, - # angle: float = 30, - # select_device: str = cle.available_device_names()[ - # 0], - # last_dimension_channel: LastDimensionOptions = "", - # merge_all_channel_layers: bool = False, - # skew_dir: DeskewDirection=DeskewDirection.Y, - # set_logging: Log_Levels=Log_Levels.INFO): - - # logger.setLevel(set_logging.value) - # config.log_level = set_logging.value - # logger.info(f"Logging set to {set_logging}") - # logger.info("Using existing image layer") - - # # Select device for processing - # cle.select_device(select_device) - - # # merge all napari image layers as one multidimensional image - # if merge_all_channel_layers: - # from napari.layers.utils.stack_utils import images_to_stack - # # get list of napari layers as a list - # layer_list = list(self.parent_viewer.layers) - # # if more than one layer - # if len(layer_list) > 1: - # # convert the list of images into a stack - # new_layer = images_to_stack(layer_list) - # # select all current layers - # self.parent_viewer.layers.select_all() - # # remove selected layers - # self.parent_viewer.layers.remove_selected() - # # add the new composite image layer - # self.parent_viewer.add_layer(new_layer) - # img_layer = new_layer - - # self.llsz_parent.lattice = lattice_from_napari( - # img=img_layer, - # last_dimension=None if last_dimension_channel == LastDimensionOptions.get_from_metadata else last_dimension_channel, - # angle=angle, - # skew=skew_dir, - # physical_pixel_sizes=(pixel_size_dx, pixel_size_dy, pixel_size_dz), - # # deconvolution = DeconvolutionParams() - # ) - # # flag for ensuring a file has been opened and plugin initialised - # self.llsz_parent.open_file = True - - # logger.info( - # f"Pixel size (ZYX) in microns: {self.llsz_parent.lattice.dz,self.llsz_parent.lattice.dy,self.llsz_parent.lattice.dx}") - # logger.info( - # f"Dimensions of image layer (ZYX): {list(self.llsz_parent.lattice.data.shape[-3:])}") - # logger.info( - # f"Dimensions of deskewed image (ZYX): {self.llsz_parent.lattice.deskew_vol_shape}") - # logger.info( - # f"Deskewing angle is: {self.llsz_parent.lattice.angle}") - # logger.info( - # f"Deskew Direction: {self.llsz_parent.lattice.skew}") - # # Add dimension labels correctly - # # if channel, and not time - # if self.llsz_parent.lattice.time == 0 and (last_dimension_channel or self.llsz_parent.lattice.channels > 0): - # self.parent_viewer.dims.axis_labels = ('Channel', "Z", "Y", "X") - # # if no channel, but has time - # elif self.llsz_parent.lattice.channels == 0 and self.llsz_parent.lattice.time > 0: - # self.parent_viewer.dims.axis_labels = ('Time', "Z", "Y", "X") - # # if it has channels - # elif self.llsz_parent.lattice.channels > 1: - # # If merge to stack is used, channel slider goes to the bottom - # if int(self.parent_viewer.dims.dict()["range"][0][1]) == self.llsz_parent.lattice.channels: - # self.parent_viewer.dims.axis_labels = ('Channel', "Time", "Z", "Y", "X") - # else: - # self.parent_viewer.dims.axis_labels = ('Time', "Channel", "Z", "Y", "X") - # # if channels initialized by aicsimagio, then channels is 1 - # elif self.llsz_parent.lattice.channels == 1 and self.llsz_parent.lattice.time > 1: - # self.parent_viewer.dims.axis_labels = ('Time', "Z", "Y", "X") - - # logger.info(f"Initialised") - # self["Choose_Image_Layer"].background_color = "green" - # self["Choose_Image_Layer"].text = "Plugin Initialised" - - - # @set_design(background_color="magenta", font_family="Consolas", visible=True, text="Click to select PSFs for deconvolution", max_height=75, font_size=11) - # @set_options(header=dict(widget_type="Label", label="

Enter path to the PSF images

"), - # psf_ch1_path={"widget_type": "FileEdit", - # "label": "Channel 1:"}, - # psf_ch2_path={"widget_type": "FileEdit", - # "label": "Channel 2"}, - # psf_ch3_path={"widget_type": "FileEdit", - # "label": "Channel 3"}, - # psf_ch4_path={"widget_type": "FileEdit", - # "label": "Channel 4"}, - # device_option={ - # "widget_type": "ComboBox", "label": "Choose processing device", "choices": DeconvolutionChoice}, - # no_iter={ - # "widget_type": "SpinBox", "label": "No of iterations (Deconvolution)", "value": 10, "min": 1, "max": 50, "step": 1} - # ) - # def deconvolution_gui(self, - # header: str, - # psf_ch1_path: Path, - # psf_ch2_path: Path, - # psf_ch3_path: Path, - # psf_ch4_path: Path, - # device_option: DeconvolutionChoice, - # no_iter: int): - # """GUI for Deconvolution button""" - # # Force deconvolution to be true if we do this - # if not self.llsz_parent.lattice.deconvolution: - # raise Exception("Deconvolution is set to False. Tick the box to activate deconvolution.") - # self.llsz_parent.deconv.decon_processing = device_option - # self.llsz_parent.deconv.psf = list(read_psf([ - # psf_ch1_path, - # psf_ch2_path, - # psf_ch3_path, - # psf_ch4_path, - # ], - # device_option, - # lattice_class=self.llsz_parent.lattice - # )) - # self.llsz_parent.deconv.psf_num_iter = no_iter - # self["deconvolution_gui"].background_color = "green" - # self["deconvolution_gui"].text = "PSFs added" @set_options(header=dict(widget_type="Label", label="

Preview Deskew

"), time=dict(label="Time:", max=2**15), @@ -415,622 +121,8 @@ def preview(self, header:str, time: int, channel: int): @set_design(text="Save") def save(self): - self._make_model_friendly() - - - # @magicclass(widget_type="collapsible") - # class Preview: - # @magicgui(header=dict(widget_type="Label", label="

Preview Deskew

"), - # time=dict(label="Time:", max=2**15), - # channel=dict(label="Channel:"), - # call_button="Preview") - # def Preview_Deskew(self, - # header: str, - # time: int, - # channel: int, - # img_data: ImageData): - # """ - # Preview deskewed data for a single timepoint and channel - - # """ - # _Preview(LLSZWidget, - # self, - # time, - # channel, - # img_data) - - # Tabbed Widget container to house all the widgets - # @magicclass(widget_type="tabbed", name="Functions") - # class WidgetContainer(LlszTemplate): - - # @magicclass(name="Deskew", widget_type="scrollable", properties={"min_width": 100}) - # class DeskewWidget(LlszTemplate): - - # @magicgui(header=dict(widget_type="Label", label="

Deskew and Save

"), - # time_start=dict(label="Time Start:", max=2**20), - # time_end=dict(label="Time End:", value=1, max=2**20), - # ch_start=dict(label="Channel Start:"), - # ch_end=dict(label="Channel End:", value=1), - # save_as_type={ - # "label": "Save as filetype:", "choices": SaveFileType, "value": SaveFileType.h5}, - # save_path=dict(mode='d', label="Directory to save"), - # call_button="Save") - # def Deskew_Save(self, - # header: str, - # time_start: int, - # time_end: int, - # ch_start: int, - # ch_end: int, - # save_as_type: str, - # save_path: Path = Path(history.get_save_history()[0])): - # """ Widget to Deskew and Save Data""" - # _Deskew_Save(LLSZWidget, - # time_start, - # time_end, - # ch_start, - # ch_end, - # save_as_type, - # save_path) - - # @magicclass(name="Crop and Deskew", widget_type="scrollable") - # class CropWidget(LlszTemplate): - - # # add function for previewing cropped image - # @magicclass(name="Cropping Preview", widget_type="scrollable", properties={ - # "min_width": 100, - # "shapes_layer": Shapes - # }) - # class Preview_Crop_Menu(LlszTemplate): - - # @set_design(font_size=10, text="Click to activate Cropping Layer", background_color="magenta") - # @click(enables=["Import_ImageJ_ROI", "Crop_Preview"]) - # def activate_cropping(self): - # self.llsz_parent.shapes_layer = self.parent_viewer.add_shapes(shape_type='polygon', edge_width=1, edge_color='white', - # face_color=[1, 1, 1, 0], name="Cropping BBOX layer") - # # TO select ROIs if needed - # self.llsz_parent.shapes_layer.mode = "SELECT" - # self["activate_cropping"].text = "Cropping layer active" - # self["activate_cropping"].background_color = "green" - - # heading2 = field("You can either import ImageJ ROI (.zip) files or manually define ROIs using the shape layer", widget_type="Label") - - # @click(enabled=False) - # def Import_ImageJ_ROI(self, path: Path = Path(history.get_open_history()[0])) -> None: - # logger.info(f"Opening{path}") - # roi_list = read_imagej_roi(str(path)) - # # convert to canvas coordinates - # self.find_ancestor(LLSZWidget) - # roi_list = (np.array(roi_list) * self.llsz_parent.lattice.dy).tolist() - # self.llsz_parent.shapes_layer.add(roi_list, shape_type='polygon', edge_width=1, edge_color='yellow', face_color=[1, 1, 1, 0]) - - # time_crop = field(int, options={"min": 0, "step": 1, "max": 2**20}, name="Time") - # chan_crop = field(int, options={"min": 0, "step": 1}, name="Channels") - # heading_roi = field("If there are multiple ROIs, select the ROI before clicking button below", widget_type="Label") - # #roi_idx = field(int, options={"min": 0, "step": 1}, name="ROI number") - - # @click(enabled=False) - # def Crop_Preview(self, roi_layer: ShapesData): - - # if not roi_layer: - # raise Exception("No coordinates found for cropping. Check if right shapes layer or initialise shapes layer and draw ROIs.") - # # TODO: Add assertion to check if bbox layer or coordinates - - # # Slice out the image of interest to preview - # time = self.time_crop.value - # channel = self.chan_crop.value - # if time >= self.llsz_parent.lattice.time: - # raise ValueError("Time is out of range") - # if time >= self.llsz_parent.lattice.time: - # raise ValueError("Channel is out of range") - # logger.info(f"Using channel {channel} and time {time}") - - # # if only one roi drawn, use the first ROI for cropping - # if len(self.llsz_parent.shapes_layer.selected_data) == 0: - # raise Exception("Please select an ROI") - - # roi_idx = list(self.llsz_parent.shapes_layer.selected_data)[0] - - # # As the original image is scaled, the coordinates are in microns, so we need to convert - # # roi from micron to canvas/world coordinates - # roi_choice = [x/self.llsz_parent.lattice.dy for x in roi_layer[roi_idx]] - # logger.info(f"Previewing ROI {roi_idx}") - - # # crop - - # # Set the deconvolution options - # if self.llsz_parent.deconvolution: - # if not self.llsz_parent.lattice.psf or not self.llsz_parent.lattice.psf_num_iter or not self.llsz_parent.lattice.decon_processing: - # raise Exception( - # "PSF fields should be set by this point!") - # logger.info( - # f"Deskewing for Time:{time} and Channel: {channel} with deconvolution") - # decon_kwargs = dict( - # decon_processing=self.llsz_parent.lattice.decon_processing.value, - # psf=self.llsz_parent.lattice.psf[channel], - # num_iter=self.llsz_parent.lattice.psf_num_iter - # ) - # else: - # decon_kwargs = dict() - - # crop_roi_vol_desk = cle.pull( - # crop_volume_deskew( - # original_volume=np.array(self.llsz_parent.lattice.data[time, channel, ...]), - # roi_shape=roi_choice, - # angle_in_degrees=self.llsz_parent.angle_value, - # voxel_size_x=self.llsz_parent.lattice.dx, - # voxel_size_y=self.llsz_parent.lattice.dy, - # voxel_size_z=self.llsz_parent.lattice.dz, - # deconvolution=self.llsz_parent.deconvolution, - # # Option for entering custom z start value? - # z_start=0, - # z_end=self.llsz_parent.lattice.deskew_vol_shape[0], - # skew_dir=self.llsz_parent.skew_dir, - # **decon_kwargs - # ).astype(self.llsz_parent.lattice.data.dtype) - # ) - - # # get array back from gpu or addding cle array to napari can throw errors - # image = next(self.llsz_parent.lattice.process()) - - # scale = ( - # self.llsz_parent.lattice.new_dz, - # self.llsz_parent.lattice.dy, - # self.llsz_parent.lattice.dx - # ) - # self.parent_viewer.add_image( - # crop_roi_vol_desk, scale=scale) - - # @magicclass(name="Crop and Save Data") - # class CropSaveData(LlszTemplate): - # @magicgui(header=dict(widget_type="Label", label="

Crop and Save Data

"), - # time_start=dict(label="Time Start:"), - # time_end=dict(label="Time End:", value=1), - # ch_start=dict(label="Channel Start:"), - # ch_end=dict(label="Channel End:", value=1), - # save_as_type={ - # "label": "Save as filetype:", "choices": SaveFileType}, - # save_path=dict(mode='d', label="Directory to save ")) - # def Crop_Save(self, - # header: str, - # time_start: int, - # time_end: int, - # ch_start: int, - # ch_end: int, - # save_as_type: SaveFileType, - # roi_layer_list: ShapesData, - # save_path: Path = Path(history.get_save_history()[0])): - - # if not roi_layer_list: - # logger.error( - # "No coordinates found or cropping. Initialise shapes layer and draw ROIs.") - # else: - # if not self.llsz_parent.open_file: - # raise Exception("Image not initialised") - - # check_dimensions(time_start, time_end, ch_start, ch_end, self.llsz_parent.lattice.channels, self.llsz_parent.lattice.time) - - # angle = self.llsz_parent.lattice.angle - # dx = self.llsz_parent.lattice.dx - # dy = self.llsz_parent.lattice.dy - # dz = self.llsz_parent.lattice.dz - - # # get image data - # img_data = self.llsz_parent.lattice.data - # # Get shape of deskewed image - # deskewed_shape = self.llsz_parent.lattice.deskew_vol_shape - # deskewed_volume = da.zeros(deskewed_shape) - # z_start = 0 - # z_end = deskewed_shape[0] - - # logger.info("Cropping and saving files...") - - # # necessary when scale is used for napari.viewer.add_image operations - # roi_layer_list = ShapesData([x/self.llsz_parent.lattice.dy for x in roi_layer_list]) - - # for idx, roi_layer in enumerate(tqdm(roi_layer_list, desc="ROI:", position=0)): - # # pass arguments for save tiff, callable and function arguments - # logger.info("Processing ROI ", idx) - # # pass parameters for the crop_volume_deskew function - - # save_img(vol=img_data, - # func=crop_volume_deskew, - # time_start=time_start, - # time_end=time_end, - # channel_start=ch_start, - # channel_end=ch_end, - # save_name_prefix="ROI_" + str(idx), - # save_path=save_path, - # save_file_type=save_as_type, - # save_name=self.llsz_parent.lattice.save_name, - # dx=dx, - # dy=dy, - # dz=dz, - # angle=angle, - # deskewed_volume=deskewed_volume, - # roi_shape=roi_layer, - # angle_in_degrees=angle, - # z_start=z_start, - # z_end=z_end, - # voxel_size_x=dx, - # voxel_size_y=dy, - # voxel_size_z=dz, - # LLSZWidget=self.llsz_parent - # ) - - # logger.info( - # f"Cropping and Saving Complete -> {save_path}") - - # @magicclass(name="Workflow", widget_type="scrollable") - # class WorkflowWidget(LlszTemplate): - - # @magicclass(name="Preview Workflow", widget_type="scrollable") - # class PreviewWorkflow(LlszTemplate): - # #time_preview= field(int, options={"min": 0, "step": 1}, name="Time") - # #chan_preview = field(int, options={"min": 0, "step": 1}, name="Channels") - # @magicgui(header=dict(widget_type="Label", label="

Preview Workflow

"), - # time_preview=dict(label="Time:", max=2**20), - # chan_preview=dict(label="Channel:"), - # get_active_workflow=dict( - # widget_type="Checkbox", label="Get active workflow in napari-workflow", value=False), - # workflow_path=dict( - # mode='r', label="Load custom workflow (.yaml/yml)"), - # Use_Cropping=dict( - # widget_type="Checkbox", label="Crop Data", value=False), - # #custom_module=dict(widget_type="Checkbox",label="Load custom module (looks for *.py files in the workflow directory)",value = False), - # call_button="Apply and Preview Workflow") - # def Workflow_Preview(self, - # header: str, - # time_preview: int, - # chan_preview: int, - # get_active_workflow: bool, - # Use_Cropping: bool, - # roi_layer_list: ShapesData, - # workflow_path: Path = Path.home()): - # """ - # Apply napari_workflows to the processing pipeline - # User can define a pipeline which can be inspected in napari workflow inspector - # and then execute it by ticking the get active workflow checkbox, - # OR - # Use a predefined workflow - - # In both cases, if deskewing is not present as first step, it will be added on - # and rest of the task will be made followers - # """ - # logger.info("Previewing deskewed channel and time with workflow") - # user_workflow = get_workflow(self.parent_viewer if get_active_workflow else workflow_path) - - # # when using fields, self.time_preview.value - # if time_preview >= self.llsz_parent.lattice.time: - # raise ValueError("Time is out of range") - # if chan_preview >= self.llsz_parent.lattice.channels: - # raise ValueError("Channel is out of range") - - # time = time_preview - # channel = chan_preview - - # # to access current time and channel and pass it to workflow file - # config.channel = channel - # config.time = time - - # logger.info(f"Processing for Time: {time} and Channel: {channel}") - - # logger.info("Workflow to be executed:") - # logger.info(user_workflow) - # # Execute workflow - # processed_vol = user_workflow.get(task_name_last) - - # # check if a measurement table (usually a dictionary or list) or a tuple with different data types - # # The function below saves the tables and adds any images to napari window - # if type(processed_vol) in [dict, list, tuple]: - # if (len(processed_vol) > 1): - # df = pd.DataFrame() - # for idx, i in enumerate(processed_vol): - # df_temp = process_custom_workflow_output( - # i, parent_dir, idx, LLSZWidget, self, channel, time, preview=True) - # final_df = pd.concat([df, df_temp]) - # # append dataframes from every loop and have table command outside loop? - # # TODO: Figure out why table is not displaying - # from napari_spreadsheet import _widget - # table_viewer = _widget.TableViewerWidget( - # show=True) - # table_viewer.add_spreadsheet(final_df) - # # widgets.Table(value=final_df).show() - - # else: - # # add image to napari window - # # TODO: check if its an image napari supports? - # process_custom_workflow_output( - # processed_vol, parent_dir, 0, LLSZWidget, self, channel, time) - - # print("Workflow complete") - - # @magicgui(header=dict(widget_type="Label", label="

Apply Workflow and Save Output

"), - # time_start=dict(label="Time Start:", max=2**20), - # time_end=dict(label="Time End:", - # value=1, max=2**20), - # ch_start=dict(label="Channel Start:"), - # ch_end=dict(label="Channel End:", value=1), - # Use_Cropping=dict( - # widget_type="Checkbox", label="Crop Data", value=False), - # get_active_workflow=dict( - # widget_type="Checkbox", label="Get active workflow in napari-workflow", value=False), - # workflow_path=dict( - # mode='r', label="Load custom workflow (.yaml/yml)"), - # save_as_type={ - # "label": "Save as filetype:", "choices": SaveFileType}, - # save_path=dict( - # mode='d', label="Directory to save "), - # #custom_module=dict(widget_type="Checkbox",label="Load custom module (same dir as workflow)",value = False), - # call_button="Apply Workflow and Save Result") - # def Apply_Workflow_and_Save(self, - # header: str, - # time_start: int, - # time_end: int, - # ch_start: int, - # ch_end: int, - # Use_Cropping: bool, - # roi_layer_list: ShapesData, - # get_active_workflow: bool = False, - # workflow_path: Path = Path.home(), - # save_as_type: str = SaveFileType.tiff, - # save_path: Path = Path(history.get_save_history()[0])): - # """ - # Apply a user-defined analysis workflow using napari-workflows - - # Args: - # time_start (int): Start Time - # time_end (int): End Time - # ch_start (int): Start Channel - # ch_end (int): End Channel - # Use_Cropping (_type_): Use cropping based on ROIs in the shapes layer - # roi_layer_list (ShapesData): Shapes layer to use for cropping; can be a list of shapes - # get_active_workflow (bool, optional): Gets active workflow in napari. Defaults to False. - # workflow_path (Path, optional): User can also choose a custom workflow defined in a yaml file. - # save_path (Path, optional): Path to save resulting data - # """ - # if not self.llsz_parent.open_file: - # raise Exception("Image not initialised") - - # check_dimensions(time_start, time_end, ch_start, ch_end, - # self.llsz_parent.lattice.channels, self.llsz_parent.lattice.time) - - # # Get parameters - # angle = self.llsz_parent.lattice.angle - # dx = self.llsz_parent.lattice.dx - # dy = self.llsz_parent.lattice.dy - # dz = self.llsz_parent.lattice.dz - - # user_workflow = get_workflow(self.parent_viewer if get_active_workflow else workflow_path) - - # input_arg_first, input_arg_last, first_task_name, last_task_name = get_first_last_image_and_task( - # user_workflow) - # logger.info(f"{input_arg_first=}, {input_arg_last=}, {first_task_name=}, {last_task_name=}") - # logger.info(f"Workflow loaded: {user_workflow}") - - # vol = self.llsz_parent.lattice.data - # task_name_start = first_task_name[0] - - # try: - # task_name_last = last_task_name[0] - # except IndexError: - # task_name_last = task_name_start - - # # variables to hold task name, initialize it as None - # # if gpu, set otf_path, otherwise use psf - # psf = None - # otf_path = None - - # if self.llsz_parent.lattice.decon_processing == DeconvolutionChoice.cuda_gpu: - # #otf_path = "otf_path" - # psf_arg = "psf" - # psf = self.llsz_parent.lattice.psf - # else: - # psf_arg = "psf" - # psf = self.llsz_parent.lattice.psf - # # if cropping, set that as first task - - # if Use_Cropping: - # # convert Roi pixel coordinates to canvas coordinates - # # necessary only when scale is used for napari.viewer.add_image operations - # roi_layer_list = [x/self.llsz_parent.lattice.dy for x in roi_layer_list] - - # deskewed_shape = self.llsz_parent.lattice.deskew_vol_shape - # deskewed_volume = da.zeros(deskewed_shape) - # z_start = 0 - # z_end = deskewed_shape[0] - # roi = "roi" - # volume = "volume" - # # Check if decon ticked, if so set as first and crop as second? - - # # Create workflow for cropping and deskewing - # # volume and roi used will be set dynamically - # user_workflow.set("crop_deskew_image", crop_volume_deskew, - # original_volume=volume, - # deskewed_volume=deskewed_volume, - # roi_shape=roi, - # angle_in_degrees=angle, - # voxel_size_x=dx, - # voxel_size_y=dy, - # voxel_size_z=dz, - # z_start=z_start, - # z_end=z_end, - # deconvolution=self.llsz_parent.deconvolution.value, - # decon_processing=self.llsz_parent.lattice.decon_processing, - # psf=psf_arg, - # skew_dir=self.llsz_parent.skew_dir) - - # # change the first task so it accepts "crop_deskew as input" - # new_task = modify_workflow_task( - # old_arg=input_arg_first, task_key=task_name_start, new_arg="crop_deskew_image", workflow=user_workflow) - # user_workflow.set(task_name_start, new_task) - - # for idx, roi_layer in enumerate(tqdm(roi_layer_list, desc="ROI:", position=0)): - # print("Processing ROI ", idx) - # user_workflow.set(roi, roi_layer) - # save_img_workflow(vol=vol, - # workflow=user_workflow, - # input_arg=volume, - # first_task="crop_deskew_image", - # last_task=task_name_last, - # time_start=time_start, - # time_end=time_end, - # channel_start=ch_start, - # channel_end=ch_end, - # save_file_type=save_as_type, - # save_path=save_path, - # #roi_layer = roi_layer, - # save_name_prefix="ROI_" + \ - # str(idx), - # save_name=self.llsz_parent.lattice.save_name, - # dx=dx, - # dy=dy, - # dz=dz, - # angle=angle, - # deconvolution=self.llsz_parent.deconvolution.value, - # decon_processing=self.llsz_parent.lattice.decon_processing, - # otf_path=otf_path, - # psf_arg=psf_arg, - # psf=psf) - - # # IF just deskewing and its not in the tasks, add that as first task - # elif user_workflow.get_task(task_name_start)[0] not in (cle.deskew_y, cle.deskew_x): - # input = "input" - # # add task to the workflow - # user_workflow.set("deskew_image", - # self.llsz_parent.deskew_func, - # input_image=input, - # angle_in_degrees=angle, - # voxel_size_x=dx, - # voxel_size_y=dy, - # voxel_size_z=dz, - # linear_interpolation=True) - # # Set input of the workflow to be from deskewing - # # change workflow task starts from is "deskew_image" and - # new_task = modify_workflow_task( - # old_arg=input_arg_first, task_key=task_name_start, new_arg="deskew_image", workflow=user_workflow) - # user_workflow.set(task_name_start, new_task) - - # # if deconvolution checked, add it to start of workflow (add upstream of deskewing) - # if self.llsz_parent.deconvolution: - # psf = "psf" - # otf_path = "otf_path" - # input_arg_first, input_arg_last, first_task_name, last_task_name = get_first_last_image_and_task( - # user_workflow) - - # if self.llsz_parent.lattice.decon_processing == DeconvolutionChoice.cuda_gpu: - # user_workflow.set("deconvolution", - # pycuda_decon, - # image=input, - # psf=psf_arg, - # dzdata=self.llsz_parent.lattice.dz, - # dxdata=self.llsz_parent.lattice.dx, - # dzpsf=self.llsz_parent.lattice.dz, - # dxpsf=self.llsz_parent.lattice.dx, - # num_iter=self.llsz_parent.lattice.psf_num_iter) - # # user_workflow.set(input_arg_first,"deconvolution") - # else: - # user_workflow.set("deconvolution", - # skimage_decon, - # vol_zyx=input, - # psf=psf_arg, - # num_iter=self.llsz_parent.lattice.psf_num_iter, - # clip=False, - # filter_epsilon=0, - # boundary='nearest') - # # modify the user workflow so "deconvolution" is accepted - # new_task = modify_workflow_task( - # old_arg=input_arg_first, task_key=task_name_start, new_arg="deconvolution", workflow=user_workflow) - # user_workflow.set(task_name_start, new_task) - # input_arg_first, input_arg_last, first_task_name, last_task_name = get_first_last_image_and_task( - # user_workflow) - # task_name_start = first_task_name[0] - - # save_img_workflow(vol=vol, - # workflow=user_workflow, - # input_arg=input, - # first_task=task_name_start, - # last_task=task_name_last, - # time_start=time_start, - # time_end=time_end, - # channel_start=ch_start, - # channel_end=ch_end, - # save_file_type=save_as_type, - # save_path=save_path, - # save_name=self.llsz_parent.lattice.save_name, - # dx=dx, - # dy=dy, - # dz=dz, - # angle=angle, - # deconvolution=self.llsz_parent.deconvolution, - # decon_processing=self.llsz_parent.lattice.decon_processing, - # otf_path=otf_path, - # psf_arg=psf_arg, - # psf=psf) - - # # If deskewing is already as a task, then set the first argument to input so we can modify that later - # else: - # # if deskewing is already first task, then check if deconvolution needed - # # if deconvolution checked, add it to start of workflow (add upstream of deskewing) - # if self.llsz_parent.deconvolution: - # psf = "psf" - # otf_path = "otf_path" - # input_arg_first, input_arg_last, first_task_name, last_task_name = get_first_last_image_and_task( - # user_workflow) - - # if self.llsz_parent.lattice.decon_processing == DeconvolutionChoice.cuda_gpu: - # user_workflow.set("deconvolution", - # pycuda_decon, - # image=input, - # psf=psf_arg, - # dzdata=self.llsz_parent.lattice.dz, - # dxdata=self.llsz_parent.lattice.dx, - # dzpsf=self.llsz_parent.lattice.dz, - # dxpsf=self.llsz_parent.lattice.dx, - # num_iter=self.llsz_parent.lattice.psf_num_iter) - # # user_workflow.set(input_arg_first,"deconvolution") - # else: - # user_workflow.set("deconvolution", - # skimage_decon, - # vol_zyx=input, - # psf=psf_arg, - # num_iter=self.llsz_parent.lattice.psf_num_iter, - # clip=False, - # filter_epsilon=0, - # boundary='nearest') - # # modify the user workflow so "deconvolution" is accepted - # new_task = modify_workflow_task( - # old_arg=input_arg_first, task_key=task_name_start, new_arg="deconvolution", workflow=user_workflow) - # user_workflow.set(task_name_start, new_task) - # input_arg_first, input_arg_last, first_task_name, last_task_name = get_first_last_image_and_task( - # user_workflow) - # task_name_start = first_task_name[0] - - # # we pass first argument as input - # save_img_workflow(vol=vol, - # workflow=user_workflow, - # input_arg=input_arg_first, - # first_task=task_name_start, - # last_task=task_name_last, - # time_start=time_start, - # time_end=time_end, - # channel_start=ch_start, - # channel_end=ch_end, - # save_file_type=save_as_type, - # save_path=save_path, - # save_name=self.llsz_parent.lattice.save_name, - # dx=dx, - # dy=dy, - # dz=dz, - # angle=angle, - # deconvolution=self.llsz_parent.deconvolution, - # decon_processing=self.llsz_parent.lattice.decon_processing, - # otf_path=otf_path, - # psf_arg=psf_arg, - # psf=psf) - - # print("Workflow complete") - # return - + lattice = self._make_model() + lattice.process().save_image() def _napari_lattice_widget_wrapper() -> LLSZWidget: # split widget type enables a resizable widget diff --git a/plugin/napari_lattice/fields.py b/plugin/napari_lattice/fields.py index b69deac1..f95251cf 100644 --- a/plugin/napari_lattice/fields.py +++ b/plugin/napari_lattice/fields.py @@ -1,38 +1,38 @@ -from __future__ import annotations -from pathlib import Path -from magicclass import FieldGroup, field, MagicTemplate -from magicclass.widgets import Widget, ComboBox, Label, Select -from magicclass.fields import MagicField -from typing import Any, Callable, List, Optional, Protocol, Tuple, TypeVar, Union, cast, TYPE_CHECKING -from typing_extensions import Protocol, Self -from pydantic import BaseModel, ValidationError +# FieldGroups that the users interact with to input data -from strenum import StrEnum -from enum import auto -from lls_core import DeconvolutionChoice, SaveFileType, Log_Levels, DeskewDirection -from lls_core.lattice_data import CropParams, DeconvolutionParams, DefinedPixelSizes, LatticeData, OutputParams, DeskewParams -from napari.layers import Shapes +import logging from enum import Enum +from pathlib import Path +from typing import Any, Callable, List, Optional, Tuple, TypeVar, cast + import pyclesperanto_prototype as cle -from napari_workflows import Workflow, WorkflowManager -from napari.types import ImageData, ShapesData +from lls_core import ( + DeconvolutionChoice, + DeskewDirection, + Log_Levels, + SaveFileType, +) +from lls_core.lattice_data import ( + CropParams, + DeconvolutionParams, + DefinedPixelSizes, + DeskewParams, + LatticeData, + OutputParams, +) +from magicclass import FieldGroup, MagicTemplate, field +from magicclass.fields import MagicField +from magicclass.widgets import ComboBox, Label, Widget +from napari.layers import Image, Shapes +from napari.types import ShapesData from napari.utils import history -from abc import ABC +from napari_lattice.icons import GREEN, GREY, RED +from napari_lattice.reader import NapariImageParams, lattice_params_from_napari +from napari_lattice.utils import get_layers +from napari_workflows import Workflow, WorkflowManager +from pydantic import ValidationError from qtpy.QtWidgets import QTabWidget - -from napari_lattice.icons import RED, GREEN, GREY -from napari_lattice.reader import lattice_params_from_napari -from napari_lattice.utils import get_viewer, get_layers - -from napari.layers import Image - -if TYPE_CHECKING: - from xarray import DataArray - - -# FieldGroups that the users interact with to input data - -import logging +from strenum import StrEnum logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -137,17 +137,6 @@ class LastDimensionOptions(Enum): XYZCT = "XYZCT" Metadata = "Get from Metadata" -# class NapariFields(FieldGroup, ABC): -# def __init__(self, layout: str = "vertical", labels: bool = False, name: str | None = None, **kwargs): -# super().__init__(layout, labels, name, **kwargs) - -# class NapariFieldGroupCompatible(Protocol): -# from magicclass.widgets import Container -# from qtpy.QtWidgets import QWidget -# errors: MagicField[Label] -# parent: QWidget -# _widget: Container - class NapariFieldGroup: # This implementation is a bit ugly. This is a mixin that can only be used on a `FieldGroup`. # However, it can't inherit from FieldGroup because then the metaclass would look for fields in this @@ -204,6 +193,10 @@ def _validate(self: Any): def _make_model(self): raise NotImplementedError() +class DeskewKwargs(NapariImageParams): + angle: float + skew: DeskewDirection + class DeskewFields(NapariFieldGroup, FieldGroup): def _get_dimension_options(self, _) -> List[str]: @@ -267,8 +260,8 @@ def _get_dimension_options(self, _) -> List[str]: def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) - from qtpy.QtWidgets import QDoubleSpinBox from magicgui.widgets import TupleEdit + from qtpy.QtWidgets import QDoubleSpinBox # Enormous hack to set the precision # A better method has been requested here: https://github.com/pyapp-kit/magicgui/issues/581#issuecomment-1709467219 @@ -287,34 +280,21 @@ def __init__(self, *args: Any, **kwargs: Any): @img_layer.connect def _img_changed(self) -> None: - # Recalculate the dimension options + # Recalculate the dimension options whenever the image changes self.dimension_order.reset_choices() @img_layer.connect @enable_if([stack_along]) def _hide_stack_along(self): + # Hide the "Stack Along" option if we only have one image return len(self.img_layer.value) > 1 - def _merge_layers(self) -> DataArray: + def _get_kwargs(self) -> DeskewKwargs: """ - Returns a single image array merged from all the selected layers + Returns the LatticeData fields that the Deskew tab can provide """ - from xarray import DataArray, concat - layers = [DataArray(it, dims=self.dimension_order.value.split()) for it in self.img_layer.value] - if len(layers) == 0: - raise Exception("At least one image layer must be selected") - elif len(layers) == 1: - return layers[0] - else: - dim = "C" if self.stack_along.value == StackAlong.CHANNEL else "T" - return concat(layers, dim=dim) - - # if len(self.img_layer.value) == 0: - # raise Exception("At least one image layer must be selected.") - # return images_to_stack(self.img_layer.value) - - def _make_model(self) -> DeskewParams: from aicsimageio.types import PhysicalPixelSizes + DeskewParams.update_forward_refs() params = lattice_params_from_napari( imgs=self.img_layer.value, dimension_order=None if self.dimension_order.value == "Get from Metadata" else self.dimension_order.value, @@ -325,14 +305,21 @@ def _make_model(self) -> DeskewParams: ), stack_along="C" if self.stack_along.value == StackAlong.CHANNEL else "T" ) - return DeskewParams( - data=params["data"], - dims=params["dims"], - physical_pixel_sizes=params["physical_pixel_sizes"], + return DeskewKwargs( + **params, angle=self.angle.value, skew = self.skew_dir.value, ) + def _make_model(self) -> DeskewParams: + kwargs = self._get_kwargs() + return DeskewParams( + data=kwargs["data"], + physical_pixel_sizes=kwargs["physical_pixel_sizes"], + angle=kwargs["angle"], + skew = kwargs["skew"] + ) + class DeconvolutionFields(NapariFieldGroup, FieldGroup): """ A counterpart to the DeconvolutionParams Pydantic class @@ -350,11 +337,6 @@ class DeconvolutionFields(NapariFieldGroup, FieldGroup): ) errors = field(Label).with_options(label="Errors") - - # @background.connect - # def _show_custom_background(self): - # self.background_custom.visible = self.background == BackgroundSource.Custom - @background.connect @enable_if( [background_custom] @@ -362,10 +344,6 @@ class DeconvolutionFields(NapariFieldGroup, FieldGroup): def _enable_custom_background(self) -> bool: return self.background.value == BackgroundSource.Custom - # @fields_enabled.connect - # def _enable_fields(self) -> bool: - # self.decon_processing.visible = self.fields_enabled - @fields_enabled.connect @enable_if( fields = [ @@ -414,32 +392,6 @@ class CroppingFields(NapariFieldGroup, FieldGroup): def _enable_workflow(self) -> bool: return self.fields_enabled.value - - # roi_layer_list: ShapesData - # @magicclass(visible=False) - # class Fields(MagicTemplate): - # shapes= vfield(ShapesData, label = "ROIs")#Optional[Shapes] = None - # z_range = vfield(Tuple[int, int]).with_options( - # label = "Z Range", - # value = (0, 1), - # options = dict( - # min = 0, - # max = 1 - # ), - # ) - # _shapes_layer: Optional[Shapes] = None - - # @set_design(font_size=10, text="Click to activate Cropping Layer", background_color="magenta") - # @click(enables=["Import_ImageJ_ROI", "Crop_Preview"]) - # @set_design(text="New Cropping Layer") - # def activate_cropping(self): - # self._shapes_layer = self.parent_viewer.add_shapes(shape_type='polygon', edge_width=1, edge_color='white', - # face_color=[1, 1, 1, 0], name="Cropping BBOX layer") - # # TO select ROIs if needed - # self._shapes_layer.mode = "SELECT" - # self["activate_cropping"].text = "Cropping layer active" - # self["activate_cropping"].background_color = "green" - def _make_model(self) -> Optional[CropParams]: import numpy as np if self.fields_enabled.value: @@ -519,6 +471,3 @@ def _make_model(self) -> OutputParams: save_name=self.save_name.value, save_type=self.save_type.value ) - - -# @DeskewWidget.img_layer.connect diff --git a/plugin/napari_lattice/reader.py b/plugin/napari_lattice/reader.py index 1c7cee27..dac3452b 100644 --- a/plugin/napari_lattice/reader.py +++ b/plugin/napari_lattice/reader.py @@ -41,6 +41,10 @@ def lattice_params_from_napari( if len(imgs) < 1: raise ValueError("At least one image must be provided.") + if len(set(len(it.data.shape) for it in imgs)) > 1: + size_message = ",".join(f"{img.name}: {len(img.data.shape)}" for img in imgs) + raise ValueError(f"The input images have multiple different dimensions, which napari lattice doesn't support: {size_message}") + save_name: str pixel_sizes: set[PhysicalPixelSizes] = {physical_pixel_sizes} save_names = [] @@ -74,11 +78,11 @@ def lattice_params_from_napari( # else: # pixel_size_metadata = img_data_aics.physical_pixel_sizes - calculated_order = img_data_aics.dims.order + calculated_order = tuple(img_data_aics.dims.order) elif dimension_order is None: raise ValueError("Either the Napari image must have dimensional metadata, or a dimension order must be provided") else: - calculated_order = list(dimension_order) + calculated_order = tuple(dimension_order) final_imgs.append(DataArray(img.data, dims=calculated_order))