diff --git a/brats_toolkit/preprocessor.py b/brats_toolkit/preprocessor.py index 1d3c9d7..2c81647 100644 --- a/brats_toolkit/preprocessor.py +++ b/brats_toolkit/preprocessor.py @@ -13,45 +13,69 @@ from brats_toolkit.util.prep_utils import tempFiler -class Preprocessor(object): +class Preprocessor: + """ + Class for preprocessing medical imaging data. + """ + @citation_reminder @deprecated_preprocessor - def __init__(self, noDocker=False): + def __init__(self, noDocker: bool = False): + """ + Initialize the Preprocessor instance. + + Parameters: + - noDocker (bool): Flag indicating whether Docker is used. + """ # settings - self.clientVersion = "0.0.1" - self.confirmationRequired = True - self.mode = "cpu" - self.gpuid = "0" + self.clientVersion: str = "0.0.1" + self.confirmationRequired: bool = True + self.mode: str = "cpu" + self.gpuid: str = "0" # init sio client - self.sio = socketio.Client() + self.sio: socketio.Client = socketio.Client() # set docker usage - self.noDocker = noDocker + self.noDocker: bool = noDocker @self.sio.event - def connect(): + def connect() -> None: + """ + Event handler for successful connection. + """ print("connection established! sid:", self.sio.sid) - # client identification self.sio.emit( "clientidentification", {"brats_cli": self.clientVersion, "proc_mode": self.mode}, ) @self.sio.event - def connect_error(): + def connect_error() -> None: + """ + Event handler for connection error. + """ print("The connection failed!") @self.sio.event - def disconnect(): + def disconnect() -> None: + """ + Event handler for disconnection. + """ print("disconnected from server") @self.sio.on("message") - def message(data): + def message(data: dict) -> None: + """ + Event handler for incoming message. + """ print("message", data) @self.sio.on("status") - def on_status(data): + def on_status(data: dict) -> None: + """ + Event handler for status update. + """ print("status received: ", data) if data["message"] == "client ID json generation finished!": self._inspect_input() @@ -77,7 +101,10 @@ def on_status(data): sys.exit(0) @self.sio.on("client_outdated") - def outdated(data): + def outdated(data: dict) -> None: + """ + Event handler for outdated client version. + """ print( "Your client version", self.clientVersion, @@ -91,33 +118,53 @@ def outdated(data): sys.exit(0) @self.sio.on("ipstatus") - def on_ipstatus(data): + def on_ipstatus(data: dict) -> None: + """ + Event handler for image processing status. + """ print("image processing status received:") print(data["examid"], ": ", data["ipstatus"]) def single_preprocess( self, - t1File, - t1cFile, - t2File, - flaFile, - outputFolder, - mode, - confirm=False, - skipUpdate=False, - gpuid="0", - ): + t1File: str, + t1cFile: str, + t2File: str, + flaFile: str, + outputFolder: str, + mode: str, + confirm: bool = False, + skipUpdate: bool = False, + gpuid: str = "0", + ) -> None: + """ + Process a single set of input files. + + Parameters: + - t1File (str): Path to T1 file. + - t1cFile (str): Path to T1c file. + - t2File (str): Path to T2 file. + - flaFile (str): Path to FLAIR file. + - outputFolder (str): Output folder path. + - mode (str): Processing mode (e.g., "cpu", "gpu"). + - confirm (bool): Whether confirmation is required. + - skipUpdate (bool): Whether to skip Docker update. + - gpuid (str): GPU ID. + + Returns: + - None + """ # assign name to file print("basename:", os.path.basename(outputFolder)) - outputPath = Path(outputFolder) - dockerOutputFolder = os.path.abspath(outputPath.parent) + outputPath: Path = Path(outputFolder) + dockerOutputFolder: str = os.path.abspath(outputPath.parent) # create temp dir - storage = tempfile.TemporaryDirectory() + storage: tempfile.TemporaryDirectory = tempfile.TemporaryDirectory() # TODO this is a potential security hazzard as all users can access the files now, but currently it seems the only way to deal with bad configured docker installations os.chmod(storage.name, 0o777) - dockerFolder = os.path.abspath(storage.name) - tempFolder = os.path.join(dockerFolder, os.path.basename(outputFolder)) + dockerFolder: str = os.path.abspath(storage.name) + tempFolder: str = os.path.join(dockerFolder, os.path.basename(outputFolder)) os.makedirs(tempFolder, exist_ok=True) print("tempFold:", tempFolder) @@ -139,15 +186,31 @@ def single_preprocess( def batch_preprocess( self, - exam_import_folder=None, - exam_export_folder=None, - dicom_import_folder=None, - nifti_export_folder=None, - mode="cpu", - confirm=True, - skipUpdate=False, - gpuid="0", - ): + exam_import_folder: str, + exam_export_folder: str, + dicom_import_folder: str = None, + nifti_export_folder: str = None, + mode: str = "cpu", + confirm: bool = True, + skipUpdate: bool = False, + gpuid: str = "0", + ) -> None: + """ + Process multiple sets of input files, potentially using Docker. + + Parameters: + - exam_import_folder (str): Import folder path. + - exam_export_folder (str): Export folder path. + - dicom_import_folder (Optional[str]): DICOM import folder path. + - nifti_export_folder (Optional[str]): NIfTI export folder path. + - mode (str): Processing mode (e.g., "cpu", "gpu"). + - confirm (bool): Whether confirmation is required. + - skipUpdate (bool): Whether to skip Docker update. + - gpuid (str): GPU ID. + + Returns: + - None + """ if confirm != True: self.confirmationRequired = False self.mode = mode @@ -172,14 +235,23 @@ def batch_preprocess( self._connect_client() self.sio.wait() - def _connect_client(self): + def _connect_client(self) -> None: + """ + Connect to the server using SocketIO. + """ self.sio.connect("http://localhost:5000") print("sid:", self.sio.sid) - def _inspect_input(self): + def _inspect_input(self) -> None: + """ + Send input inspection request to the server. + """ print("sending input inspection request!") self.sio.emit("input_inspection", {"hurray": "yes"}) - def _process_start(self): + def _process_start(self) -> None: + """ + Send processing request to the server. + """ print("sending processing request!") self.sio.emit("brats_processing", {"hurray": "yes"})