diff --git a/brats/core/brats_algorithm.py b/brats/core/brats_algorithm.py index faa4af4..f2571b2 100644 --- a/brats/core/brats_algorithm.py +++ b/brats/core/brats_algorithm.py @@ -8,7 +8,7 @@ from loguru import logger -from brats.core.docker import run_docker +from brats.core.docker import run_container from brats.utils.algorithm_config import load_algorithms from brats.utils.constants import OUTPUT_NAME_SCHEMA, Algorithms, Task from brats.utils.data_handling import InferenceSetup @@ -129,7 +129,7 @@ def _infer_single( inputs=inputs, ) - run_docker( + run_container( algorithm=self.algorithm, data_path=tmp_data_folder, output_path=tmp_output_folder, @@ -172,7 +172,7 @@ def _infer_batch( logger.info(f"Standardized input names to match algorithm requirements.") # run inference in container - run_docker( + run_container( algorithm=self.algorithm, data_path=tmp_data_folder, output_path=tmp_output_folder, diff --git a/brats/core/docker.py b/brats/core/docker.py index f068466..a60aa7c 100644 --- a/brats/core/docker.py +++ b/brats/core/docker.py @@ -1,7 +1,6 @@ from __future__ import annotations import os -import shutil import subprocess import time from pathlib import Path @@ -19,7 +18,7 @@ AlgorithmNotCPUCompatibleException, BraTSContainerException, ) -from brats.utils.zenodo import check_model_weights, get_dummy_weights_path +from brats.utils.zenodo import check_additional_files_path, get_dummy_path try: client = docker.from_env() @@ -41,7 +40,7 @@ def _show_docker_pull_progress(tasks: Dict, progress: Progress, line: Dict): if line["status"] == "Downloading": task_key = f'[Download {line["id"]}]' elif line["status"] == "Extracting": - task_key = f'[Extract {line["id"]}]' + task_key = f'[Extract {line["id"]}]' else: return @@ -102,7 +101,6 @@ def _handle_device_requests( if cuda_available else "No Cuda installation/ GPU was found and" ) - # TODO add reference to table of cpu capable algos as help! raise AlgorithmNotCPUCompatibleException( f"{cause} the chosen algorithm is not CPU-compatible. Aborting..." ) @@ -125,12 +123,11 @@ def _get_additional_files_path(algorithm: AlgorithmData) -> Path: Path to the additional files """ # ensure weights are present and get path - # TODO refactor this rename weights to additional files if algorithm.weights is not None: - return check_model_weights(record_id=algorithm.weights.record_id) + return check_additional_files_path(record_id=algorithm.weights.record_id) else: # if no weights are directly specified a dummy weights folder will be mounted - return get_dummy_weights_path() + return get_dummy_path() def _get_volume_mappings( @@ -195,7 +192,7 @@ def _build_args( command_args, extra_args (Tuple): The command arguments and extra arguments """ # Build command that will be run in the docker container - command_args = f"--data_path=/mlcube_io0 --output_path=/mlcube_io2" + command_args = f"--data_path=/mlcube_io0 --output_path=/mlcube_io2" if algorithm.weights is not None: weights_arg = f"--{algorithm.weights.param_name}=/mlcube_io1" if algorithm.weights.checkpoint_path: @@ -285,7 +282,7 @@ def _log_algorithm_info(algorithm: AlgorithmData): logger.debug(f"Docker image: {algorithm.run_args.docker_image}") -def run_docker( +def run_container( algorithm: AlgorithmData, data_path: Path, output_path: Path, diff --git a/brats/data/meta/africa.yml b/brats/data/meta/africa.yml index 626ec6a..f9a6f92 100644 --- a/brats/data/meta/africa.yml +++ b/brats/data/meta/africa.yml @@ -3,7 +3,7 @@ algorithms: BraTS23_1: meta: authors: Andriy Myronenko, et al. 
- paper: TODO + paper: N/A challenge: BraTS23 BraTS-Africa Segmentation rank: 1st year: 2023 diff --git a/brats/utils/data_handling.py b/brats/utils/data_handling.py index 6106f71..1eab8ff 100644 --- a/brats/utils/data_handling.py +++ b/brats/utils/data_handling.py @@ -1,11 +1,10 @@ from __future__ import annotations -from contextlib import contextmanager import shutil -import sys -from pathlib import Path import tempfile -from typing import Dict, Generator, List, Optional, Tuple +from contextlib import contextmanager +from pathlib import Path +from typing import Generator, Optional, Tuple import nibabel as nib from loguru import logger @@ -90,6 +89,7 @@ def input_sanity_check( t2w (Path | str, optional): T2w image path (required for segmentation) mask (Path | str, optional): Mask image path (required for inpainting) """ + # Filter out None values to only include provided images images = { "t1n": t1n, diff --git a/brats/utils/zenodo.py b/brats/utils/zenodo.py index 8a1c12a..f99c817 100644 --- a/brats/utils/zenodo.py +++ b/brats/utils/zenodo.py @@ -14,17 +14,20 @@ from brats.utils.constants import ADDITIONAL_FILES_FOLDER, ZENODO_RECORD_BASE_URL -def get_dummy_weights_path() -> Path: +def get_dummy_path() -> Path: dummy = ADDITIONAL_FILES_FOLDER / "dummy" dummy.mkdir(exist_ok=True, parents=True) return dummy -def check_model_weights(record_id: str) -> Path: - """Check if latest model weights are present and download them otherwise. +def check_additional_files_path(record_id: str) -> Path: + """Check if latest additional files are present and download them otherwise. + + Args: + record_id (str): Zenodo record ID. Returns: - Path: Path to the model weights folder. + Path: Path to the additional files folder. """ zenodo_metadata, archive_url = _get_zenodo_metadata_and_archive_url( @@ -44,7 +47,7 @@ def check_model_weights(record_id: str) -> Path: sys.exit() logger.info(f"Model weights not found locally") - return _download_model_weights( + return _download_additional_files( zenodo_metadata=zenodo_metadata, record_id=record_id, archive_url=archive_url, @@ -75,7 +78,7 @@ def check_model_weights(record_id: str) -> Path: f"Failed to delete {path}: {excinfo}" ), ) - return _download_model_weights( + return _download_additional_files( zenodo_metadata=zenodo_metadata, record_id=record_id, archive_url=archive_url ) @@ -123,24 +126,24 @@ def _get_zenodo_metadata_and_archive_url(record_id: str) -> Dict | None: return None -def _download_model_weights( +def _download_additional_files( zenodo_metadata: Dict, record_id: str, archive_url: str ) -> Path: - """Download the latest model weights from Zenodo for the requested record and extract them to the target folder. + """Download the latest additional files from Zenodo for the requested record and extract them to the target folder. Args: - ADDITIONAL_FILES_FOLDER (Path): General weights folder path in which the requested model weights will be stored. zenodo_metadata (Dict): Metadata for the Zenodo record. record_id (str): Zenodo record ID. + archive_url (str): URL to the archive file. Returns: - Path: Path to the model weights folder for the requested record. + Path: Path to the additional files folder for the requested record. """ - record_ADDITIONAL_FILES_FOLDER = ( + record_folder = ( ADDITIONAL_FILES_FOLDER / f"{record_id}_v{zenodo_metadata['version']}" ) # ensure folder exists - record_ADDITIONAL_FILES_FOLDER.mkdir(parents=True, exist_ok=True) + record_folder.mkdir(parents=True, exist_ok=True) logger.info(f"Downloading model weights from Zenodo. 
This might take a while...") # Make a GET request to the URL @@ -152,6 +155,13 @@ def _download_model_weights( ) return + _extract_archive(response=response, record_folder=record_folder) + + logger.info(f"Zip file extracted successfully to {record_folder}") + return record_folder + + +def _extract_archive(response: requests.Response, record_folder: Path): # Download with progress bar chunk_size = 1024 # 1KB bytes_io = BytesIO() @@ -159,7 +169,7 @@ def _download_model_weights( with Progress( SpinnerColumn(), TextColumn("[cyan]Downloading weights..."), - TextColumn("{task.completed:.2f} MB"), + TextColumn("[cyan]{task.completed:.2f} MB"), transient=True, ) as progress: task = progress.add_task("", total=None) # Indeterminate progress @@ -172,10 +182,10 @@ def _download_model_weights( # Extract the downloaded zip file to the target folder with zipfile.ZipFile(bytes_io) as zip_ref: - zip_ref.extractall(record_ADDITIONAL_FILES_FOLDER) + zip_ref.extractall(record_folder) # check if the extracted file is still a zip - for f in record_ADDITIONAL_FILES_FOLDER.iterdir(): + for f in record_folder.iterdir(): if f.is_file() and f.suffix == ".zip": with zipfile.ZipFile(f) as zip_ref: files = zip_ref.namelist() @@ -185,10 +195,7 @@ def _download_model_weights( ) # Iterate over the files and extract them for i, file in enumerate(files): - zip_ref.extract(file, record_ADDITIONAL_FILES_FOLDER) + zip_ref.extract(file, record_folder) # Update the progress bar progress.update(task, completed=i + 1) f.unlink() # remove zip after extraction - - logger.info(f"Zip file extracted successfully to {record_ADDITIONAL_FILES_FOLDER}") - return record_ADDITIONAL_FILES_FOLDER diff --git a/pyproject.toml b/pyproject.toml index 3e41ac7..7d78d24 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ nibabel = ">=5.0.0" [tool.poetry.dev-dependencies] pytest = ">=8.0.0" +pytest-cov = ">=5.0.0" [tool.poetry.group.docs] optional = true diff --git a/tests/core/__init__.py b/tests/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/core/test_brats_algorithm.py b/tests/core/test_brats_algorithm.py new file mode 100644 index 0000000..be6b4b0 --- /dev/null +++ b/tests/core/test_brats_algorithm.py @@ -0,0 +1,109 @@ +import unittest +from unittest.mock import MagicMock, patch +from pathlib import Path +import tempfile +import shutil + +from brats import AdultGliomaSegmenter +from brats.utils.constants import OUTPUT_NAME_SCHEMA + + +class TestBraTSAlgorithm(unittest.TestCase): + + def setUp(self): + # Create a temporary directory for testing + self.test_dir = Path(tempfile.mkdtemp()) + self.data_folder = self.test_dir / "data" + self.data_folder.mkdir(parents=True, exist_ok=True) + self.output_folder = self.test_dir / "output" + self.output_folder.mkdir(parents=True, exist_ok=True) + + self.subject_A_folder = self.data_folder / "A" + self.subject_A_folder.mkdir(parents=True, exist_ok=True) + # Create mock file paths + self.input_files = { + "t1c": self.subject_A_folder / "A-t1c.nii.gz", + "t1n": self.subject_A_folder / "A-t1n.nii.gz", + "t2f": self.subject_A_folder / "A-t2f.nii.gz", + "t2w": self.subject_A_folder / "A-t2w.nii.gz", + } + for file in self.input_files.values(): + file.touch() + + # the core inference method is the same for all segmentation and inpainting algorithms, we use AdultGliomaSegmenter as an example during testing + self.segmenter = AdultGliomaSegmenter() + + def tearDown(self): + # Remove the temporary directory after the test + shutil.rmtree(self.test_dir) + + 
@patch("brats.core.brats_algorithm.run_container") + @patch("brats.core.segmentation_algorithms.input_sanity_check") + @patch("brats.core.brats_algorithm.InferenceSetup") + def test_infer_single( + self, mock_inference_setup, mock_input_sanity_check, mock_run_container + ): + + # Mock InferenceSetup context manager + mock_inference_setup_ret = mock_inference_setup.return_value + mock_inference_setup_ret.__enter__.return_value = ( + self.data_folder, + self.output_folder, + ) + + def create_output_file(*args, **kwargs): + subject_id = self.segmenter.algorithm.run_args.input_name_schema.format( + id=0 + ) + alg_output_file = self.output_folder / OUTPUT_NAME_SCHEMA[ + self.segmenter.task + ].format(subject_id=subject_id) + alg_output_file.touch() + + mock_run_container.side_effect = create_output_file + + output_file = self.output_folder / "output.nii.gz" + self.segmenter.infer_single( + t1c=self.input_files["t1c"], + t1n=self.input_files["t1n"], + t2f=self.input_files["t2f"], + t2w=self.input_files["t2w"], + output_file=output_file, + ) + mock_input_sanity_check.assert_called_once() + mock_run_container.assert_called_once() + + self.assertTrue(output_file.exists()) + + @patch("brats.core.brats_algorithm.run_container") + @patch("brats.core.segmentation_algorithms.input_sanity_check") + @patch("brats.core.brats_algorithm.InferenceSetup") + def test_infer_batch( + self, mock_inference_setup, mock_input_sanity_check, mock_run_container + ): + + # Mock InferenceSetup context manager + mock_inference_setup_ret = mock_inference_setup.return_value + mock_inference_setup_ret.__enter__.return_value = ( + self.data_folder, + self.output_folder, + ) + + def create_output_file(*args, **kwargs): + subject_id = self.segmenter.algorithm.run_args.input_name_schema.format( + id=0 + ) + alg_output_file = self.output_folder / OUTPUT_NAME_SCHEMA[ + self.segmenter.task + ].format(subject_id=subject_id) + alg_output_file.touch() + + mock_run_container.side_effect = create_output_file + + self.segmenter.infer_batch( + data_folder=self.data_folder, output_folder=self.output_folder + ) + mock_input_sanity_check.assert_called_once() + mock_run_container.assert_called_once() + output_file = self.output_folder / "A.nii.gz" + self.assertTrue(output_file.exists()) diff --git a/tests/core/test_docker.py b/tests/core/test_docker.py new file mode 100644 index 0000000..1381341 --- /dev/null +++ b/tests/core/test_docker.py @@ -0,0 +1,322 @@ +import unittest +from unittest.mock import MagicMock, patch, call +from pathlib import Path +import shutil +import subprocess +import tempfile + +from brats.core.docker import ( + _log_algorithm_info, + _show_docker_pull_progress, + _ensure_image, + _is_cuda_available, + _handle_device_requests, + _get_additional_files_path, + _get_volume_mappings, + _get_parameters_arg, + _build_args, + _observe_docker_output, + _sanity_check_output, + run_container, +) +from rich.progress import Progress +from brats.utils.algorithm_config import AlgorithmData +from brats.utils.constants import PARAMETERS_DIR +from brats.utils.exceptions import ( + AlgorithmNotCPUCompatibleException, + BraTSContainerException, +) + + +class TestDockerHelpers(unittest.TestCase): + + def setUp(self): + # Create a temporary directory for testing + self.test_dir = Path(tempfile.mkdtemp()) + self.data_folder = self.test_dir / "data" + self.data_folder.mkdir(parents=True, exist_ok=True) + self.output_folder = self.test_dir / "output" + self.output_folder.mkdir(parents=True, exist_ok=True) + + # Create mock algorithm data + 
self.algorithm_gpu = AlgorithmData( + run_args=MagicMock( + docker_image="brainles/test-image-1:latest", + parameters_file=True, + shm_size="1g", + cpu_compatible=False, + ), + weights=MagicMock(param_name="weights", checkpoint_path="checkpoint.pth"), + meta=MagicMock( + challenge="Challenge", + rank="1st", + paper="paper_url", + authors="author_names", + ), + ) + + self.algorithm_cpu = AlgorithmData( + run_args=MagicMock( + docker_image="brainles/test-image-2:latest", + parameters_file=True, + shm_size="1g", + cpu_compatible=True, + ), + weights=MagicMock(param_name="weights", checkpoint_path="checkpoint.pth"), + meta=MagicMock( + challenge="Challenge", + rank="1st", + paper="paper_url", + authors="author_names", + ), + ) + + def tearDown(self): + # Remove the temporary directory after the test + shutil.rmtree(self.test_dir) + + @patch("brats.core.docker.client") + def test_show_docker_pull_progress(self, MockClient): + tasks = {} + with Progress() as progress: + line = { + "status": "Downloading", + "id": "id1", + "progressDetail": {"total": 100, "current": 50}, + } + _show_docker_pull_progress(tasks, progress, line) + self.assertIn("[Download id1]", tasks) + + line = { + "status": "Extracting", + "id": "id2", + "progressDetail": {"total": 100, "current": 50}, + } + _show_docker_pull_progress(tasks, progress, line) + self.assertIn("[Extract id2]", tasks) + + @patch("brats.core.docker.client.images.list", return_value=[]) + @patch("brats.core.docker.client.api.pull") + def test_ensure_image(self, MockPull, MockList): + MockPull.return_value = iter( + [ + { + "status": "Downloading", + "id": "test_image", + "progressDetail": {"total": 100, "current": 50}, + } + ] + ) + _ensure_image("test-image:latest") + MockPull.assert_called_once_with("test-image:latest", stream=True, decode=True) + + @patch("subprocess.run") + def test_is_cuda_available(self, MockRun): + MockRun.return_value = None + self.assertTrue(_is_cuda_available()) + MockRun.assert_called_once_with( + ["nvidia-smi"], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=True, + ) + + @patch("subprocess.run") + def test_is_cuda_available(self, MockRun): + MockRun.side_effect = Exception() + self.assertFalse(_is_cuda_available()) + MockRun.assert_called_once_with( + ["nvidia-smi"], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=True, + ) + + @patch("brats.core.docker._is_cuda_available", return_value=True) + def test_handle_device_requests_cuda(self, MockIsCudaAvailable): + result = _handle_device_requests( + algorithm=self.algorithm_gpu, cuda_devices="42", force_cpu=False + ) + self.assertEqual(len(result), 1) + self.assertEqual(result[0].device_ids, ["42"]) + self.assertEqual(result[0].capabilities, [["gpu"]]) + + @patch("brats.core.docker._is_cuda_available", return_value=False) + def test_handle_device_requests_force_cpu_valid(self, MockIsCudaAvailable): + device_requests = _handle_device_requests( + algorithm=self.algorithm_cpu, cuda_devices="0", force_cpu=True + ) + self.assertEqual(len(device_requests), 0) + + @patch("brats.core.docker._is_cuda_available", return_value=False) + def test_handle_device_requests_no_cuda_no_cpu(self, MockIsCudaAvailable): + with self.assertRaises(AlgorithmNotCPUCompatibleException): + _handle_device_requests( + algorithm=self.algorithm_gpu, cuda_devices="0", force_cpu=False + ) + + @patch("brats.core.docker._is_cuda_available", return_value=False) + def test_handle_device_requests_force_cpu_invalid(self, MockIsCudaAvailable): + with 
self.assertRaises(AlgorithmNotCPUCompatibleException): + _handle_device_requests( + algorithm=self.algorithm_gpu, cuda_devices="0", force_cpu=True + ) + + @patch("brats.core.docker.check_additional_files_path") + def test_get_additional_files_path(self, MockCheckAdditionalFilesPath): + MockCheckAdditionalFilesPath.return_value = self.test_dir + result = _get_additional_files_path(self.algorithm_gpu) + self.assertEqual(result, self.test_dir) + + def test_get_volume_mappings(self): + result = _get_volume_mappings( + data_path=self.data_folder, + additional_files_path=self.test_dir, + output_path=self.output_folder, + parameters_path=PARAMETERS_DIR, + ) + expected = { + self.data_folder.absolute(): {"bind": "/mlcube_io0", "mode": "rw"}, + self.test_dir.absolute(): {"bind": "/mlcube_io1", "mode": "rw"}, + self.output_folder.absolute(): {"bind": "/mlcube_io2", "mode": "rw"}, + PARAMETERS_DIR.absolute(): {"bind": "/mlcube_io3", "mode": "rw"}, + } + self.assertEqual(result, expected) + + def test_get_parameters_arg_dummy(self): + result = _get_parameters_arg(self.algorithm_gpu) + expected = f" --parameters_file=/mlcube_io3/dummy.yml" + self.assertEqual(result, expected) + + def test_get_parameters_arg_file(self): + with patch("brats.core.docker.PARAMETERS_DIR", self.test_dir): + identifier = self.algorithm_gpu.run_args.docker_image.split(":")[0].split( + "/" + )[-1] + file = self.test_dir / f"{identifier}.yml" + file.touch() + result = _get_parameters_arg(self.algorithm_gpu) + expected = f" --parameters_file=/mlcube_io3/{file.name}" + self.assertEqual(result, expected) + + def test_build_args(self): + result = _build_args(self.algorithm_gpu) + expected_command_args = [ + "--data_path=/mlcube_io0", + "--output_path=/mlcube_io2", + "--weights=/mlcube_io1/checkpoint.pth", + "--parameters_file=/mlcube_io3/dummy.yml", + ] + for arg in expected_command_args: + self.assertIn(arg, result[0]) + self.assertEqual(result[1], {}) + + @patch("brats.core.docker.Console") + @patch("brats.core.docker.docker.models.containers.Container") + def test_observe_docker_output(self, MockContainer, MockConsole): + mock_container = MagicMock() + mock_container.attach.return_value = [b"output log line"] + mock_container.wait.return_value = {"StatusCode": 0} + result = _observe_docker_output(mock_container) + self.assertEqual(result, "output log line") + + def test_sanity_check_output(self): + # Create mock paths + mock_data_path = MagicMock(spec=Path) + mock_output_path = MagicMock(spec=Path) + + # Simulate input files starting with "BraTS" and output files + mock_data_path.iterdir.return_value = [ + MagicMock(name="file1", spec=Path), + MagicMock(name="file2", spec=Path), + ] + mock_output_path.iterdir.return_value = [ + MagicMock(name="file1", spec=Path), + MagicMock(name="file2", spec=Path), + ] + + # Define container_output + container_output = "Sample container output" + + # Check that no exception is raised + try: + _sanity_check_output( + data_path=mock_data_path, + output_path=mock_output_path, + container_output=container_output, + ) + except BraTSContainerException: + self.fail("BraTSContainerException was raised unexpectedly") + + def test_sanity_check_output_fail(self): + # Create mock paths + mock_data_path = MagicMock(spec=Path) + mock_output_path = MagicMock(spec=Path) + + # Simulate input files starting with "BraTS" and output files + mock_data_path.iterdir.return_value = [ + MagicMock(name="file1", spec=Path), + MagicMock(name="file2", spec=Path), + ] + mock_output_path.iterdir.return_value = [ + 
MagicMock(name="file1", spec=Path), + ] + + # Define container_output + container_output = "Sample container output" + + # Check that the exception is raised + with self.assertRaises(BraTSContainerException): + _sanity_check_output( + data_path=mock_data_path, + output_path=mock_output_path, + container_output=container_output, + ) + + @patch("brats.core.docker.logger.debug") + def test_log_algorithm_info(self, MockLoggerDebug): + _log_algorithm_info(algorithm=self.algorithm_gpu) + + MockLoggerDebug.assert_called_once() + + @patch("brats.core.docker._log_algorithm_info") + @patch("brats.core.docker._ensure_image") + @patch("brats.core.docker._get_additional_files_path") + @patch("brats.core.docker._get_volume_mappings") + @patch("brats.core.docker._build_args") + @patch("brats.core.docker._handle_device_requests") + @patch("brats.core.docker._observe_docker_output") + @patch("brats.core.docker.client") + def test_run_container( + self, + mock_client, + mock_observe_docker_output, + mock_handle_device_requests, + mock_build_args, + mock_get_volume_mappings, + mock_get_additional_files_path, + mock_ensure_image, + mock_log_algorithm_info, + ): + + # setup mocks + mock_build_args.return_value = ("args", {}) + + # run + cuda_devices = "0" + force_cpu = False + run_container( + algorithm=self.algorithm_gpu, + data_path=self.data_folder, + output_path=self.output_folder, + cuda_devices=cuda_devices, + force_cpu=force_cpu, + ) + + # Verify mocks were called as expected + mock_log_algorithm_info.assert_called_once_with(algorithm=self.algorithm_gpu) + mock_ensure_image.assert_called_once() + mock_get_additional_files_path.assert_called_once() + mock_get_volume_mappings.assert_called_once() + mock_build_args.assert_called_once() + mock_handle_device_requests.assert_called_once() diff --git a/tests/core/test_inpainting_algorithms.py b/tests/core/test_inpainting_algorithms.py new file mode 100644 index 0000000..1e15dae --- /dev/null +++ b/tests/core/test_inpainting_algorithms.py @@ -0,0 +1,102 @@ +import shutil +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch + +from loguru import logger + +from brats import Inpainter +from brats.utils.constants import InpaintingAlgorithms + + +class TestInpaintingAlgorithms(unittest.TestCase): + def setUp(self): + # Create a temporary directory + self.test_dir = Path(tempfile.mkdtemp()) + self.data_folder = self.test_dir / "data" + self.data_folder.mkdir(parents=True, exist_ok=True) + self.tmp_data_folder = self.test_dir / "tmp_std_data" + self.tmp_data_folder.mkdir(parents=True, exist_ok=True) + + # Create mock paths for input images + self.subject_folder = self.data_folder / "subject" + self.subject_folder.mkdir(parents=True, exist_ok=True) + self.t1n = self.data_folder / "subject-t1n-voided.nii.gz" + self.mask = self.data_folder / "subject-mask.nii.gz" + # Create dummy files + for img in [self.t1n, self.mask]: + img.touch(exist_ok=True) + + self.segmenter = Inpainter() + + def tearDown(self): + # Remove the temporary directory after the test + shutil.rmtree(self.test_dir) + + ### Standardization tests + + @patch("brats.core.inpainting_algorithms.input_sanity_check") + def test_successful_single_standardization(self, mock_input_sanity_check): + subject_id = "test_subject" + self.segmenter._standardize_single_inputs( + data_folder=self.tmp_data_folder, + subject_id=subject_id, + inputs={ + "t1n": self.t1n, + "mask": self.mask, + }, + ) + subject_folder = self.tmp_data_folder / subject_id + 
self.assertTrue(subject_folder.exists()) + self.assertTrue((subject_folder / f"{subject_id}-t1n-voided.nii.gz").exists()) + self.assertTrue((subject_folder / f"{subject_id}-mask.nii.gz").exists()) + + @patch("brats.core.inpainting_algorithms.input_sanity_check") + @patch("sys.exit") + @patch.object(logger, "error") + def test_single_standardize_handle_file_not_found_error( + self, mock_logger, mock_exit, mock_input_sanity_check + ): + subject_id = "test_subject" + # Provide a non-existent file path for t1c + t1n = "non_existent_file.nii.gz" + self.segmenter._standardize_single_inputs( + data_folder=self.data_folder, + subject_id=subject_id, + inputs={ + "t1n": t1n, + "mask": self.mask, + }, + ) + mock_logger.assert_called() + mock_exit.assert_called_with(1) + + @patch("brats.core.inpainting_algorithms.Inpainter._standardize_single_inputs") + def test_standardize_segmentation_inputs_list(self, mock_standardize_single_inputs): + subjects = [f for f in self.data_folder.iterdir() if f.is_dir()] + mapping = self.segmenter._standardize_batch_inputs( + data_folder=self.tmp_data_folder, + subjects=subjects, + input_name_schema="BraTS-GLI-{id:05d}-000", + ) + self.assertDictEqual( + mapping, + { + "BraTS-GLI-00000-000": "subject", + }, + ) + mock_standardize_single_inputs.assert_called_once() + + ### Initialization tests + + def test_inpainter_initialization(self): + # Test default initialization + inpainter = Inpainter() + self.assertIsInstance(inpainter, Inpainter) + + # Test with custom arguments + custom_inpainter = Inpainter( + algorithm=InpaintingAlgorithms.BraTS23_2, cuda_devices="1", force_cpu=True + ) + self.assertIsInstance(custom_inpainter, Inpainter) diff --git a/tests/core/test_segmentation_algorithms.py b/tests/core/test_segmentation_algorithms.py new file mode 100644 index 0000000..0c2d85d --- /dev/null +++ b/tests/core/test_segmentation_algorithms.py @@ -0,0 +1,168 @@ +import shutil +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch + +from loguru import logger + +from brats import ( + AdultGliomaSegmenter, + AfricaSegmenter, + MeningiomaSegmenter, + MetastasesSegmenter, + PediatricSegmenter, +) +from brats.utils.constants import ( + AdultGliomaAlgorithms, + AfricaAlgorithms, + MeningiomaAlgorithms, + MetastasesAlgorithms, + PediatricAlgorithms, +) + + +class TestSegmentationAlgorithms(unittest.TestCase): + def setUp(self): + # Create a temporary directory + self.test_dir = Path(tempfile.mkdtemp()) + self.data_folder = self.test_dir / "data" + self.data_folder.mkdir(parents=True, exist_ok=True) + self.tmp_data_folder = self.test_dir / "tmp_std_data" + self.tmp_data_folder.mkdir(parents=True, exist_ok=True) + + # Create mock paths for input images + self.subject_folder = self.data_folder / "subject" + self.subject_folder.mkdir(parents=True, exist_ok=True) + self.t1c = self.data_folder / "subject-t1c.nii.gz" + self.t1n = self.data_folder / "subject-t1n.nii.gz" + self.t2f = self.data_folder / "subject-t2f.nii.gz" + self.t2w = self.data_folder / "subject-t2w.nii.gz" + # Create dummy files + for img in [self.t1c, self.t1n, self.t2f, self.t2w]: + img.touch(exist_ok=True) + + self.segmenter = AdultGliomaSegmenter() + + def tearDown(self): + # Remove the temporary directory after the test + shutil.rmtree(self.test_dir) + + ### Standardization tests + + @patch("brats.core.segmentation_algorithms.input_sanity_check") + def test_successful_single_standardization(self, mock_input_sanity_check): + subject_id = "test_subject" + 
self.segmenter._standardize_single_inputs( + data_folder=self.tmp_data_folder, + subject_id=subject_id, + inputs={ + "t1c": self.t1c, + "t1n": self.t1n, + "t2f": self.t2f, + "t2w": self.t2w, + }, + ) + subject_folder = self.tmp_data_folder / subject_id + self.assertTrue(subject_folder.exists()) + for img_type in ["t1c", "t1n", "t2f", "t2w"]: + self.assertTrue( + (subject_folder / f"{subject_id}-{img_type}.nii.gz").exists() + ) + + @patch("brats.core.segmentation_algorithms.input_sanity_check") + @patch("sys.exit") + @patch.object(logger, "error") + def test_single_standardize_handle_file_not_found_error( + self, mock_logger, mock_exit, mock_input_sanity_check + ): + subject_id = "test_subject" + # Provide a non-existent file path for t1c + t1c = "non_existent_file.nii.gz" + self.segmenter._standardize_single_inputs( + data_folder=self.data_folder, + subject_id=subject_id, + inputs={ + "t1c": t1c, + "t1n": self.t1n, + "t2f": self.t2f, + "t2w": self.t2w, + }, + ) + mock_logger.assert_called() + mock_exit.assert_called_with(1) + + @patch( + "brats.core.segmentation_algorithms.SegmentationAlgorithm._standardize_single_inputs" + ) + def test_standardize_segmentation_inputs_list(self, mock_standardize_single_inputs): + subjects = [f for f in self.data_folder.iterdir() if f.is_dir()] + mapping = self.segmenter._standardize_batch_inputs( + data_folder=self.tmp_data_folder, + subjects=subjects, + input_name_schema="BraTS-GLI-{id:05d}-000", + ) + self.assertDictEqual( + mapping, + { + "BraTS-GLI-00000-000": "subject", + }, + ) + mock_standardize_single_inputs.assert_called_once() + + ### Initialization tests + + def test_adult_glioma_segmenter_initialization(self): + # Test default initialization + segmenter = AdultGliomaSegmenter() + self.assertIsInstance(segmenter, AdultGliomaSegmenter) + + # Test with custom arguments + custom_segmenter = AdultGliomaSegmenter( + algorithm=AdultGliomaAlgorithms.BraTS23_2, cuda_devices="1", force_cpu=True + ) + self.assertIsInstance(custom_segmenter, AdultGliomaSegmenter) + + def test_meningioma_segmenter_initialization(self): + # Test default initialization + segmenter = MeningiomaSegmenter() + self.assertIsInstance(segmenter, MeningiomaSegmenter) + + # Test with custom arguments + custom_segmenter = MeningiomaSegmenter( + algorithm=MeningiomaAlgorithms.BraTS23_2, cuda_devices="1", force_cpu=True + ) + self.assertIsInstance(custom_segmenter, MeningiomaSegmenter) + + def test_pediatric_segmenter_initialization(self): + # Test default initialization + segmenter = PediatricSegmenter() + self.assertIsInstance(segmenter, PediatricSegmenter) + + # Test with custom arguments + custom_segmenter = PediatricSegmenter( + algorithm=PediatricAlgorithms.BraTS23_2, cuda_devices="1", force_cpu=True + ) + self.assertIsInstance(custom_segmenter, PediatricSegmenter) + + def test_africa_segmenter_initialization(self): + # Test default initialization + segmenter = AfricaSegmenter() + self.assertIsInstance(segmenter, AfricaSegmenter) + + # Test with custom arguments + custom_segmenter = AfricaSegmenter( + algorithm=AfricaAlgorithms.BraTS23_2, cuda_devices="1", force_cpu=True + ) + self.assertIsInstance(custom_segmenter, AfricaSegmenter) + + def test_metastases_segmenter_initialization(self): + # Test default initialization + segmenter = MetastasesSegmenter() + self.assertIsInstance(segmenter, MetastasesSegmenter) + + # Test with custom arguments + custom_segmenter = MetastasesSegmenter( + algorithm=MetastasesAlgorithms.BraTS23_2, cuda_devices="1", force_cpu=True + ) + 
self.assertIsInstance(custom_segmenter, MetastasesSegmenter) diff --git a/tests/test_utils_data_handling.py b/tests/test_utils_data_handling.py deleted file mode 100644 index e89c97e..0000000 --- a/tests/test_utils_data_handling.py +++ /dev/null @@ -1,127 +0,0 @@ -import shutil -import tempfile -import unittest -from pathlib import Path -from unittest.mock import MagicMock, patch - -from loguru import logger - -from brats.utils.data_handling import input_sanity_check - - -class TestDataHandlingUtils(unittest.TestCase): - def setUp(self): - # Create a temporary directory - self.test_dir = Path(tempfile.mkdtemp()) - self.data_folder = self.test_dir / "data" - self.data_folder.mkdir(parents=True, exist_ok=True) - self.tmp_data_folder = self.test_dir / "tmp_std_data" - self.tmp_data_folder.mkdir(parents=True, exist_ok=True) - - # Create mock paths for input images - self.subject_folder = self.data_folder / "subject" - self.subject_folder.mkdir(parents=True, exist_ok=True) - self.t1c = self.data_folder / "subject-t1c.nii.gz" - self.t1n = self.data_folder / "subject-t1n.nii.gz" - self.t2f = self.data_folder / "subject-t2f.nii.gz" - self.t2w = self.data_folder / "subject-t2w.nii.gz" - # Create dummy files - for img in [self.t1c, self.t1n, self.t2f, self.t2w]: - img.touch(exist_ok=True) - - def tearDown(self): - # Remove the temporary directory after the test - shutil.rmtree(self.test_dir) - - # @patch("brats.utils.data_handling.input_sanity_check") - # def test_successful_standardization(self, mock_input_sanity_check): - # subject_id = "test_subject" - # standardize_segmentation_inputs( - # data_folder=self.tmp_data_folder, - # subject_id=subject_id, - # t1c=self.t1c, - # t1n=self.t1n, - # t2f=self.t2f, - # t2w=self.t2w, - # ) - # subject_folder = self.tmp_data_folder / subject_id - # self.assertTrue(subject_folder.exists()) - # for img_type in ["t1c", "t1n", "t2f", "t2w"]: - # self.assertTrue( - # (subject_folder / f"{subject_id}-{img_type}.nii.gz").exists() - # ) - - # @patch("brats.utils.data_handling.input_sanity_check") - # @patch("sys.exit") - # @patch.object(logger, "error") - # def test_handle_file_not_found_error( - # self, mock_logger, mock_exit, mock_input_sanity_check - # ): - # subject_id = "test_subject" - # # Provide a non-existent file path for t1c - # t1c = "non_existent_file.nii.gz" - # standardize_segmentation_inputs( - # data_folder=self.data_folder, - # subject_id=subject_id, - # t1c=t1c, - # t1n=self.t1n, - # t2f=self.t2f, - # t2w=self.t2w, - # ) - # mock_logger.assert_called() - # mock_exit.assert_called_with(1) - - # @patch("brats.utils.data_handling.standardize_segmentation_inputs") - # def test_standardize_segmentation_inputs_list( - # self, mock_standardize_segmentation_inputs - # ): - # subjects = [f for f in self.data_folder.iterdir() if f.is_dir()] - # mapping = standardize_segmentation_inputs_list( - # subjects=subjects, - # tmp_data_folder=self.tmp_data_folder, - # input_name_schema="BraTS-PED-{id:05d}-000", - # ) - # self.assertDictEqual( - # mapping, - # { - # "BraTS-PED-00000-000": "subject", - # }, - # ) - # mock_standardize_segmentation_inputs.assert_called_once() - - @patch("brats.utils.data_handling.nib.load") - @patch("brats.utils.data_handling.logger.warning") - def test_correct_shape(self, mock_warning, mock_nib_load): - # Mock nib.load to return an object with shape (240, 240, 155) - mock_img = MagicMock() - mock_img.shape = (240, 240, 155) - mock_nib_load.return_value = mock_img - - # Call the function with correct shapes - 
input_sanity_check("t1c.nii.gz", "t1n.nii.gz", "t2f.nii.gz", "t2w.nii.gz") - - # Ensure no warnings are logged - mock_warning.assert_not_called() - - @patch("brats.utils.data_handling.nib.load") - @patch("brats.utils.data_handling.logger.warning") - def test_incorrect_shape(self, mock_warning, mock_nib_load): - # Mock nib.load to return an object with shape (191, 512, 512) for one image - mock_img_correct = MagicMock() - mock_img_correct.shape = (240, 240, 155) - mock_img_incorrect = MagicMock() - mock_img_incorrect.shape = (191, 512, 512) - - def side_effect(arg): - if arg == "t1c.nii.gz": - return mock_img_incorrect - else: - return mock_img_correct - - mock_nib_load.side_effect = side_effect - - # Call the function with one incorrect shape - input_sanity_check("t1c.nii.gz", "t1n.nii.gz", "t2f.nii.gz", "t2w.nii.gz") - - # Ensure warnings are logged - self.assertTrue(mock_warning.called) diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_algorithm_config.py b/tests/utils/test_algorithm_config.py similarity index 96% rename from tests/test_algorithm_config.py rename to tests/utils/test_algorithm_config.py index 1f13d0b..2a116ba 100644 --- a/tests/test_algorithm_config.py +++ b/tests/utils/test_algorithm_config.py @@ -14,7 +14,6 @@ def test_configs_valid(self): for f in META_DIR.iterdir() if f.is_file() and f.suffix in [".yml", ".yaml"] ] - print(configs) for config in configs: try: load_algorithms(file_path=config) diff --git a/tests/utils/test_data_handling.py b/tests/utils/test_data_handling.py new file mode 100644 index 0000000..47e8c08 --- /dev/null +++ b/tests/utils/test_data_handling.py @@ -0,0 +1,152 @@ +import shutil +import tempfile +import unittest +from pathlib import Path +from unittest.mock import MagicMock, patch + +from loguru import logger + +from brats.utils.data_handling import ( + InferenceSetup, + add_log_file_handler, + input_sanity_check, + remove_tmp_folder, +) + + +class TestDataHandlingUtils(unittest.TestCase): + def setUp(self): + # Create a temporary directory + self.test_dir = Path(tempfile.mkdtemp()) + self.data_folder = self.test_dir / "data" + self.data_folder.mkdir(parents=True, exist_ok=True) + self.tmp_data_folder = self.test_dir / "tmp_std_data" + self.tmp_data_folder.mkdir(parents=True, exist_ok=True) + + # Create mock paths for input images + self.subject_folder = self.data_folder / "subject" + self.subject_folder.mkdir(parents=True, exist_ok=True) + self.t1c = self.data_folder / "subject-t1c.nii.gz" + self.t1n = self.data_folder / "subject-t1n.nii.gz" + self.t2f = self.data_folder / "subject-t2f.nii.gz" + self.t2w = self.data_folder / "subject-t2w.nii.gz" + # Create dummy files + for img in [self.t1c, self.t1n, self.t2f, self.t2w]: + img.touch(exist_ok=True) + + def tearDown(self): + # Remove the temporary directory after the test + shutil.rmtree(self.test_dir) + + def test_inference_setup_with_log_file(self): + # Create a temporary log file + tmp_log_file = Path(tempfile.mktemp()) + + with InferenceSetup(log_file=tmp_log_file) as ( + tmp_data_folder, + tmp_output_folder, + ): + # Check that the folders are created + self.assertTrue(tmp_data_folder.is_dir()) + self.assertTrue(tmp_output_folder.is_dir()) + + # Check that the log file exists + self.assertTrue(tmp_log_file.exists()) + + # Check if folders are cleaned up + self.assertFalse(tmp_data_folder.exists()) + self.assertFalse(tmp_output_folder.exists()) + + # Remove the temporary log file + 
tmp_log_file.unlink(missing_ok=True) + + def test_inference_setup_without_log_file(self): + # Create a temporary log file + tmp_log_file = Path(tempfile.mktemp()) + + with InferenceSetup() as (tmp_data_folder, tmp_output_folder): + # Check that the folders are created + self.assertTrue(tmp_data_folder.is_dir()) + self.assertTrue(tmp_output_folder.is_dir()) + + # Check that the log file exists + self.assertFalse(tmp_log_file.exists()) # Log file should not be created + + # Check if folders are cleaned up + self.assertFalse(tmp_data_folder.exists()) + self.assertFalse(tmp_output_folder.exists()) + + def test_remove_tmp_folder_success(self): + # Test successful removal of a folder + temp_folder = Path(tempfile.mkdtemp()) + remove_tmp_folder(temp_folder) + self.assertFalse(temp_folder.exists()) + + def test_remove_tmp_folder_permission_error(self): + # Test handling of PermissionError + # Create a folder and then set it to read-only to simulate a permission error + temp_folder = Path(tempfile.mkdtemp()) + temp_folder.chmod(0o444) # Read-only permissions + try: + remove_tmp_folder(temp_folder) + except PermissionError: + pass # We expect this exception as we are simulating it + self.assertFalse(temp_folder.exists()) # Folder should still be removed + + def test_remove_tmp_folder_file_not_found(self): + # Test handling of FileNotFoundError + fake_folder = Path(self.test_dir / "non_existent_folder") + # Ensure the folder does not exist + self.assertFalse(fake_folder.exists()) + remove_tmp_folder(fake_folder) + # No assertion needed as the function should handle the error internally + + def test_add_log_file_handler(self): + # Test adding a log file handler + log_file = Path(tempfile.mktemp()) + handler_id = add_log_file_handler(log_file) + self.assertGreater(handler_id, 0) # Ensure a positive handler ID is returned + + # Check that the log file exists and is writable + self.assertTrue(log_file.exists()) + + # Clean up + logger.remove(handler_id) + log_file.unlink(missing_ok=True) + + @patch("brats.utils.data_handling.nib.load") + @patch("brats.utils.data_handling.logger.warning") + def test_input_sanity_check_correct_shape(self, mock_warning, mock_nib_load): + # Mock nib.load to return an object with shape (240, 240, 155) + mock_img = MagicMock() + mock_img.shape = (240, 240, 155) + mock_nib_load.return_value = mock_img + + # Call the function with correct shapes + input_sanity_check("t1c.nii.gz", "t1n.nii.gz", "t2f.nii.gz", "t2w.nii.gz") + + # Ensure no warnings are logged + mock_warning.assert_not_called() + + @patch("brats.utils.data_handling.nib.load") + @patch("brats.utils.data_handling.logger.warning") + def test_input_sanity_check_incorrect_shape(self, mock_warning, mock_nib_load): + # Mock nib.load to return an object with shape (191, 512, 512) for one image + mock_img_correct = MagicMock() + mock_img_correct.shape = (240, 240, 155) + mock_img_incorrect = MagicMock() + mock_img_incorrect.shape = (191, 512, 512) + + def side_effect(arg): + if arg == "t1c.nii.gz": + return mock_img_incorrect + else: + return mock_img_correct + + mock_nib_load.side_effect = side_effect + + # Call the function with one incorrect shape + input_sanity_check("t1c.nii.gz", "t1n.nii.gz", "t2f.nii.gz", "t2w.nii.gz") + + # Ensure warnings are logged + self.assertTrue(mock_warning.called) diff --git a/tests/utils/test_zenodo.py b/tests/utils/test_zenodo.py new file mode 100644 index 0000000..438023e --- /dev/null +++ b/tests/utils/test_zenodo.py @@ -0,0 +1,153 @@ +import tempfile +import unittest +from unittest.mock 
import patch, MagicMock, mock_open, call +from pathlib import Path +from io import BytesIO +import requests + +# Import the module that contains the functions +from brats.utils.constants import ADDITIONAL_FILES_FOLDER +from brats.utils.zenodo import ( + _extract_archive, + check_additional_files_path, + _get_latest_version_folder_name, + _get_zenodo_metadata_and_archive_url, + _download_additional_files, +) + + +class TestZenodoUtils(unittest.TestCase): + + @patch("brats.utils.zenodo._get_zenodo_metadata_and_archive_url") + @patch("brats.utils.zenodo._get_latest_version_folder_name") + @patch("brats.utils.zenodo._download_additional_files") + @patch("brats.utils.zenodo.shutil.rmtree") + @patch("brats.utils.zenodo.Path.mkdir") + @patch("brats.utils.zenodo.Path.glob") + def test_check_additional_files_path( + self, + mock_glob, + mock_mkdir, + mock_rmtree, + mock_download_additional_files, + mock_get_latest_version, + mock_get_zenodo_metadata, + ): + # Setup + mock_record_id = "12345" + mock_matching_folder = MagicMock(spec=Path) + mock_matching_folder.name = f"{mock_record_id}_v1.0.0" + mock_glob.return_value = [mock_matching_folder] + mock_get_latest_version.return_value = f"{mock_record_id}_v1.0.0" + mock_get_zenodo_metadata.return_value = ( + {"version": "1.0.0"}, + "http://test.url", + ) + + # Test when local weights are up-to-date + result = check_additional_files_path(mock_record_id) + self.assertEqual(result, ADDITIONAL_FILES_FOLDER / f"{mock_record_id}_v1.0.0") + mock_rmtree.assert_not_called() + mock_download_additional_files.assert_not_called() + + # Test when new weights are available + mock_get_zenodo_metadata.return_value = ( + {"version": "2.0.0"}, + "http://test.url", + ) + result = check_additional_files_path(mock_record_id) + mock_rmtree.assert_called_once() + mock_download_additional_files.assert_called_once() + + @patch("brats.utils.zenodo.requests.get") + def test_get_zenodo_metadata_and_archive_url(self, mock_get): + # Setup + mock_response = MagicMock(spec=requests.Response) + mock_response.status_code = 200 + mock_response.json.return_value = { + "metadata": {"version": "1.0.0"}, + "links": {"archive": "http://test.url"}, + } + mock_get.return_value = mock_response + + metadata, archive_url = _get_zenodo_metadata_and_archive_url("12345") + self.assertEqual(metadata, {"version": "1.0.0"}) + self.assertEqual(archive_url, "http://test.url") + + # Test when the request fails + mock_get.side_effect = requests.exceptions.RequestException("Failed") + ret = _get_zenodo_metadata_and_archive_url("12345") + self.assertIsNone(ret) + + @patch("brats.utils.zenodo.ADDITIONAL_FILES_FOLDER", Path(tempfile.mkdtemp())) + @patch("brats.utils.zenodo._extract_archive") + @patch("brats.utils.zenodo.requests.get") + def test_download_additional_files( + self, + mock_requests_get, + mock_extract_archive, + ): + # Setup + mock_zenodo_metadata = {"version": "1.0.0"} + mock_archive_url = "http://test.url" + mock_response = MagicMock(spec=requests.Response) + mock_response.status_code = 200 + mock_response.iter_content = MagicMock(return_value=[b"data"]) + mock_requests_get.return_value = mock_response + + # Call the function + result_path = _download_additional_files( + mock_zenodo_metadata, "12345", mock_archive_url + ) + + # Assertions + # mock_mkdir.assert_called_once_with(parents=True, exist_ok=True) + mock_requests_get.assert_called_once_with(mock_archive_url, stream=True) + # mock_zipfile_instance.extractall.assert_called_once_with(result_path) + mock_extract_archive.assert_called_once() + 
+ @patch("brats.utils.zenodo.zipfile.ZipFile") + @patch("brats.utils.zenodo.BytesIO", new_callable=MagicMock) + @patch("brats.utils.zenodo.Progress") + def test_extract_archive(self, mock_progress, mock_bytes_io, mock_zipfile): + # Setup + mock_response = MagicMock(spec=requests.Response) + mock_response.iter_content.return_value = [b"data"] + mock_record_folder = MagicMock(spec=Path) + + mock_bytes_io_instance = MagicMock(spec=BytesIO) + mock_bytes_io.return_value = mock_bytes_io_instance # Mock the instantiation + + mock_zipfile_instance = MagicMock() + mock_zipfile.return_value.__enter__.return_value = mock_zipfile_instance + + # Call the function + _extract_archive(mock_response, mock_record_folder) + + def test_get_latest_version_folder_name(self): + # Test case when folders are provided + folder1 = MagicMock(spec=Path) + folder1.name = "12345_v1.0.0" + folder1.__str__.return_value = "12345_v1.0.0" + folder2 = MagicMock(spec=Path) + folder2.name = "12345_v2.0.0" + folder2.__str__.return_value = "12345_v2.0.0" + folder3 = MagicMock(spec=Path) + folder3.name = "12345_v1.5.0" + folder3.__str__.return_value = "12345_v1.5.0" + + folder2.glob.return_value = ["not empty"] + folder1.glob.return_value = ["not empty"] + folder3.glob.return_value = [] + + result = _get_latest_version_folder_name([folder1, folder2, folder3]) + self.assertEqual(result, "12345_v2.0.0") + + # Test case when no folders are provided + result = _get_latest_version_folder_name([]) + self.assertIsNone(result) + + # Test case when folder is empty + folder2.glob.return_value = [] + result = _get_latest_version_folder_name([folder1, folder2]) + self.assertIsNone(result)
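
Usage sketch (not part of the patch): a minimal example of the public inference API that the new tests in tests/core/test_brats_algorithm.py exercise through mocks. The file paths below are hypothetical placeholders, and actually running it assumes a working Docker setup plus the algorithm's Zenodo-hosted additional files; the constructor and keyword arguments mirror the calls made in the tests.

from pathlib import Path

from brats import AdultGliomaSegmenter
from brats.utils.constants import AdultGliomaAlgorithms

# Any algorithm enum member works; the tests construct e.g.
# AdultGliomaSegmenter(algorithm=AdultGliomaAlgorithms.BraTS23_2, cuda_devices="1", force_cpu=True)
# as a non-default example.
segmenter = AdultGliomaSegmenter(
    algorithm=AdultGliomaAlgorithms.BraTS23_2,
    cuda_devices="0",
    force_cpu=False,
)

# Single-subject inference: one NIfTI file per modality, one output segmentation file.
# Internally this runs the selected algorithm container via run_container().
segmenter.infer_single(
    t1c=Path("data/A/A-t1c.nii.gz"),
    t1n=Path("data/A/A-t1n.nii.gz"),
    t2f=Path("data/A/A-t2f.nii.gz"),
    t2w=Path("data/A/A-t2w.nii.gz"),
    output_file=Path("output/A-segmentation.nii.gz"),
)

# Batch inference: data_folder holds one sub-folder per subject
# (e.g. data/A/A-t1c.nii.gz, ...), outputs are written per subject to output_folder.
segmenter.infer_batch(
    data_folder=Path("data"),
    output_folder=Path("output"),
)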