diff --git a/arcana/bids/data.py b/arcana/bids/data.py
index c22f56b..1dff15c 100644
--- a/arcana/bids/data.py
+++ b/arcana/bids/data.py
@@ -3,8 +3,8 @@
 import json
 import re
 import logging
-import itertools
 from operator import itemgetter
+from copy import copy
 import attrs
 import jq
 from pathlib import Path
@@ -91,14 +91,14 @@ def populate_tree(self, tree: DataTree):
         root_dir = Path(tree.dataset.id)
         participants_fspath = root_dir / "participants.tsv"
         participants = {}
-        with open(participants_fspath) as f:
-            lines = f.read().splitlines()
-        if lines:
-            participant_keys = lines[0].split("\t")
-            for line in lines[1:]:
-                dct = dict(zip(participant_keys, line.split("\t")))
-                participants[dct.pop("participant_id")[len("sub-") :]] = dct
-
+        if participants_fspath.exists():
+            with open(participants_fspath) as f:
+                lines = f.read().splitlines()
+            if lines:
+                participant_keys = lines[0].split("\t")
+                for line in lines[1:]:
+                    dct = dict(zip(participant_keys, line.split("\t")))
+                    participants[dct.pop("participant_id")[len("sub-") :]] = dct
         for subject_dir in root_dir.iterdir():
             if not subject_dir.name.startswith("sub-"):
                 continue
@@ -109,8 +109,10 @@ def populate_tree(self, tree: DataTree):
             additional_ids = {}
             if any(d.name.startswith("ses-") for d in subject_dir.iterdir()):
                 for sess_dir in subject_dir.iterdir():
-                    sess_id = sess_dir.name[len("ses-") :]
-                    tree.add_leaf([subject_id, sess_id], additional_ids=additional_ids)
+                    timepoint_id = sess_dir.name[len("ses-") :]
+                    sess_add_ids = copy(additional_ids)
+                    sess_add_ids["session"] = f"sub-{subject_id}_ses-{timepoint_id}"
+                    tree.add_leaf([subject_id, timepoint_id], additional_ids=sess_add_ids)
             else:
                 tree.add_leaf([subject_id], additional_ids=additional_ids)
@@ -148,10 +150,9 @@ def populate_row(self, row: DataRow):
                     or entry_fspath.name.endswith(self.PROV_SUFFIX)
                 ):
                     path = (
-                        "@"
+                        self._fs2entry_path(entry_fspath.name)
+                        + "@"
                         + pipeline_dir.name
-                        + "/"
-                        + self._fs2entry_path(entry_fspath.name)
                     )
                     # suffix = "".join(entry_fspath.suffixes)
                     # path = path[: -len(suffix)] + "/" + suffix.lstrip(".")
@@ -162,18 +163,20 @@
     def fileset_uri(self, path: str, datatype: type, row: DataRow) -> str:
-        if path.startswith("@"):  # derivative
-            base_uri = "derivatives/"
-            path = path[1:]
-        else:
+        path, dataset_name = DataEntry.split_dataset_name_from_path(path)
+        if dataset_name is None:
             base_uri = ""
+        elif not dataset_name:
+            base_uri = f"derivatives/{Dataset.EMPTY_NAME}"
+        else:
+            base_uri = f"derivatives/{dataset_name}"
         return base_uri + str(
             self._entry2fs_path(
                 path,
-                subject_id=row.ids[Clinical.subject],
+                subject_id=row.frequency_id("subject"),
                 timepoint_id=(
-                    row.ids[Clinical.timepoint]
-                    if Clinical.timepoint in row.dataset.hierarchy
+                    row.frequency_id("timepoint")
+                    if "timepoint" in row.dataset.hierarchy
                     else None
                 ),
                 ext=datatype.ext,
             )
         )

     def field_uri(self, path: str, datatype: type, row: DataRow) -> str:
-        if path.startswith("@"):  # derivative
-            base_uri = "derivatives/"
-            path = path[1:]
-        else:
+        path, dataset_name = DataEntry.split_dataset_name_from_path(path)
+        if dataset_name is None:
             base_uri = ""
+        elif not dataset_name:
+            base_uri = f"derivatives/{Dataset.EMPTY_NAME}"
+        else:
+            base_uri = f"derivatives/{dataset_name}"
         try:
             namespace, field_name = path.split("/")
         except ValueError:
...
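Reviewer note: fileset_uri()/field_uri() above replace the old leading-"@" derivative flag with an "@<dataset-name>" suffix parsed out by DataEntry.split_dataset_name_from_path(). That helper lives in arcana-core and is not shown in this patch; the following is only a sketch of the contract implied by the three branches above, not the real implementation:

    def split_dataset_name_from_path(path: str):
        # "anat/T1w"    -> ("anat/T1w", None)  source data, stored under the dataset root
        # "anat/T1w@"   -> ("anat/T1w", "")    stored under derivatives/<Dataset.EMPTY_NAME>
        # "anat/T1w@qc" -> ("anat/T1w", "qc")  stored under derivatives/qc
        if "@" not in path:
            return path, None
        entry_path, dataset_name = path.split("@", 1)
        return entry_path, dataset_name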
f"{namespace}/{self.FIELDS_FNAME}", - subject_id=row.ids[Clinical.subject], + subject_id=row.frequency_id("subject"), timepoint_id=( - row.ids[Clinical.timepoint] + row.frequency_id("timepoint") if Clinical.timepoint in row.dataset.hierarchy else None ), @@ -220,6 +225,7 @@ def put_fileset(self, fileset: FileSet, entry: DataEntry) -> FileSet: dest_dir=fspath.parent, stem=fspath.name[: -len(fileset.ext)], make_dirs=True, + overwrite=entry.is_derivative, ) if isinstance(copied_fileset, WithBids): # Ensure TaskName field is present in the JSON side-car if task @@ -257,103 +263,106 @@ def put_field_provenance(self, provenance: dict[str, ty.Any], entry: DataEntry): fspath, key = self._fields_prov_fspath_and_key(entry) self.update_json(fspath, key, provenance) - # Override method in base to use sub-classed metadata - # def define_dataset(self, *args, metadata=None, **kwargs): - # return super().define_dataset(*args, metadata=self._convert_metadata(metadata), **kwargs) - - # def _convert_metadata(self, metadata): - # if metadata is None: - # metadata = {} - # elif isinstance(metadata, DatasetMetadata): - # metadata = attrs.asdict(metadata) - # metadata = BidsMetadata(**metadata) - # return metadata - - ############### - # Other methods - ############### - - def create_empty_dataset( + def create_data_tree( self, id: str, - row_ids: dict[str, list[str]], - space: type = Clinical, - name: str = None, - **kwargs, + leaves: list[tuple[str, ...]], + hierarchy: list[str], + id_composition: dict[str, str] = None, + **kwargs ): root_dir = Path(id) root_dir.mkdir(parents=True) - group_ids = {} # Create sub-directories corresponding to rows of the dataset - for ids_tuple in itertools.product(*row_ids.values()): - ids = dict(zip(row_ids, ids_tuple)) - subject_id = ids["subject"] - timepoint_id = ids.get("timepoint") + group_ids = set() + subject_group_ids = {} + for ids_tuple in leaves: + ids = dict(zip(hierarchy, ids_tuple)) + # Add in composed IDs + ids.update(Dataset.decompose_ids(ids, id_composition)) + if "session" in hierarchy: + subject_id = ids["session"] + timepoint_id = None + assert "subject" not in ids + assert "timepoint" not in ids + else: + subject_id = ids["subject"] + timepoint_id = ids["timepoint"] + assert "session" not in ids group_id = ids.get("group") - if group_id is not None: - subject_id = group_id + str(subject_id) - group_ids[subject_id] = group_id + if group_id: + group_ids.add(group_id) + subject_group_ids[subject_id] = group_id sess_dir_fspath = root_dir / self._entry2fs_path( entry_path=None, subject_id=subject_id, timepoint_id=timepoint_id ) sess_dir_fspath.mkdir(parents=True) - # Save group IDs in participants TSV + # Add participants.tsv to define the groups if present if group_ids: with open(root_dir / "participants.tsv", "w") as f: f.write("participant_id\tgroup\n") - for subject_id, group_id in group_ids.items(): + for subject_id, group_id in subject_group_ids.items(): f.write(f"sub-{subject_id}\t{group_id}\n") - participants_desc = { - "group": { - "Description": "the group the participant belonged to", - "Levels": {g: f"{g} group" for g in row_ids["group"]}, - } - } - with open(root_dir / "participants.json", "w") as f: - json.dump(participants_desc, f) - dataset = self.define_dataset( - id=id, - space=space, - hierarchy=( - ["subject", "timepoint"] if "timepoint" in row_ids else ["session"] - ), - name=name, - **kwargs, - ) - dataset.save() - return dataset + + #################### + # Overrides of API # + #################### def save_dataset( - self, dataset: 
+
+    ####################
+    # Overrides of API #
+    ####################
 
     def save_dataset(
-        self, dataset: Dataset, name: str = None, overwrite_metadata: bool = False
+        self, dataset: Dataset, name: str = None, overwrite_bids_metadata: bool = False
     ):
-        super().save_dataset(dataset, name=name)
-        root_dir = Path(dataset.id)
-        participants_fspath = root_dir / "participants.tsv"
-        if participants_fspath.exists() and not overwrite_metadata:
-            logger.warning(
-                "Not attempting to overwrite existing BIDS dataset description at "
-                f"'{str(participants_fspath)}"
-            )
-        else:
-            with open(participants_fspath, "w") as f:
-                col_names = ["participant_id"] + dataset.metadata.row_keys
-                if len(dataset.row_ids(Clinical.group)) > 1:
-                    col_names.append("group")
-                f.write("\t".join(col_names) + "\n")
-                for subject_row in dataset.rows(frequency=Clinical.subject):
-                    rw = ["sub-" + subject_row.id] + [
-                        subject_row.metadata[k] for k in dataset.metadata.row_keys
-                    ]
-                    if "group" in col_names:
-                        rw.append(subject_row.ids[Clinical.group])
-                    f.write("\t".join(rw) + "\n")
+        self._save_metadata(dataset, overwrite_bids_metadata=overwrite_bids_metadata)
+
+    def new_dataset(
+        self,
+        id: str,
+        leaves: list[tuple[str, ...]],
+        hierarchy: list[str] = ["session"],
+        space: type = Clinical,
+        name: str = None,
+        **kwargs,
+    ):
+        """Creates a new dataset with new rows to store data in
+
+        Parameters
+        ----------
+        id : str
+            ID of the dataset
+        leaves : list[tuple[str, ...]]
+            the list of tuple IDs (at each level of the tree)
+        hierarchy : list[str], optional
+            hierarchy of the dataset tree, by default a single level (i.e. one session
+            per subject)
+        space : type, optional
+            the space of the dataset
+        name : str, optional
+            name of the dataset, if provided the dataset definition will be saved. To
+            save the dataset with the default name pass an empty string.
+
+        Returns
+        -------
+        Dataset
+            the newly created dataset
+        """
+        dataset = super().new_dataset(
+            id=id, leaves=leaves, hierarchy=hierarchy, space=space, name=name, **kwargs
+        )
+        self._save_metadata(dataset, overwrite_bids_metadata=True)
+        return dataset
+
+    ################
+    # Helper methods
+    ################
+
+    def _save_metadata(self, dataset: Dataset, overwrite_bids_metadata: bool = False):
+        root_dir = Path(dataset.id)
         dataset_description_fspath = root_dir / "dataset_description.json"
-        if dataset_description_fspath.exists() and not overwrite_metadata:
+        if dataset_description_fspath.exists() and not overwrite_bids_metadata:
             logger.warning(
                 "Not attempting to overwrite existing BIDS dataset description at "
-                f"'{str(dataset_description_fspath)}"
+                "'%s', use 'overwrite_bids_metadata' to force.",
+                str(dataset_description_fspath),
             )
         else:
             dataset_description = map_to_bids_names(
@@ -363,38 +372,69 @@ def save_dataset(
             with open(dataset_description_fspath, "w") as f:
                 json.dump(dataset_description, f, indent="    ")
 
-        if dataset.metadata.readme is not None:
+        if dataset.metadata.description is not None:
             readme_path = root_dir / "README"
-            if readme_path.exists() and not overwrite_metadata:
+            if readme_path.exists() and not overwrite_bids_metadata:
                 logger.warning(
-                    "Not attempting to overwrite existing BIDS dataset description at "
-                    f"'{str(dataset_description_fspath)}"
+                    "Not attempting to overwrite existing BIDS README at "
+                    "'%s', use 'overwrite_bids_metadata' to force.",
+                    str(readme_path),
                 )
             else:
                 with open(readme_path, "w") as f:
-                    f.write(dataset.metadata.readme)
-
-    # def load_dataset(self, id, name=None):
-    #     from arcana.core.data.set import (
-    #         Dataset,
-    #     )  # avoid circular imports it is imported here rather than at the top of the file
-
-    ################
-    # Helper methods
-    ################
-
-    def _fileset_fspath(self, entry):
+                    f.write(dataset.metadata.description)
+        participants_tsv_fspath = dataset.root_dir / "participants.tsv"
+        columns = list(dataset.metadata.row_metadata)
+        group_ids = [i for i in dataset.row_ids("group") if i is not None]
+        if group_ids or columns:
+            if participants_tsv_fspath.exists() and not overwrite_bids_metadata:
+                logger.warning(
+                    "Not attempting to overwrite existing BIDS participants TSV at "
+                    "'%s', use 'overwrite_bids_metadata' to force.",
+                    str(participants_tsv_fspath),
+                )
+            else:
+                with open(participants_tsv_fspath, "w") as f:
+                    f.write("participant_id")
+                    if group_ids:
+                        f.write("\tgroup")
+                    if columns:
+                        f.write("\t" + "\t".join(columns))
+                    f.write("\n")
+                    for row in dataset.rows("subject"):
+                        f.write(f"sub-{row.id}")
+                        if group_ids:
+                            f.write("\t" + row.frequency_id("group"))
+                        if columns:
+                            f.write("\t" + "\t".join(row.metadata[k] for k in columns))
+                        f.write("\n")
+        participants_desc = {}
+        if group_ids:
+            participants_desc["group"] = {
+                "Description": "the group the participant belonged to",
+                "Levels": {g: f"{g} group" for g in group_ids},
+            }
+        for name, desc in dataset.metadata.row_metadata.items():
+            participants_desc[name] = {"Description": desc}
+        with open(dataset.root_dir / "participants.json", "w") as f:
+            json.dump(participants_desc, f)
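Reviewer note: for a dataset with "test"/"control" groups and a single row-metadata column "age" (column descriptions come from dataset.metadata.row_metadata), _save_metadata() would emit files along these lines (tab-separated; all values illustrative):

    participants.tsv:
        participant_id  group    age
        sub-test1       test     34
        sub-control1    control  41

    participants.json:
        {"group": {"Description": "the group the participant belonged to",
                   "Levels": {"test": "test group", "control": "control group"}},
         "age": {"Description": "age of participant at time of scan"}}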
+
+    def _fileset_fspath(self, entry: DataEntry) -> Path:
         return Path(entry.row.dataset.id) / entry.uri
 
-    def _fields_fspath_and_key(self, entry):
+    def _fields_fspath_and_key(self, entry: DataEntry) -> tuple[Path, str]:
         relpath, key = entry.uri.split("::")
         fspath = Path(entry.row.dataset.id) / relpath
         return fspath, key
 
-    def _fileset_prov_fspath(self, entry):
+    def _fileset_prov_fspath(self, entry: DataEntry) -> Path:
         return self._fileset_fspath(entry).with_suffix(self.PROV_SUFFIX)
 
-    def _fields_prov_fspath_and_key(self, entry):
+    def _fields_prov_fspath_and_key(self, entry: DataEntry) -> tuple[Path, str]:
         fields_fspath, key = self._fields_fspath_and_key(entry)
         return fields_fspath.parent / self.FIELDS_PROV_FNAME, key
@@ -439,7 +479,7 @@ def _edit_nifti_x(self, nifti_x: WithBids, entry: DataEntry):
             json.dump(json_dict, f)
 
     @classmethod
-    def _extract_entities(cls, relpath):
+    def _extract_entities(cls, relpath: Path) -> tuple[str, list[str], str]:
         relpath = Path(relpath)
         path = relpath.parent
         name_parts = relpath.name.split(".")
@@ -527,13 +567,13 @@ def _entry2fs_path(
         return relpath
 
     @classmethod
-    def _rel_row_path(cls, row: DataRow):
-        relpath = Path(f"sub-{row.ids[Clinical.subject]}")
-        if Clinical.timepoint in row.dataset.hierarchy:
-            relpath /= f"ses-{row.ids[Clinical.timepoint]}"
+    def _rel_row_path(cls, row: DataRow) -> Path:
+        relpath = Path(f"sub-{row.frequency_id('subject')}")
+        if "timepoint" in row.dataset.hierarchy:
+            relpath /= f"ses-{row.frequency_id('timepoint')}"
         return relpath
 
-    def definition_save_path(self, dataset_id, name):
+    def definition_save_path(self, dataset_id: str, name: str) -> Path:
         return Path(dataset_id) / "derivatives" / name / "definition.yaml"
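Reviewer note: a quick sanity check of the path helpers annotated above (IDs and paths illustrative):

    # for a row with subject "01" and timepoint "2" in a ["subject", "timepoint"] hierarchy:
    Bids._rel_row_path(row)                             # -> Path("sub-01/ses-2")
    Bids().definition_save_path("/data/my-bids", "qc")  # -> Path("/data/my-bids/derivatives/qc/definition.yaml")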
diff --git a/arcana/bids/tasks.py b/arcana/bids/tasks.py
index c0fb067..52d1b0f 100644
--- a/arcana/bids/tasks.py
+++ b/arcana/bids/tasks.py
@@ -136,8 +136,10 @@ def bids_app(
     if dataset is None:
         dataset = Path(tempfile.mkdtemp()) / "arcana_bids_dataset"
     if not isinstance(dataset, Dataset):
-        dataset = Bids().create_empty_dataset(
-            id=dataset, name=name + "_dataset", row_ids={"subject": [DEFAULT_BIDS_ID]},
+        dataset = Bids().new_dataset(
+            id=dataset,
+            name=name + "_dataset",
+            leaves=[(DEFAULT_BIDS_ID,)],
             metadata={
                 "authors": [
                     f"Auto-generated by Arcana {__version__}",
@@ -375,9 +377,11 @@ def extract_bids(
     output_paths = []
     row = dataset.row(row_frequency, id)
     for output in outputs:
-        path = "@" + app_name
         if output.path:
-            path += "/" + output.path
+            path = output.path
+        else:
+            path = ""  # whole directory
+        path += "@" + app_name
         dataset.add_sink(
             output.name,
             output.datatype,
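Reviewer note: extract_bids() now appends the app name as an "@" suffix, matching the "<entry-path>@<dataset-name>" form consumed by DataEntry.split_dataset_name_from_path() in data.py above. For an app named "fmriprep" (name illustrative):

    # output.path == "anat/desc-preproc_T1w" -> sink path "anat/desc-preproc_T1w@fmriprep"
    # output.path empty (whole output dir)   -> sink path "@fmriprep"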
diff --git a/arcana/bids/tests/test_data.py b/arcana/bids/tests/test_data.py
index cef0de7..6906859 100644
--- a/arcana/bids/tests/test_data.py
+++ b/arcana/bids/tests/test_data.py
@@ -2,6 +2,7 @@
 import stat
 import typing as ty
 import json
+import itertools
 from pathlib import Path
 from warnings import warn
 import requests.exceptions
@@ -13,6 +14,7 @@
 import docker
 from arcana.core import __version__
 from fileformats.medimage import NiftiX, NiftiGzX, NiftiGzXBvec
+from arcana.core.data.space import Clinical
 from arcana.bids.data import Bids
 from arcana.bids.tasks import bids_app, BidsInput, BidsOutput
 from fileformats.text import Plain as Text
@@ -30,16 +32,21 @@ def test_bids_roundtrip(bids_validator_docker, bids_success_str, work_dir):
     dataset_name = "adataset"
     shutil.rmtree(path, ignore_errors=True)
-    dataset = Bids().create_empty_dataset(
+    group_ids = ["test", "control"]
+    member_ids = [str(i) for i in range(1, 4)]
+    subject_ids = ["{}{}".format(*t) for t in itertools.product(group_ids, member_ids)]
+    timepoint_ids = [str(i) for i in range(1, 3)]
+    dataset = Bids().new_dataset(
         id=path,
         name=dataset_name,
-        row_ids={
-            "subject": [str(i) for i in range(1, 4)],
-            "timepoint": [str(i) for i in range(1, 3)],
-            "group": ["test", "control"],
+        space=Clinical,
+        hierarchy=["subject", "timepoint"],
+        leaves=itertools.product(subject_ids, timepoint_ids),
+        id_composition={
+            "subject": r"(?P<group>\w+)(?P<member>\d+)"
         },
         metadata={
-            "readme": MOCK_README,
+            "description": MOCK_README,
             "authors": MOCK_AUTHORS,
             "generated_by": [
                 {
@@ -95,7 +102,8 @@ def test_bids_roundtrip(bids_validator_docker, bids_success_str, work_dir):
     assert bids_success_str in result
 
     reloaded = Bids().load_dataset(id=path, name=dataset_name)
-    reloaded.add_sink("t1w", datatype=NiftiX, path="anat/T1w")
+    reloaded.add_sink("t1w", datatype=NiftiX, path="anat/T1w")  # add sink to reloaded so it matches
+    reloaded.name = ""  # remove saved name so it matches
 
     assert dataset == reloaded
@@ -152,17 +160,17 @@ class JsonEditBlueprint:
             "fmap_mag1": SourceNiftiXBlueprint(
                 path="fmap/magnitude1",
                 orig_side_car={},
-                edited_side_car={"IntendedFor": "func/sub-1_ses-1_task-rest_bold.nii"},
+                edited_side_car={"IntendedFor": "func/sub-1_task-rest_bold.nii"},
             ),
             "fmap_mag2": SourceNiftiXBlueprint(
                 path="fmap/magnitude2",
                 orig_side_car={},
-                edited_side_car={"IntendedFor": "func/sub-1_ses-1_task-rest_bold.nii"},
+                edited_side_car={"IntendedFor": "func/sub-1_task-rest_bold.nii"},
             ),
             "fmap_phasediff": SourceNiftiXBlueprint(
                 path="fmap/phasediff",
                 orig_side_car={},
-                edited_side_car={"IntendedFor": "func/sub-1_ses-1_task-rest_bold.nii"},
+                edited_side_car={"IntendedFor": "func/sub-1_task-rest_bold.nii"},
             ),
         },
     ),
@@ -184,15 +192,12 @@ def test_bids_json_edit(json_edit_blueprint: JsonEditBlueprint, work_dir: Path):
     shutil.rmtree(path, ignore_errors=True)
     dataset = Bids(
         json_edits=[(bp.path_re, bp.jq_script)],
-    ).create_empty_dataset(
+    ).new_dataset(
         id=path,
         name=name,
-        row_ids={
-            "subject": ["1"],
-            "timepoint": ["1"],
-        },
+        leaves=[("1",)],
         metadata={
-            "readme": MOCK_README,
+            "description": MOCK_README,
             "authors": MOCK_AUTHORS,
             "generated_by": [
                 {
...
@@ -232,12 +237,12 @@ def test_bids_json_edit(json_edit_blueprint: JsonEditBlueprint, work_dir: Path):
             json.dump(sf_bp.orig_side_car, f)
 
         # Get single item in dataset
-        dataset[sf_name][("1", "1")] = (nifti_fspath, json_fspath)
+        dataset[sf_name]["1"] = (nifti_fspath, json_fspath)
 
     # Check edited JSON matches reference
     for sf_name, sf_bp in bp.source_niftis.items():
-        item = dataset[sf_name][("1", "1")]
+        item = dataset[sf_name]["1"]
         with open(item.json_file) as f:
             saved_dict = json.load(f)
diff --git a/arcana/bids/tests/test_run_bids_app.py b/arcana/bids/tests/test_run_bids_app.py
index 5c9ccef..93af4a1 100644
--- a/arcana/bids/tests/test_run_bids_app.py
+++ b/arcana/bids/tests/test_run_bids_app.py
@@ -2,7 +2,9 @@
 from operator import mul
 import pytest
 from fileformats.text import Plain as Text
-from arcana.core.data.testing import TestDatasetBlueprint
+from arcana.testing.data.blueprint import (
+    TestDatasetBlueprint, FileSetEntryBlueprint as FileBP
+)
 from arcana.core.data.space import Clinical
 from fileformats.medimage import NiftiGzX
 from arcana.bids.cli import app_entrypoint
@@ -19,33 +21,51 @@ def test_bids_app_entrypoint(
 ):
     blueprint = TestDatasetBlueprint(
-        hierarchy=[Clinical.subject, Clinical.session],
+        space=Clinical,
+        hierarchy=["subject", "session"],
         dim_lengths=[1, 1, 1],
-        files=[
-            "anat/T1w.nii.gz",
-            "anat/T1w.json",
-            "anat/T2w.nii.gz",
-            "anat/T2w.json",
-            "dwi/dwi.nii.gz",
-            "dwi/dwi.json",
-            "dwi/dwi.bvec",
-            "dwi/dwi.bval",
+        entries=[
+            FileBP(
+                path="anat/T1w",
+                datatype=NiftiGzX,
+                filenames=["anat/T1w.nii.gz", "anat/T1w.json"],
+            ),
+            FileBP(
+                path="anat/T2w",
+                datatype=NiftiGzX,
+                filenames=["anat/T2w.nii.gz", "anat/T2w.json"],
+            ),
+            FileBP(
+                path="dwi/dwi",
+                datatype=NiftiGzX,
+                filenames=[
+                    "dwi/dwi.nii.gz",
+                    "dwi/dwi.json",
+                    "dwi/dwi.bvec",
+                    "dwi/dwi.bval",
+                ],
+            ),
         ],
-        expected_datatypes={
-            "anat/T1w": (NiftiGzX, ["T1w.nii.gz", "T1w.json"]),
-            "anat/T2w": (NiftiGzX, ["T2w.nii.gz", "T2w.json"]),
-            "dwi/dwi": (NiftiGzX, ["dwi.nii.gz", "dwi.json", "dwi.bvec", "dwi.bval"]),
-        },
         derivatives=[
-            ("file1", Clinical.session, Text, ["file1.txt"]),
-            ("file2", Clinical.session, Text, ["file2.txt"]),
+            FileBP(
+                path="file1",
+                row_frequency=Clinical.session,
+                datatype=Text,
+                filenames=["file1.txt"],
+            ),
+            FileBP(
+                path="file2",
+                row_frequency=Clinical.session,
+                datatype=Text,
+                filenames=["file2.txt"],
+            ),
         ],
     )
     dataset_path = work_dir / "bids-dataset"
-    dataset = Bids.make_test_dataset(
-        dataset_id=dataset_path, blueprint=blueprint, source_data=nifti_sample_dir
+    dataset = blueprint.make_dataset(
+        dataset_id=dataset_path, store=Bids(), source_data=nifti_sample_dir
     )
 
     spec_path = work_dir / "spec.yaml"
@@ -64,7 +84,7 @@
         "--spec-path",
         spec_path,
         "--dataset-hierarchy",
-        ",".join([str(ln) for ln in blueprint.hierarchy]),
+        ",".join(blueprint.hierarchy),
     ]
     inputs_config = {}
     for path, (datatype, _) in blueprint.expected_datatypes.items():
@@ -106,9 +126,7 @@
                 "executable": str(mock_bids_app_executable),
             },
         },
-        packages={
-            "pip": ["arcana-bids"]
-        }
+        packages={"pip": ["arcana-bids"]},
     )
 
     image_spec.save(spec_path)
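Reviewer note: one loose end in test_run_bids_app.py: the unchanged context above still iterates blueprint.expected_datatypes, an attribute the reworked TestDatasetBlueprint no longer defines (inputs are now described by entries). Assuming each FileSetEntryBlueprint exposes the path and datatype attributes it is constructed with above, the loop would become something like:

    inputs_config = {}
    for entry_bp in blueprint.entries:
        path, datatype = entry_bp.path, entry_bp.datatype
        # ... rest of the existing loop body unchanged ...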