diff --git a/xcp_d/config.py b/xcp_d/config.py index 306345773..f02f926c1 100644 --- a/xcp_d/config.py +++ b/xcp_d/config.py @@ -524,6 +524,9 @@ def _process_value(value): "preprocessed": cls.fmri_dir, "templateflow": Path(TF_LAYOUT.root), } + if cls.atlases: + dataset_links["atlas"] = cls.output_dir / "atlases" + for dset_name, dset_path in cls.datasets.items(): dataset_links[dset_name] = dset_path cls.dataset_links = dataset_links diff --git a/xcp_d/interfaces/ants.py b/xcp_d/interfaces/ants.py index 86f075790..ccd6352af 100644 --- a/xcp_d/interfaces/ants.py +++ b/xcp_d/interfaces/ants.py @@ -25,7 +25,7 @@ class _ConvertTransformFileInputSpec(CommandLineInputSpec): - dimension = traits.Enum((3, 2), default=3, usedefault=True, argstr="%d", position=0) + dimension = traits.Enum(3, 2, usedefault=True, argstr="%d", position=0) in_transform = traits.File(exists=True, argstr="%s", mandatory=True, position=1) out_transform = traits.File( argstr="%s", diff --git a/xcp_d/interfaces/bids.py b/xcp_d/interfaces/bids.py index eaba8e81d..ee5ef316e 100644 --- a/xcp_d/interfaces/bids.py +++ b/xcp_d/interfaces/bids.py @@ -11,15 +11,17 @@ from nipype.interfaces.base import ( BaseInterfaceInputSpec, Directory, + DynamicTraitedSpec, File, SimpleInterface, TraitedSpec, traits, ) +from nipype.interfaces.io import add_traits from niworkflows.interfaces.bids import DerivativesDataSink as BaseDerivativesDataSink from xcp_d.data import load as load_data -from xcp_d.utils.bids import get_entity +from xcp_d.utils.bids import _get_bidsuris, get_entity # NOTE: Modified for xcpd's purposes xcp_d_spec = loads(load_data("xcp_d_bids_config.json").read_text()) @@ -193,6 +195,11 @@ class _CopyAtlasInputSpec(BaseInterfaceInputSpec): desc="The atlas name.", mandatory=True, ) + Sources = traits.List( + traits.Str, + desc="List of sources for the atlas.", + mandatory=False, + ) class _CopyAtlasOutputSpec(TraitedSpec): @@ -243,6 +250,7 @@ def _run_interface(self, runtime): meta_dict = self.inputs.meta_dict name_source = self.inputs.name_source atlas = self.inputs.atlas + Sources = self.inputs.Sources atlas_out_dir = os.path.join(output_dir, f"atlases/atlas-{atlas}") @@ -284,11 +292,69 @@ def _run_interface(self, runtime): shutil.copyfile(in_file, out_file) # Only write out a sidecar if metadata are provided - if meta_dict: + if meta_dict or Sources: meta_file = os.path.join(atlas_out_dir, f"{out_basename}.json") + meta_dict = meta_dict or {} + meta_dict = meta_dict.copy() + if Sources: + meta_dict["Sources"] = meta_dict.get("Sources", []) + Sources + with open(meta_file, "w") as fo: dump(meta_dict, fo, sort_keys=True, indent=4) self._results["out_file"] = out_file return runtime + + +class _BIDSURIInputSpec(DynamicTraitedSpec): + dataset_links = traits.Dict(mandatory=True, desc="Dataset links") + out_dir = traits.Str(mandatory=True, desc="Output directory") + metadata = traits.Dict(desc="Metadata dictionary") + field = traits.Str( + "Sources", + usedefault=True, + desc="Field to use for BIDS URIs in metadata dict", + ) + + +class _BIDSURIOutputSpec(TraitedSpec): + out = traits.List( + traits.Str, + desc="BIDS URI(s) for file", + ) + metadata = traits.Dict( + desc="Dictionary with 'Sources' field.", + ) + + +class BIDSURI(SimpleInterface): + """Convert input filenames to BIDS URIs, based on links in the dataset. + + This interface can combine multiple lists of inputs. 
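+
+    A minimal usage sketch (illustrative only; the paths and dataset links below are
+    hypothetical)::
+
+        uri_node = BIDSURI(numinputs=1)
+        uri_node.inputs.in1 = "/out/xcp_d/sub-01/func/sub-01_task-rest_bold.nii.gz"
+        uri_node.inputs.dataset_links = {"preprocessed": "/data/fmriprep"}
+        uri_node.inputs.out_dir = "/out/xcp_d"
+        results = uri_node.run()
+        # results.outputs.out == ["bids::sub-01/func/sub-01_task-rest_bold.nii.gz"]
+        # results.outputs.metadata == {"Sources": ["bids::sub-01/func/sub-01_task-rest_bold.nii.gz"]}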
+ """ + + input_spec = _BIDSURIInputSpec + output_spec = _BIDSURIOutputSpec + + def __init__(self, numinputs=0, **inputs): + super().__init__(**inputs) + self._numinputs = numinputs + if numinputs >= 1: + input_names = [f"in{i + 1}" for i in range(numinputs)] + else: + input_names = [] + add_traits(self.inputs, input_names) + + def _run_interface(self, runtime): + inputs = [getattr(self.inputs, f"in{i + 1}") for i in range(self._numinputs)] + uris = _get_bidsuris(inputs, self.inputs.dataset_links, self.inputs.out_dir) + self._results["out"] = uris + + # Add the URIs to the metadata dictionary. + metadata = self.inputs.metadata or {} + metadata = metadata.copy() + metadata[self.inputs.field] = metadata.get(self.inputs.field, []) + uris + self._results["metadata"] = metadata + + return runtime diff --git a/xcp_d/interfaces/censoring.py b/xcp_d/interfaces/censoring.py index c76d84186..5a5e0030e 100644 --- a/xcp_d/interfaces/censoring.py +++ b/xcp_d/interfaces/censoring.py @@ -215,7 +215,7 @@ class _CensorInputSpec(BaseInterfaceInputSpec): ), ) column = traits.Str( - default="framewise_displacement", + "framewise_displacement", usedefault=True, mandatory=False, desc="Column name in the temporal mask to use for censoring.", @@ -404,11 +404,12 @@ def _run_interface(self, runtime): class _ProcessMotionInputSpec(BaseInterfaceInputSpec): TR = traits.Float(mandatory=True, desc="Repetition time in seconds") fd_thresh = traits.Float( + 0.3, mandatory=False, - default_value=0.3, + usedefault=True, desc="Framewise displacement threshold. All values above this will be dropped.", ) - head_radius = traits.Float(mandatory=False, default_value=50, desc="Head radius in mm ") + head_radius = traits.Float(50, mandatory=False, usedefault=True, desc="Head radius in mm") motion_file = File( exists=True, mandatory=True, @@ -494,6 +495,11 @@ def _run_interface(self, runtime): filtered=False, ) fd_timeseries = motion_df["framewise_displacement"].to_numpy() + motion_metadata["framewise_displacement"] = { + "Description": "Framewise displacement calculated according to Power et al. (2012).", + "HeadRadius": self.inputs.head_radius, + "Units": "mm", + } if self.inputs.motion_filter_type: motion_df["framewise_displacement_filtered"] = compute_fd( confound=motion_df, @@ -504,18 +510,14 @@ def _run_interface(self, runtime): # Compile motion metadata from confounds metadata, adding in filtering info # First drop any columns that are not motion parameters - orig_motion_df = pd.read_table(self.inputs.motion_file) - orig_motion_cols = orig_motion_df.columns.tolist() - cols_to_drop = sorted(list(set(orig_motion_cols) - set(motion_df.columns.tolist()))) + orig_cols = list(motion_metadata.keys()) + orig_cols = [c for c in orig_cols if c[0] == c[0].lower()] + cols_to_drop = sorted(list(set(orig_cols) - set(motion_df.columns.tolist()))) motion_metadata = {k: v for k, v in motion_metadata.items() if k not in cols_to_drop} for col in motion_df.columns.tolist(): col_metadata = motion_metadata.get(col, {}) - if col.startswith("framewise_displacement"): - col_metadata["Description"] = ( - "Framewise displacement calculated according to Power et al. (2012)." 
- ) - col_metadata["Units"] = "mm" - col_metadata["HeadRadius"] = self.inputs.head_radius + if col.endswith("_filtered") and col[:-9] in motion_metadata: + col_metadata = motion_metadata[col[:-9]] if self.inputs.motion_filter_type == "lp" and col.endswith("_filtered"): filters = col_metadata.get("SoftwareFilters", {}) @@ -694,7 +696,7 @@ def _run_interface(self, runtime): import nibabel as nb import pandas as pd - from xcp_d.utils.bids import make_bids_uri + from xcp_d.utils.bids import _get_bidsuris from xcp_d.utils.confounds import filter_motion, volterra in_img = nb.load(self.inputs.in_file) @@ -752,7 +754,7 @@ def _run_interface(self, runtime): confounds_metadata[found_column] = confounds_metadata.get( found_column, {} ) - confounds_metadata[found_column]["Sources"] = make_bids_uri( + confounds_metadata[found_column]["Sources"] = _get_bidsuris( in_files=[confound_file], dataset_links=self.inputs.dataset_links, out_dir=self.inputs.out_dir, @@ -772,19 +774,12 @@ def _run_interface(self, runtime): new_confound_df.fillna({column: 0}, inplace=True) confounds_metadata[column] = confounds_metadata.get(column, {}) - confounds_metadata[column]["Sources"] = make_bids_uri( + confounds_metadata[column]["Sources"] = _get_bidsuris( in_files=[confound_file], dataset_links=self.inputs.dataset_links, out_dir=self.inputs.out_dir, ) - # Collect column metadata - for column in new_confound_df.columns: - if column in confound_metadata: - confounds_metadata[column] = confound_metadata[column] - else: - confounds_metadata[column] = {} - else: # Voxelwise confounds confound_img = nb.load(confound_file) if confound_img.ndim == 2: # CIFTI @@ -804,7 +799,7 @@ def _run_interface(self, runtime): # Collect image metadata new_confound_df.loc[:, confound_name] = np.nan # fill with NaNs as a placeholder confounds_metadata[confound_name] = confound_metadata - confounds_metadata[confound_name]["Sources"] = make_bids_uri( + confounds_metadata[confound_name]["Sources"] = _get_bidsuris( in_files=[confound_file], dataset_links=self.inputs.dataset_links, out_dir=self.inputs.out_dir, @@ -815,7 +810,7 @@ def _run_interface(self, runtime): ) # This actually gets overwritten in init_postproc_derivatives_wf. - confounds_metadata["Sources"] = make_bids_uri( + confounds_metadata["Sources"] = _get_bidsuris( in_files=confound_files, dataset_links=self.inputs.dataset_links, out_dir=self.inputs.out_dir, diff --git a/xcp_d/interfaces/connectivity.py b/xcp_d/interfaces/connectivity.py index 430a597b7..2717b5ff2 100644 --- a/xcp_d/interfaces/connectivity.py +++ b/xcp_d/interfaces/connectivity.py @@ -31,7 +31,7 @@ class _NiftiParcellateInputSpec(BaseInterfaceInputSpec): atlas = File(exists=True, mandatory=True, desc="atlas file") atlas_labels = File(exists=True, mandatory=True, desc="atlas labels file") min_coverage = traits.Float( - default=0.5, + 0.5, usedefault=True, desc=( "Coverage threshold to apply to parcels. 
" diff --git a/xcp_d/interfaces/nilearn.py b/xcp_d/interfaces/nilearn.py index d91796266..a6d9b9745 100644 --- a/xcp_d/interfaces/nilearn.py +++ b/xcp_d/interfaces/nilearn.py @@ -261,9 +261,9 @@ class _DenoiseImageInputSpec(BaseInterfaceInputSpec): ) TR = traits.Float(mandatory=True, desc="Repetition time") bandpass_filter = traits.Bool(mandatory=True, desc="To apply bandpass or not") - low_pass = traits.Float(mandatory=True, default_value=0.10, desc="Lowpass filter in Hz") - high_pass = traits.Float(mandatory=True, default_value=0.01, desc="Highpass filter in Hz") - filter_order = traits.Int(mandatory=True, default_value=2, desc="Filter order") + low_pass = traits.Float(mandatory=True, desc="Lowpass filter in Hz") + high_pass = traits.Float(mandatory=True, desc="Highpass filter in Hz") + filter_order = traits.Int(mandatory=True, desc="Filter order") class _DenoiseImageOutputSpec(TraitedSpec): diff --git a/xcp_d/interfaces/plotting.py b/xcp_d/interfaces/plotting.py index d7f4dfcc6..1a2492576 100644 --- a/xcp_d/interfaces/plotting.py +++ b/xcp_d/interfaces/plotting.py @@ -368,7 +368,7 @@ class _QCPlotsESInputSpec(BaseInterfaceInputSpec): Undefined, desc="TSV file with temporal mask.", ) - TR = traits.Float(default_value=1, desc="Repetition time") + TR = traits.Float(1, usedefault=True, desc="Repetition time") standardize = traits.Bool( mandatory=True, desc=( @@ -529,7 +529,6 @@ class _SlicesDirInputSpec(FSLCommandInputSpec): out_extension = traits.Enum( (".gif", ".png", ".svg"), - default=".gif", usedefault=True, desc="Convenience parameter to let xcp_d select the extension.", ) @@ -670,27 +669,27 @@ class _PlotCiftiParcellationInputSpec(BaseInterfaceInputSpec): desc="Labels for the CIFTI files.", ) out_file = File( + "plot.svg", exists=False, mandatory=False, desc="Output file.", - default="plot.svg", usedefault=True, ) vmin = traits.Float( + 0, mandatory=False, - default_value=0, usedefault=True, desc="Minimum value for the colormap.", ) vmax = traits.Float( + 0, mandatory=False, - default_value=0, usedefault=True, desc="Maximum value for the colormap.", ) base_desc = traits.Str( + "", mandatory=False, - default_value="", usedefault=True, desc="Base description for the output file.", ) @@ -910,8 +909,8 @@ class _PlotDenseCiftiInputSpec(BaseInterfaceInputSpec): desc="CIFTI file to plot.", ) base_desc = traits.Str( + "", mandatory=False, - default_value="", usedefault=True, desc="Base description for the output file.", ) diff --git a/xcp_d/interfaces/workbench.py b/xcp_d/interfaces/workbench.py index 98747845d..557c5773a 100644 --- a/xcp_d/interfaces/workbench.py +++ b/xcp_d/interfaces/workbench.py @@ -622,7 +622,7 @@ class _CiftiParcellateWorkbenchInputSpec(_WBCommandInputSpec): "MODE", "COUNT_NONZERO", position=12, - default="MEAN", + usedefault=True, argstr="-method %s", desc="Specify method of parcellation (default MEAN, or MODE if label data)", ) @@ -709,7 +709,7 @@ class _CiftiSurfaceResampleInputSpec(_WBCommandInputSpec): "The BARYCENTRIC method is generally recommended for anatomical surfaces, " "in order to minimize smoothing." 
), - default="BARYCENTRIC", + usedefault=True, ) out_file = File( name_source=["in_file"], diff --git a/xcp_d/tests/data/test_fmriprep_without_freesurfer_outputs.txt b/xcp_d/tests/data/test_fmriprep_without_freesurfer_outputs.txt index 158582ae5..52bf2930f 100644 --- a/xcp_d/tests/data/test_fmriprep_without_freesurfer_outputs.txt +++ b/xcp_d/tests/data/test_fmriprep_without_freesurfer_outputs.txt @@ -5,6 +5,7 @@ atlases/atlas-4S156Parcels/atlas-4S156Parcels_space-MNI152NLin2009cAsym_dseg.jso atlases/atlas-4S156Parcels/atlas-4S156Parcels_space-MNI152NLin2009cAsym_dseg.nii.gz atlases/atlas-Schaefer100 atlases/atlas-Schaefer100/atlas-Schaefer100_dseg.tsv +atlases/atlas-Schaefer100/atlas-Schaefer100_space-MNI152NLin2009cAsym_dseg.json atlases/atlas-Schaefer100/atlas-Schaefer100_space-MNI152NLin2009cAsym_dseg.nii.gz atlases/dataset_description.json dataset_description.json diff --git a/xcp_d/tests/test_utils_bids.py b/xcp_d/tests/test_utils_bids.py index 8a2c80efc..ab72ad610 100644 --- a/xcp_d/tests/test_utils_bids.py +++ b/xcp_d/tests/test_utils_bids.py @@ -468,77 +468,3 @@ def test_group_across_runs(): "/path/sub-01_task-rest_dir-LR_run-2_bold.nii.gz", "/path/sub-01_task-rest_dir-RL_run-2_bold.nii.gz", ] - - -def test_make_uri(): - """Test _make_uri.""" - in_file = "/path/to/dset/sub-01/func/sub-01_task-rest_bold.nii.gz" - dataset_name = "test" - dataset_path = "/path/to/dset" - uri = xbids._make_uri(in_file, dataset_name=dataset_name, dataset_path=dataset_path) - assert uri == "bids:test:sub-01/func/sub-01_task-rest_bold.nii.gz" - - dataset_path = "/another/path/haha" - with pytest.raises(ValueError, match="is not in the subpath of"): - xbids._make_uri(in_file, dataset_name=dataset_name, dataset_path=dataset_path) - - -def test_make_xcpd_uri(): - """Test _make_xcpd_uri.""" - out_file = "/path/to/dset/xcp_d/sub-01/func/sub-01_task-rest_bold.nii.gz" - uri = xbids._make_xcpd_uri(out_file, output_dir="/path/to/dset/xcp_d") - assert uri == ["bids::sub-01/func/sub-01_task-rest_bold.nii.gz"] - - xbids._make_xcpd_uri([out_file], output_dir="/path/to/dset/xcp_d") - assert uri == ["bids::sub-01/func/sub-01_task-rest_bold.nii.gz"] - - -def test_make_atlas_uri(): - """Test _make_atlas_uri.""" - out_file = "/path/to/dset/xcp_d/atlases/sub-01/func/sub-01_task-rest_bold.nii.gz" - uri = xbids._make_atlas_uri(out_file, output_dir="/path/to/dset/xcp_d") - assert uri == ["bids:atlas:sub-01/func/sub-01_task-rest_bold.nii.gz"] - - xbids._make_atlas_uri([out_file], output_dir="/path/to/dset/xcp_d") - assert uri == ["bids:atlas:sub-01/func/sub-01_task-rest_bold.nii.gz"] - - -def test_make_xcpd_uri_lol(): - """Test _make_xcpd_uri_lol.""" - in_list = [ - [ - "/path/to/dset/xcp_d/sub-01/func/sub-01_task-rest_run-1_bold.nii.gz", - "/path/to/dset/xcp_d/sub-02/func/sub-01_task-rest_run-1_bold.nii.gz", - "/path/to/dset/xcp_d/sub-03/func/sub-01_task-rest_run-1_bold.nii.gz", - ], - [ - "/path/to/dset/xcp_d/sub-01/func/sub-01_task-rest_run-2_bold.nii.gz", - "/path/to/dset/xcp_d/sub-02/func/sub-01_task-rest_run-2_bold.nii.gz", - "/path/to/dset/xcp_d/sub-03/func/sub-01_task-rest_run-2_bold.nii.gz", - ], - ] - uris = xbids._make_xcpd_uri_lol(in_list, output_dir="/path/to/dset/xcp_d") - assert uris == [ - [ - "bids::sub-01/func/sub-01_task-rest_run-1_bold.nii.gz", - "bids::sub-01/func/sub-01_task-rest_run-2_bold.nii.gz", - ], - [ - "bids::sub-02/func/sub-01_task-rest_run-1_bold.nii.gz", - "bids::sub-02/func/sub-01_task-rest_run-2_bold.nii.gz", - ], - [ - "bids::sub-03/func/sub-01_task-rest_run-1_bold.nii.gz", - 
"bids::sub-03/func/sub-01_task-rest_run-2_bold.nii.gz", - ], - ] - - -def test_make_preproc_uri(): - """Test _make_preproc_uri.""" - out_file = "/path/to/dset/sub-01/func/sub-01_task-rest_bold.nii.gz" - uri = xbids._make_preproc_uri(out_file, fmri_dir="/path/to/dset") - assert uri == ["bids:preprocessed:sub-01/func/sub-01_task-rest_bold.nii.gz"] - - xbids._make_preproc_uri([out_file], fmri_dir="/path/to/dset") - assert uri == ["bids:preprocessed:sub-01/func/sub-01_task-rest_bold.nii.gz"] diff --git a/xcp_d/tests/test_utils_utils.py b/xcp_d/tests/test_utils_utils.py index 26bd24b7b..3a165432f 100644 --- a/xcp_d/tests/test_utils_utils.py +++ b/xcp_d/tests/test_utils_utils.py @@ -529,62 +529,6 @@ def test_select_first(): assert utils._select_first(lst) == "a" -def test_listify(): - """Test _listify.""" - inputs = [ - 1, - (1,), - "a", - ["a"], - ["a", ["b", "c"]], - ("a", "b"), - ] - outputs = [ - [1], - (1,), - ["a"], - ["a"], - ["a", ["b", "c"]], - ("a", "b"), - ] - for i, input_ in enumerate(inputs): - expected_output = outputs[i] - output = utils._listify(input_) - assert output == expected_output - - -def test_make_dictionary(): - """Test _make_dictionary.""" - metadata = {"Sources": ["a"]} - out_metadata = utils._make_dictionary(metadata, Sources=["b"]) - # Ensure the original dictionary isn't modified. - assert metadata["Sources"] == ["a"] - assert out_metadata["Sources"] == ["a", "b"] - - metadata = {"Test": "a"} - out_metadata = utils._make_dictionary(metadata, Sources=["b"]) - assert out_metadata["Sources"] == ["b"] - - metadata = {"Test": ["a"]} - out_metadata = utils._make_dictionary(metadata, Sources="b") - assert out_metadata["Sources"] == "b" - - metadata = {"Sources": "a"} - out_metadata = utils._make_dictionary(metadata, Sources=["b"]) - # Ensure the original dictionary isn't modified. - assert metadata["Sources"] == "a" - assert out_metadata["Sources"] == ["a", "b"] - - metadata = {"Sources": ["a"]} - out_metadata = utils._make_dictionary(metadata, Sources="b") - # Ensure the original dictionary isn't modified. - assert metadata["Sources"] == ["a"] - assert out_metadata["Sources"] == ["a", "b"] - - out_metadata = utils._make_dictionary(metadata=None, Sources=["b"]) - assert out_metadata["Sources"] == ["b"] - - def test_transpose_lol(): """Test _transpose_lol.""" inputs = [ diff --git a/xcp_d/utils/bids.py b/xcp_d/utils/bids.py index cd11d8274..18a5c7187 100644 --- a/xcp_d/utils/bids.py +++ b/xcp_d/utils/bids.py @@ -13,7 +13,10 @@ import nibabel as nb import yaml from bids.layout import BIDSLayout +from bids.utils import listify from nipype import logging +from nipype.interfaces.base import isdefined +from nipype.interfaces.utility.base import _ravel from packaging.version import Version from xcp_d.data import load as load_data @@ -760,8 +763,6 @@ def write_derivative_description( Path to the output XCP-D dataset. atlases : :obj:`list` of :obj:`str`, optional Names of requested XCP-D atlases. - custom_confounds_folder : :obj:`str`, optional - Path to the folder containing custom confounds files. dataset_links : :obj:`dict`, optional Dictionary of dataset links to include in the dataset description. """ @@ -1046,12 +1047,47 @@ def group_across_runs(in_files): return out_files +def check_pipeline_version(pipeline_name, cvers, data_desc): + """Search for existing BIDS pipeline output and compares against current pipeline version. 
+ + Parameters + ---------- + cvers : :obj:`str` + Current pipeline version + data_desc : :obj:`str` or :obj:`os.PathLike` + Path to pipeline output's ``dataset_description.json`` + + Returns + ------- + message : :obj:`str` or :obj:`None` + A warning string if there is a difference between versions, otherwise ``None``. + + """ + import json + + data_desc = Path(data_desc) + if not data_desc.exists(): + return + + desc = json.loads(data_desc.read_text()) + generators = { + generator["Name"]: generator.get("Version", "0+unknown") + for generator in desc.get("GeneratedBy", []) + } + dvers = generators.get(pipeline_name) + if dvers is None: + # Very old style + dvers = desc.get("PipelineDescription", {}).get("Version", "0+unknown") + + if Version(cvers).public != Version(dvers).public: + return f"Previous output generated by version {dvers} found." + + def _find_nearest_path(path_dict, input_path): """Find the nearest relative path from an input path to a dictionary of paths. If ``input_path`` is not relative to any of the paths in ``path_dict``, the absolute path string is returned. - If ``input_path`` is already a BIDS-URI, then it will be returned unmodified. Parameters @@ -1111,107 +1147,15 @@ def _find_nearest_path(path_dict, input_path): return matching_path -def make_bids_uri(in_files, dataset_links, out_dir): - """Create a BIDS-URI for each input file.""" +def _get_bidsuris(in_files, dataset_links, out_dir): + """Convert input paths to BIDS-URIs using a dictionary of dataset links.""" + in_files = listify(in_files) + in_files = _ravel(in_files) + # Remove undefined inputs + in_files = [f for f in in_files if isdefined(f)] # Convert the dataset links to BIDS URI prefixes updated_keys = {f"bids:{k}:": Path(v) for k, v in dataset_links.items()} updated_keys["bids::"] = Path(out_dir) # Convert the paths to BIDS URIs - bids_uris = [_find_nearest_path(updated_keys, f) for f in in_files] - return bids_uris - - -def _make_uri(in_file, dataset_name, dataset_path): - """Convert a filename to a BIDS URI. - - Raises - ------ - ValueError - If ``in_file`` is not relative to ``dataset_path``. 
- """ - bids_uri = f"bids:{dataset_name}:{str(Path(in_file).relative_to(dataset_path))}" - return bids_uri - - -def _make_xcpd_uri(out_file, output_dir): - """Convert postprocessing derivative's path to BIDS URI.""" - from xcp_d.utils.bids import _make_uri - - if isinstance(out_file, list): - return [_make_uri(of, "", output_dir) for of in out_file] - else: - return [_make_uri(out_file, "", output_dir)] - - -def _make_xcpd_uri_lol(in_list, output_dir): - """Call _make_xcpd_uri on a list of lists and then transpose the result.""" - from xcp_d.utils.bids import _make_xcpd_uri - from xcp_d.utils.utils import _transpose_lol - - out = [] - for sublist in in_list: - sublist_out = _make_xcpd_uri(sublist, output_dir) - out.append(sublist_out) - - out_lol = _transpose_lol(out) - return out_lol - - -def _make_atlas_uri(out_file, output_dir): - """Convert postprocessing atlas derivative's path to BIDS URI.""" - import os - - from xcp_d.utils.bids import _make_uri - - dataset_path = os.path.join(output_dir, "atlases") - - if isinstance(out_file, list): - return [_make_uri(of, "atlas", dataset_path) for of in out_file] - else: - return [_make_uri(out_file, "atlas", dataset_path)] - - -def _make_preproc_uri(out_file, fmri_dir): - """Convert preprocessing derivative's path to BIDS URI.""" - from xcp_d.utils.bids import _make_uri - - if isinstance(out_file, list): - return [_make_uri(of, "preprocessed", fmri_dir) for of in out_file] - else: - return [_make_uri(out_file, "preprocessed", fmri_dir)] - - -def check_pipeline_version(pipeline_name, cvers, data_desc): - """Search for existing BIDS pipeline output and compares against current pipeline version. - - Parameters - ---------- - cvers : :obj:`str` - Current pipeline version - data_desc : :obj:`str` or :obj:`os.PathLike` - Path to pipeline output's ``dataset_description.json`` - - Returns - ------- - message : :obj:`str` or :obj:`None` - A warning string if there is a difference between versions, otherwise ``None``. - - """ - import json - - data_desc = Path(data_desc) - if not data_desc.exists(): - return - - desc = json.loads(data_desc.read_text()) - generators = { - generator["Name"]: generator.get("Version", "0+unknown") - for generator in desc.get("GeneratedBy", []) - } - dvers = generators.get(pipeline_name) - if dvers is None: - # Very old style - dvers = desc.get("PipelineDescription", {}).get("Version", "0+unknown") - - if Version(cvers).public != Version(dvers).public: - return f"Previous output generated by version {dvers} found." + out = [_find_nearest_path(updated_keys, f) for f in in_files] + return out diff --git a/xcp_d/utils/utils.py b/xcp_d/utils/utils.py index 0446a7a9c..931368612 100644 --- a/xcp_d/utils/utils.py +++ b/xcp_d/utils/utils.py @@ -574,41 +574,6 @@ def list_to_str(lst): return f"{', '.join(lst_str[:-1])}, and {lst_str[-1]}" -def _listify(obj): - """Wrap all non-list or tuple objects in a list. - - This provides a simple way to accept flexible arguments. - """ - return obj if isinstance(obj, (list, tuple, type(None), np.ndarray)) else [obj] - - -def _make_dictionary(metadata=None, **kwargs): - """Create or modify a dictionary. - - This will add kwargs to a metadata dictionary if the dictionary is provided, - or create a dictionary from scratch if not. 
- """ - from copy import deepcopy - - from xcp_d.utils.utils import _listify - - if metadata: - out_metadata = deepcopy(metadata) - for key, value in kwargs.items(): - if key not in metadata.keys(): - out_metadata[key] = value - elif isinstance(value, list) or isinstance(out_metadata[key], list): - # Append the values if they're a list - out_metadata[key] = _listify(out_metadata[key]) + _listify(value) - else: - # Overwrite the old value - out_metadata[key] = value - - return out_metadata - else: - return dict(kwargs) - - def _transpose_lol(lol): """Transpose list of lists.""" return list(map(list, zip(*lol))) diff --git a/xcp_d/workflows/bold/concatenation.py b/xcp_d/workflows/bold/concatenation.py index cca941850..4d750958f 100644 --- a/xcp_d/workflows/bold/concatenation.py +++ b/xcp_d/workflows/bold/concatenation.py @@ -5,16 +5,15 @@ from niworkflows.engine.workflows import LiterateWorkflow as Workflow from xcp_d import config -from xcp_d.interfaces.bids import DerivativesDataSink +from xcp_d.interfaces.bids import BIDSURI, DerivativesDataSink from xcp_d.interfaces.concatenation import ( CleanNameSource, ConcatenateInputs, FilterOutFailedRuns, ) from xcp_d.interfaces.connectivity import TSVConnect -from xcp_d.utils.bids import _make_xcpd_uri, _make_xcpd_uri_lol from xcp_d.utils.doc import fill_doc -from xcp_d.utils.utils import _make_dictionary, _select_first +from xcp_d.utils.utils import _select_first, _transpose_lol from xcp_d.workflows.bold.plotting import init_qc_report_wf @@ -183,6 +182,17 @@ def init_concatenate_data_wf(TR, head_radius, name="concatenate_data_wf"): ]), ]) # fmt:skip + motion_src = pe.Node( + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), + ), + name="motion_src", + run_without_submitting=True, + ) + workflow.connect([(filter_runs, motion_src, [("motion_file", "in1")])]) + ds_motion_file = pe.Node( DerivativesDataSink( dismiss_entities=["segmentation", "den", "res", "space", "cohort", "desc"], @@ -196,10 +206,21 @@ def init_concatenate_data_wf(TR, head_radius, name="concatenate_data_wf"): workflow.connect([ (clean_name_source, ds_motion_file, [("name_source", "source_file")]), (concatenate_inputs, ds_motion_file, [("motion_file", "in_file")]), - (filter_runs, ds_motion_file, [(("motion_file", _make_xcpd_uri, output_dir), "Sources")]), + (motion_src, ds_motion_file, [("out", "Sources")]), ]) # fmt:skip if fd_thresh > 0: + temporal_mask_src = pe.Node( + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), + ), + name="temporal_mask_src", + run_without_submitting=True, + ) + workflow.connect([(filter_runs, temporal_mask_src, [("temporal_mask", "in1")])]) + ds_temporal_mask = pe.Node( DerivativesDataSink( dismiss_entities=["segmentation", "den", "res", "space", "cohort", "desc"], @@ -210,13 +231,10 @@ def init_concatenate_data_wf(TR, head_radius, name="concatenate_data_wf"): run_without_submitting=True, mem_gb=1, ) - workflow.connect([ (clean_name_source, ds_temporal_mask, [("name_source", "source_file")]), (concatenate_inputs, ds_temporal_mask, [("temporal_mask", "in_file")]), - (filter_runs, ds_temporal_mask, [ - (("temporal_mask", _make_xcpd_uri, output_dir), "Sources"), - ]), + (temporal_mask_src, ds_temporal_mask, [("out", "Sources")]), ]) # fmt:skip if file_format == "cifti": @@ -269,42 +287,57 @@ def init_concatenate_data_wf(TR, head_radius, name="concatenate_data_wf"): mem_gb=2, ) + denoised_bold_src = pe.Node( + BIDSURI( + numinputs=1, + 
dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), + ), + name="denoised_bold_src", + run_without_submitting=True, + ) + workflow.connect([(filter_runs, denoised_bold_src, [("denoised_bold", "in1")])]) + workflow.connect([ (clean_name_source, ds_denoised_bold, [("name_source", "source_file")]), (concatenate_inputs, ds_denoised_bold, [("denoised_bold", "in_file")]), - (filter_runs, ds_denoised_bold, [ - (("denoised_bold", _make_xcpd_uri, output_dir), "Sources"), - ]), + (denoised_bold_src, ds_denoised_bold, [("out", "Sources")]), ]) # fmt:skip if smoothing: + smoothed_src = pe.Node( + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), + ), + name="smoothed_src", + run_without_submitting=True, + ) workflow.connect([ + (filter_runs, smoothed_src, [("smoothed_denoised_bold", "in1")]), (clean_name_source, ds_smoothed_denoised_bold, [("name_source", "source_file")]), (concatenate_inputs, ds_smoothed_denoised_bold, [ ("smoothed_denoised_bold", "in_file"), ]), - (filter_runs, ds_smoothed_denoised_bold, [ - (("smoothed_denoised_bold", _make_xcpd_uri, output_dir), "Sources"), - ]), + (smoothed_src, ds_smoothed_denoised_bold, [("out", "Sources")]), ]) # fmt:skip # Functional connectivity outputs if atlases: make_timeseries_dict = pe.MapNode( - niu.Function( - function=_make_dictionary, - input_names=["Sources"], - output_names=["metadata"], + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), ), run_without_submitting=True, mem_gb=1, name="make_timeseries_dict", - iterfield=["Sources"], + iterfield=["in1"], ) workflow.connect([ - (filter_runs, make_timeseries_dict, [ - (("timeseries", _make_xcpd_uri_lol, output_dir), "Sources"), - ]), + (filter_runs, make_timeseries_dict, [(("timeseries", _transpose_lol), "in1")]), ]) # fmt:skip ds_timeseries = pe.MapNode( @@ -344,21 +377,17 @@ def init_concatenate_data_wf(TR, head_radius, name="concatenate_data_wf"): ]) # fmt:skip make_correlations_dict = pe.MapNode( - niu.Function( - function=_make_dictionary, - input_names=["Sources"], - output_names=["metadata"], + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), ), run_without_submitting=True, mem_gb=1, name="make_correlations_dict", - iterfield=["Sources"], + iterfield=["in1"], ) - workflow.connect([ - (ds_timeseries, make_correlations_dict, [ - (("out_file", _make_xcpd_uri, output_dir), "Sources"), - ]), - ]) # fmt:skip + workflow.connect([(ds_timeseries, make_correlations_dict, [("out_file", "in1")])]) ds_correlations = pe.MapNode( DerivativesDataSink( @@ -381,24 +410,22 @@ def init_concatenate_data_wf(TR, head_radius, name="concatenate_data_wf"): ]) # fmt:skip if file_format == "cifti": - make_timeseries_ciftis_dict = pe.MapNode( - niu.Function( - function=_make_dictionary, - input_names=["Sources"], - output_names=["metadata"], + cifti_ts_src = pe.MapNode( + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), ), run_without_submitting=True, mem_gb=1, - name="make_timeseries_ciftis_dict", - iterfield=["Sources"], + name="cifti_ts_src", + iterfield=["in1"], ) workflow.connect([ - (filter_runs, make_timeseries_ciftis_dict, [ - (("timeseries_ciftis", _make_xcpd_uri_lol, output_dir), "Sources"), - ]), + (filter_runs, cifti_ts_src, [(("timeseries_ciftis", _transpose_lol), "in1")]), ]) # fmt:skip - ds_timeseries_cifti_files = pe.MapNode( + ds_cifti_ts = pe.MapNode( DerivativesDataSink( check_hdr=False, 
dismiss_entities=["desc", "den"], @@ -407,21 +434,17 @@ def init_concatenate_data_wf(TR, head_radius, name="concatenate_data_wf"): suffix="timeseries", extension=".ptseries.nii", ), - name="ds_timeseries_cifti_files", + name="ds_cifti_ts", run_without_submitting=True, mem_gb=1, iterfield=["segmentation", "in_file", "meta_dict"], ) - ds_timeseries_cifti_files.inputs.segmentation = atlases + ds_cifti_ts.inputs.segmentation = atlases workflow.connect([ - (clean_name_source, ds_timeseries_cifti_files, [("name_source", "source_file")]), - (concatenate_inputs, ds_timeseries_cifti_files, [ - ("timeseries_ciftis", "in_file"), - ]), - (make_timeseries_ciftis_dict, ds_timeseries_cifti_files, [ - ("metadata", "meta_dict"), - ]), + (clean_name_source, ds_cifti_ts, [("name_source", "source_file")]), + (concatenate_inputs, ds_cifti_ts, [("timeseries_ciftis", "in_file")]), + (cifti_ts_src, ds_cifti_ts, [("metadata", "meta_dict")]), ]) # fmt:skip return workflow diff --git a/xcp_d/workflows/bold/outputs.py b/xcp_d/workflows/bold/outputs.py index 27505e734..d30dbf4b0 100644 --- a/xcp_d/workflows/bold/outputs.py +++ b/xcp_d/workflows/bold/outputs.py @@ -7,15 +7,9 @@ from niworkflows.engine.workflows import LiterateWorkflow as Workflow from xcp_d import config -from xcp_d.interfaces.bids import DerivativesDataSink -from xcp_d.utils.bids import ( - _make_atlas_uri, - _make_preproc_uri, - _make_xcpd_uri, - get_entity, -) +from xcp_d.interfaces.bids import BIDSURI, DerivativesDataSink +from xcp_d.utils.bids import get_entity from xcp_d.utils.doc import fill_doc -from xcp_d.utils.utils import _make_dictionary @fill_doc @@ -83,7 +77,6 @@ def init_postproc_derivatives_wf( """ workflow = Workflow(name=name) - fmri_dir = config.execution.fmri_dir bandpass_filter = config.workflow.bandpass_filter low_pass = config.workflow.low_pass high_pass = config.workflow.high_pass @@ -148,6 +141,25 @@ def init_postproc_derivatives_wf( name="outputnode", ) + bold_sources = pe.Node( + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), + ), + name="sources", + ) + bold_sources.inputs.in1 = name_source + confound_sources = pe.Node( + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), + ), + name="confounds", + ) + workflow.connect([(inputnode, confound_sources, [("preproc_confounds_file", "in1")])]) + # Create dictionary of basic information cleaned_data_dictionary = { **source_metadata, @@ -175,8 +187,6 @@ def init_postproc_derivatives_wf( # Determine cohort (if there is one) in the original data cohort = get_entity(name_source, "cohort") - preproc_bold_src = _make_preproc_uri(name_source, fmri_dir) - ds_motion = pe.Node( DerivativesDataSink( source_file=name_source, @@ -192,26 +202,38 @@ def init_postproc_derivatives_wf( (inputnode, ds_motion, [ ("motion_metadata", "meta_dict"), ("motion_file", "in_file"), - (("preproc_confounds_file", _make_preproc_uri, fmri_dir), "Sources"), ]), + (confound_sources, ds_motion, [("out", "Sources")]), (ds_motion, outputnode, [("out_file", "motion_file")]), ]) # fmt:skip merge_dense_src = pe.Node( - niu.Merge( + BIDSURI( numinputs=( 1 + (1 if fd_thresh > 0 else 0) + (1 if config.execution.confounds_config != "none" else 0) ), + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), ), name="merge_dense_src", run_without_submitting=True, mem_gb=1, ) - merge_dense_src.inputs.in1 = preproc_bold_src + workflow.connect([(bold_sources, merge_dense_src, [("out", "in1")])]) if fd_thresh > 0: + 
motion_src = pe.Node( + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), + ), + name="motion_src", + ) + workflow.connect([(ds_motion, motion_src, [("out_file", "in1")])]) + ds_temporal_mask = pe.Node( DerivativesDataSink( dismiss_entities=["segmentation", "den", "res", "space", "cohort", "desc"], @@ -225,34 +247,33 @@ def init_postproc_derivatives_wf( run_without_submitting=True, mem_gb=1, ) + workflow.connect([ (inputnode, ds_temporal_mask, [ ("temporal_mask_metadata", "meta_dict"), ("temporal_mask", "in_file"), ]), - (ds_motion, ds_temporal_mask, [(("out_file", _make_xcpd_uri, output_dir), "Sources")]), + (motion_src, ds_temporal_mask, [("out", "Sources")]), (ds_temporal_mask, outputnode, [("out_file", "temporal_mask")]), - (ds_temporal_mask, merge_dense_src, [ - (("out_file", _make_xcpd_uri, output_dir), "in2"), - ]), + (ds_temporal_mask, merge_dense_src, [("out_file", "in2")]), ]) # fmt:skip if config.execution.confounds_config is not None: # XXX: I need to combine collected confounds files as Sources here. # Not just the preproc_confounds_file. confounds_src = pe.Node( - niu.Merge(numinputs=(1 + (1 if fd_thresh > 0 else 0))), + BIDSURI( + numinputs=0 + (1 if fd_thresh > 0 else 0), + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), + ), name="confounds_src", run_without_submitting=True, mem_gb=1, ) - workflow.connect([(inputnode, confounds_src, [("preproc_confounds_file", "in1")])]) + workflow.connect([(inputnode, confounds_src, [("confounds_metadata", "metadata")])]) if fd_thresh > 0: - workflow.connect([ - (ds_temporal_mask, confounds_src, [ - (("out_file", _make_xcpd_uri, output_dir), "in2"), - ]), - ]) # fmt:skip + workflow.connect([(ds_temporal_mask, confounds_src, [("out_file", "in2")])]) ds_confounds = pe.Node( DerivativesDataSink( @@ -266,14 +287,9 @@ def init_postproc_derivatives_wf( run_without_submitting=False, ) workflow.connect([ - (inputnode, ds_confounds, [ - ("confounds_tsv", "in_file"), - ("confounds_metadata", "meta_dict"), - ]), - (confounds_src, ds_confounds, [("out", "Sources")]), - (ds_confounds, merge_dense_src, [ - (("out_file", _make_xcpd_uri, output_dir), f"in{3 if fd_thresh > 0 else 2}"), - ]), + (inputnode, ds_confounds, [("confounds_tsv", "in_file")]), + (confounds_src, ds_confounds, [("metadata", "meta_dict")]), + (ds_confounds, merge_dense_src, [("out_file", f"in{3 if fd_thresh > 0 else 2}")]), ]) # fmt:skip # Write out derivatives via DerivativesDataSink @@ -317,6 +333,18 @@ def init_postproc_derivatives_wf( workflow.connect([(inputnode, ds_qc_file, [("qc_file", "in_file")])]) if smoothing: + smoothed_bold_src = pe.Node( + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), + ), + name="smoothed_bold_src", + run_without_submitting=True, + mem_gb=1, + ) + workflow.connect([(ds_denoised_bold, smoothed_bold_src, [("out_file", "in1")])]) + # Write out derivatives via DerivativesDataSink ds_smoothed_bold = pe.Node( DerivativesDataSink( @@ -337,37 +365,31 @@ def init_postproc_derivatives_wf( ) workflow.connect([ (inputnode, ds_smoothed_bold, [("smoothed_denoised_bold", "in_file")]), - (ds_denoised_bold, ds_smoothed_bold, [ - (("out_file", _make_xcpd_uri, output_dir), "Sources"), - ]), + (smoothed_bold_src, ds_smoothed_bold, [("out", "Sources")]), (ds_smoothed_bold, outputnode, [("out_file", "smoothed_denoised_bold")]), ]) # fmt:skip # Connectivity workflow outputs if config.execution.atlases: make_atlas_dict = pe.MapNode( - niu.Function( - 
function=_make_dictionary, - input_names=["Sources"], - output_names=["metadata"], + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), ), run_without_submitting=True, mem_gb=1, name="make_atlas_dict", - iterfield=["Sources"], + iterfield=["in1"], ) - workflow.connect([ - (inputnode, make_atlas_dict, [ - (("atlas_files", _make_atlas_uri, output_dir), "Sources"), - ]), - ]) # fmt:skip + workflow.connect([(inputnode, make_atlas_dict, [("atlas_files", "in1")])]) # Convert Sources to a dictionary, to play well with parcellation MapNodes. add_denoised_to_src = pe.MapNode( - niu.Function( - function=_make_dictionary, - input_names=["metadata", "Sources"], - output_names=["metadata"], + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), ), run_without_submitting=True, mem_gb=1, @@ -376,9 +398,7 @@ def init_postproc_derivatives_wf( ) workflow.connect([ (make_atlas_dict, add_denoised_to_src, [("metadata", "metadata")]), - (ds_denoised_bold, add_denoised_to_src, [ - (("out_file", _make_xcpd_uri, output_dir), "Sources"), - ]), + (ds_denoised_bold, add_denoised_to_src, [("out_file", "in1")]), ]) # fmt:skip # TODO: Add brain mask to Sources (for NIfTIs). @@ -405,21 +425,19 @@ def init_postproc_derivatives_wf( ]) # fmt:skip add_coverage_to_src = pe.MapNode( - niu.Function( - function=_make_dictionary, - input_names=["metadata", "Sources"], - output_names=["metadata"], + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), ), run_without_submitting=True, mem_gb=1, name="add_coverage_to_src", - iterfield=["metadata", "Sources"], + iterfield=["metadata", "in1"], ) workflow.connect([ (add_denoised_to_src, add_coverage_to_src, [("metadata", "metadata")]), - (ds_coverage, add_coverage_to_src, [ - (("out_file", _make_xcpd_uri, output_dir), "Sources"), - ]), + (ds_coverage, add_coverage_to_src, [("out_file", "in1")]), ]) # fmt:skip ds_timeseries = pe.MapNode( @@ -448,24 +466,34 @@ def init_postproc_derivatives_wf( ]) # fmt:skip if config.workflow.output_correlations: - make_corrs_meta_dict = pe.MapNode( - niu.Function( - function=_make_dictionary, - input_names=["Sources", "NodeFiles"], - output_names=["metadata"], + make_corrs_meta_dict1 = pe.MapNode( + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), ), run_without_submitting=True, mem_gb=1, - name="make_corrs_meta_dict", - iterfield=["Sources", "NodeFiles"], + name="make_corrs_meta_dict1", + iterfield=["in1"], + ) + workflow.connect([(ds_timeseries, make_corrs_meta_dict1, [("out_file", "in1")])]) + + make_corrs_meta_dict2 = pe.MapNode( + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), + field="NodeFiles", + ), + run_without_submitting=True, + mem_gb=1, + name="make_corrs_meta_dict2", + iterfield=["in1", "metadata"], ) workflow.connect([ - (inputnode, make_corrs_meta_dict, [ - (("atlas_files", _make_atlas_uri, output_dir), "NodeFiles"), - ]), - (ds_timeseries, make_corrs_meta_dict, [ - (("out_file", _make_xcpd_uri, output_dir), "Sources"), - ]), + (inputnode, make_corrs_meta_dict2, [("atlas_files", "in1")]), + (make_corrs_meta_dict1, make_corrs_meta_dict2, [("metadata", "metadata")]), ]) # fmt:skip ds_correlations = pe.MapNode( @@ -493,7 +521,7 @@ def init_postproc_derivatives_wf( ("atlas_names", "segmentation"), ("correlations", "in_file"), ]), - (make_corrs_meta_dict, ds_correlations, [("metadata", "meta_dict")]), + 
(make_corrs_meta_dict2, ds_correlations, [("metadata", "meta_dict")]), ]) # fmt:skip if file_format == "cifti": @@ -521,21 +549,19 @@ def init_postproc_derivatives_wf( ]) # fmt:skip add_ccoverage_to_src = pe.MapNode( - niu.Function( - function=_make_dictionary, - input_names=["metadata", "Sources"], - output_names=["metadata"], + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), ), run_without_submitting=True, mem_gb=1, name="add_ccoverage_to_src", - iterfield=["metadata", "Sources"], + iterfield=["metadata", "in1"], ) workflow.connect([ (add_denoised_to_src, add_ccoverage_to_src, [("metadata", "metadata")]), - (ds_coverage_ciftis, add_ccoverage_to_src, [ - (("out_file", _make_xcpd_uri, output_dir), "Sources"), - ]), + (ds_coverage_ciftis, add_ccoverage_to_src, [("out_file", "in1")]), ]) # fmt:skip ds_timeseries_ciftis = pe.MapNode( @@ -564,24 +590,36 @@ def init_postproc_derivatives_wf( ]) # fmt:skip if config.workflow.output_correlations: - make_ccorrs_meta_dict = pe.MapNode( - niu.Function( - function=_make_dictionary, - input_names=["Sources", "NodeFiles"], - output_names=["metadata"], + make_ccorrs_meta_dict1 = pe.MapNode( + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), ), run_without_submitting=True, mem_gb=1, - name="make_ccorrs_meta_dict", - iterfield=["Sources", "NodeFiles"], + name="make_ccorrs_meta_dict1", + iterfield=["in1"], ) workflow.connect([ - (inputnode, make_ccorrs_meta_dict, [ - (("atlas_files", _make_atlas_uri, output_dir), "NodeFiles"), - ]), - (ds_timeseries_ciftis, make_ccorrs_meta_dict, [ - (("out_file", _make_xcpd_uri, output_dir), "Sources"), - ]), + (ds_timeseries_ciftis, make_ccorrs_meta_dict1, [("out_file", "in1")]), + ]) # fmt:skip + + make_ccorrs_meta_dict2 = pe.MapNode( + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), + field="NodeFiles", + ), + run_without_submitting=True, + mem_gb=1, + name="make_ccorrs_meta_dict2", + iterfield=["in1", "metadata"], + ) + workflow.connect([ + (inputnode, make_ccorrs_meta_dict2, [("atlas_files", "in1")]), + (make_ccorrs_meta_dict1, make_ccorrs_meta_dict2, [("metadata", "metadata")]), ]) # fmt:skip ds_correlation_ciftis = pe.MapNode( @@ -611,7 +649,7 @@ def init_postproc_derivatives_wf( ("atlas_names", "segmentation"), ("correlation_ciftis", "in_file"), ]), - (make_ccorrs_meta_dict, ds_correlation_ciftis, [("metadata", "meta_dict")]), + (make_ccorrs_meta_dict2, ds_correlation_ciftis, [("metadata", "meta_dict")]), ]) # fmt:skip for i_exact_scan, exact_scan in enumerate(exact_scans): @@ -645,6 +683,18 @@ def init_postproc_derivatives_wf( ]) # fmt:skip # Resting state metric outputs + denoised_src = pe.Node( + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), + ), + name="denoised_src", + run_without_submitting=True, + mem_gb=1, + ) + workflow.connect([(ds_denoised_bold, denoised_src, [("out_file", "in1")])]) + ds_reho = pe.Node( DerivativesDataSink( source_file=name_source, @@ -665,15 +715,15 @@ def init_postproc_derivatives_wf( ) workflow.connect([ (inputnode, ds_reho, [("reho", "in_file")]), - (ds_denoised_bold, ds_reho, [(("out_file", _make_xcpd_uri, output_dir), "Sources")]), + (denoised_src, ds_reho, [("out", "Sources")]), ]) # fmt:skip if config.execution.atlases: add_reho_to_src = pe.MapNode( - niu.Function( - function=_make_dictionary, - input_names=["metadata", "Sources"], - output_names=["metadata"], + BIDSURI( + 
numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), ), run_without_submitting=True, mem_gb=1, @@ -682,7 +732,7 @@ def init_postproc_derivatives_wf( ) workflow.connect([ (make_atlas_dict, add_reho_to_src, [("metadata", "metadata")]), - (ds_reho, add_reho_to_src, [(("out_file", _make_xcpd_uri, output_dir), "Sources")]), + (ds_reho, add_reho_to_src, [("out_file", "in1")]), ]) # fmt:skip ds_parcellated_reho = pe.MapNode( @@ -730,10 +780,22 @@ def init_postproc_derivatives_wf( ) workflow.connect([ (inputnode, ds_alff, [("alff", "in_file")]), - (ds_denoised_bold, ds_alff, [(("out_file", _make_xcpd_uri, output_dir), "Sources")]), + (denoised_src, ds_alff, [("out", "Sources")]), ]) # fmt:skip if smoothing: + alff_src = pe.Node( + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), + ), + name="alff_src", + run_without_submitting=True, + mem_gb=1, + ) + workflow.connect([(ds_alff, alff_src, [("out_file", "in1")])]) + ds_smoothed_alff = pe.Node( DerivativesDataSink( source_file=name_source, @@ -755,17 +817,15 @@ def init_postproc_derivatives_wf( ) workflow.connect([ (inputnode, ds_smoothed_alff, [("smoothed_alff", "in_file")]), - (ds_alff, ds_smoothed_alff, [ - (("out_file", _make_xcpd_uri, output_dir), "Sources"), - ]), + (alff_src, ds_smoothed_alff, [("out", "Sources")]), ]) # fmt:skip if config.execution.atlases: add_alff_to_src = pe.MapNode( - niu.Function( - function=_make_dictionary, - input_names=["metadata", "Sources"], - output_names=["metadata"], + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), ), run_without_submitting=True, mem_gb=1, @@ -774,9 +834,7 @@ def init_postproc_derivatives_wf( ) workflow.connect([ (make_atlas_dict, add_alff_to_src, [("metadata", "metadata")]), - (ds_alff, add_alff_to_src, [ - (("out_file", _make_xcpd_uri, output_dir), "Sources"), - ]), + (ds_alff, add_alff_to_src, [("out_file", "in1")]), ]) # fmt:skip ds_parcellated_alff = pe.MapNode( diff --git a/xcp_d/workflows/parcellation.py b/xcp_d/workflows/parcellation.py index 3e6604358..6eedce96e 100644 --- a/xcp_d/workflows/parcellation.py +++ b/xcp_d/workflows/parcellation.py @@ -9,6 +9,7 @@ from xcp_d import config from xcp_d.interfaces.ants import ApplyTransforms +from xcp_d.interfaces.bids import BIDSURI from xcp_d.interfaces.nilearn import IndexImage from xcp_d.utils.doc import fill_doc from xcp_d.utils.utils import get_std2bold_xfms @@ -171,10 +172,22 @@ def init_load_atlases_wf(name="load_atlases_wf"): else: workflow.connect([(inputnode, atlas_buffer, [("atlas_files", "atlas_file")])]) + atlas_srcs = pe.MapNode( + BIDSURI( + numinputs=1, + dataset_links=config.execution.dataset_links, + out_dir=str(output_dir), + ), + name="atlas_srcs", + iterfield=["in1"], + run_without_submitting=True, + ) + workflow.connect([(inputnode, atlas_srcs, [("atlas_files", "in1")])]) + copy_atlas = pe.MapNode( CopyAtlas(output_dir=output_dir), name="copy_atlas", - iterfield=["in_file", "atlas", "meta_dict"], + iterfield=["in_file", "atlas", "meta_dict", "Sources"], run_without_submitting=True, ) workflow.connect([ @@ -184,6 +197,7 @@ def init_load_atlases_wf(name="load_atlases_wf"): ("atlas_metadata", "meta_dict"), ]), (atlas_buffer, copy_atlas, [("atlas_file", "in_file")]), + (atlas_srcs, copy_atlas, [("out", "Sources")]), (copy_atlas, outputnode, [("out_file", "atlas_files")]), ]) # fmt:skip