From fd56bf4abe195da9d351d64345381231ce7f7038 Mon Sep 17 00:00:00 2001 From: Brendan Moloney Date: Mon, 12 Aug 2024 15:08:26 -0700 Subject: [PATCH 1/2] BF+ENH: Fixes to DICOM scaling, make frame filtering explicit Fixes how we handle DICOM scaling, particularly for Philips and multi-frame files. For Philips data scale factors without defined units should be avoided, and instead a private tag should be used to make image intensities comparable across series. For multi-frame DICOM, it is possible to have different scale factors (potentially coming from different tags) per-frame. We also prefer scale factors from a RealWorldValueMapping provided they have defined units. The base Wrapper class now has a few new attributes and methods to support this functionality. In particular an attribute `scale_factors` that provides an array of slope/intercept pairs, and a method `get_unscaled_data` that will return the reordered/reshaped data but without the scaling applied. A `vendor` attribute was also added to better support vendor-specific implementation details. For the MultiFrameWrapper I also added an attribute `frame_order` which exposes the order used to sort the frames, and use this to return the `scale_factors` in sorted order. While implementing this I kept bumping into issues due to the (implicit) frame filtering that was happening in the `image_shape` property, so I made this filtering explicit and configurable and moved it into the class initialization. --- nibabel/nicom/dicomwrappers.py | 410 +++++++++++++++++----- nibabel/nicom/tests/test_dicomwrappers.py | 363 ++++++++++++++----- nibabel/nicom/utils.py | 54 +++ 3 files changed, 636 insertions(+), 191 deletions(-) diff --git a/nibabel/nicom/dicomwrappers.py b/nibabel/nicom/dicomwrappers.py index 374387870..3842248fd 100755 --- a/nibabel/nicom/dicomwrappers.py +++ b/nibabel/nicom/dicomwrappers.py @@ -13,6 +13,7 @@ """ import operator +import re import warnings from functools import cached_property @@ -23,6 +24,7 @@ from ..openers import ImageOpener from . import csareader as csar from .dwiparams import B2q, nearest_pos_semi_def, q2bg +from .utils import Vendor, find_private_section, vendor_from_private pydicom = optional_package('pydicom')[0] @@ -59,7 +61,7 @@ def wrapper_from_file(file_like, *args, **kwargs): return wrapper_from_data(dcm_data) -def wrapper_from_data(dcm_data): +def wrapper_from_data(dcm_data, frame_filters=None): """Create DICOM wrapper from DICOM data object Parameters @@ -68,6 +70,9 @@ def wrapper_from_data(dcm_data): Object allowing attribute access, with DICOM attributes. Probably a dataset as read by ``pydicom``. 
+ frame_filters + Optionally override the `frame_filters` used to create a `MultiFrameWrapper` + Returns ------- dcm_w : ``dicomwrappers.Wrapper`` or subclass @@ -76,9 +81,8 @@ def wrapper_from_data(dcm_data): sop_class = dcm_data.get('SOPClassUID') # try to detect what type of dicom object to wrap if sop_class == '1.2.840.10008.5.1.4.1.1.4.1': # Enhanced MR Image Storage - # currently only Philips is using Enhanced Multiframe DICOM - return MultiframeWrapper(dcm_data) - # Check for Siemens DICOM format types + return MultiframeWrapper(dcm_data, frame_filters) + # Check for non-enhanced (legacy) Siemens DICOM format types # Only Siemens will have data for the CSA header try: csa = csar.get_csa_header(dcm_data) @@ -103,6 +107,7 @@ class Wrapper: Methods: * get_data() + * get_unscaled_data() * get_pixel_array() * is_same_series(other) * __getitem__ : return attributes from `dcm_data` @@ -120,6 +125,8 @@ class Wrapper: * image_position : sequence length 3 * slice_indicator : float * series_signature : tuple + * scale_factors : (N, 2) array + * vendor : Vendor """ is_csa = False @@ -136,10 +143,34 @@ def __init__(self, dcm_data): dcm_data : object object should allow 'get' and '__getitem__' access. Usually this will be a ``dicom.dataset.Dataset`` object resulting from reading a - DICOM file, but a dictionary should also work. + DICOM file. """ self.dcm_data = dcm_data + @cached_property + def vendor(self): + """The vendor of the instrument that produced the DICOM""" + # Look at manufacturer tag first + mfgr = self.get('Manufacturer') + if mfgr: + if re.search('Siemens', mfgr, re.IGNORECASE): + return Vendor.SIEMENS + if re.search('Philips', mfgr, re.IGNORECASE): + return Vendor.PHILIPS + if re.search('GE Medical', mfgr, re.IGNORECASE): + return Vendor.GE + # Next look at UID prefixes + for uid_src in ('StudyInstanceUID', 'SeriesInstanceUID', 'SOPInstanceUID'): + uid = str(self.get(uid_src)) + if uid.startswith(('1.3.12.2.1007.', '1.3.12.2.1107.')): + return Vendor.SIEMENS + if uid.startswith(('1.3.46', '1.3.12.2.1017')): + return Vendor.PHILIPS + if uid.startswith('1.2.840.113619'): + return Vendor.GE + # Finally look for vendor specific private blocks + return vendor_from_private(self.dcm_data) + @cached_property def image_shape(self): """The array shape as it will be returned by ``get_data()``""" @@ -315,14 +346,30 @@ def affine(self): return aff def get_pixel_array(self): - """Return unscaled pixel array from DICOM""" + """Return raw pixel array without reshaping or scaling + + Returns + ------- + data : array + array with raw pixel data from DICOM + """ data = self.dcm_data.get('pixel_array') if data is None: raise WrapperError('Cannot find data in DICOM') return data + def get_unscaled_data(self): + """Return pixel array that is potentially reshaped, but without any scaling + + Returns + ------- + data : array + array with raw pixel data from DICOM + """ + return self.get_pixel_array() + def get_data(self): - """Get scaled image data from DICOMs + """Get potentially scaled and reshaped image data from DICOMs We return the data as DICOM understands it, first dimension is rows, second dimension is columns @@ -333,7 +380,7 @@ def get_data(self): array with data as scaled from any scaling in the DICOM fields. 
""" - return self._scale_data(self.get_pixel_array()) + return self._scale_data(self.get_unscaled_data()) def is_same_series(self, other): """Return True if `other` appears to be in same series @@ -372,11 +419,86 @@ def is_same_series(self, other): return False return True + @cached_property + def scale_factors(self): + """Return (2, N) array of slope/intercept pairs""" + scaling = self._get_best_scale_factor(self.dcm_data) + if scaling is None: + if self.vendor == Vendor.PHILIPS: + warnings.warn( + 'Unable to find Philips private scale factor, cross-series comparisons may be invalid' + ) + scaling = (1, 0) + return np.array((scaling,)) + + def _get_rwv_scale_factor(self, dcm_data): + """Return the first set of 'real world' scale factors with defined units""" + rw_seq = dcm_data.get('RealWorldValueMappingSequence') + if rw_seq: + for rw_map in rw_seq: + try: + units = rw_map.MeasurementUnitsCodeSequence[0].CodeMeaning + except (AttributeError, IndexError): + continue + if units not in ('', 'no units', 'UNDEFINED'): + return ( + rw_map.get('RealWorldValueSlope', 1), + rw_map.get('RealWorldValueIntercept', 0), + ) + + def _get_legacy_scale_factor(self, dcm_data): + """Return scale factors from older 'Modality LUT' macro + + For Philips data we require RescaleType is defined and not set to 'normalized' + """ + pix_trans_seq = dcm_data.get('PixelValueTransformationSequence') + if pix_trans_seq is not None: + pix_trans = pix_trans_seq[0] + if self.vendor != Vendor.PHILIPS or pix_trans.get('RescaleType', 'US') not in ( + '', + 'US', + 'normalized', + ): + return (pix_trans.get('RescaleSlope', 1), pix_trans.get('RescaleIntercept', 0)) + if ( + dcm_data.get('RescaleSlope') is not None + or dcm_data.get('RescaleIntercept') is not None + ): + if self.vendor != Vendor.PHILIPS or dcm_data.get('RescaleType', 'US') not in ( + '', + 'US', + 'normalized', + ): + return (dcm_data.get('RescaleSlope', 1), dcm_data.get('RescaleIntercept', 0)) + + def _get_philips_scale_factor(self, dcm_data): + """Return scale factors from Philips private element + + If we don't have any other scale factors that are tied to real world units, then + this is the best scaling to use to enable cross-series comparisons + """ + offset = find_private_section(dcm_data, 0x2005, 'Philips MR Imaging DD 001') + priv_scale = None if offset is None else dcm_data.get((0x2005, offset + 0xE)) + if priv_scale is not None: + return (priv_scale.value, 0.0) + + def _get_best_scale_factor(self, dcm_data): + """Return the most appropriate scale factor found or None""" + scaling = self._get_rwv_scale_factor(dcm_data) + if scaling is not None: + return scaling + scaling = self._get_legacy_scale_factor(dcm_data) + if scaling is not None: + return scaling + if self.vendor == Vendor.PHILIPS: + scaling = self._get_philips_scale_factor(dcm_data) + if scaling is not None: + return scaling + def _scale_data(self, data): # depending on pydicom and dicom files, values might need casting from # Decimal to float - scale = float(self.get('RescaleSlope', 1)) - offset = float(self.get('RescaleIntercept', 0)) + scale, offset = self.scale_factors[0] return self._apply_scale_offset(data, scale, offset) def _apply_scale_offset(self, data, scale, offset): @@ -407,6 +529,71 @@ def b_vector(self): return q2bg(q_vec)[1] +class FrameFilter: + """Base class for defining how to filter out (ignore) frames from a multiframe file + + It is guaranteed that the `applies` method will on a dataset before the `keep` method + is called on any of the frames inside. 
+ """ + + def applies(self, dcm_wrp) -> bool: + """Returns true if the filter should be applied to a dataset""" + return True + + def keep(self, frame_data) -> bool: + """Return true if the frame should be kept""" + raise NotImplementedError + + +class FilterMultiStack(FrameFilter): + """Filter out all but one `StackID`""" + + def __init__(self, keep_id=None): + self._keep_id = keep_id + + def applies(self, dcm_wrp) -> bool: + first_fcs = dcm_wrp.frames[0].get('FrameContentSequence', (None,))[0] + if first_fcs is None or not hasattr(first_fcs, 'StackID'): + return False + stack_ids = {frame.FrameContentSequence[0].StackID for frame in dcm_wrp.frames} + if self._keep_id is not None: + if self._keep_id not in stack_ids: + raise WrapperError('Explicitly requested StackID not found') + self._selected = self._keep_id + if len(stack_ids) > 1: + if self._keep_id is None: + warnings.warn( + 'A multi-stack file was passed without an explicit filter, just using lowest StackID' + ) + self._selected = sorted(stack_ids)[0] + return True + return False + + def keep(self, frame) -> bool: + return frame.FrameContentSequence[0].StackID == self._selected + + +class FilterDwiIso(FrameFilter): + """Filter out derived ISOTROPIC frames from DWI series""" + + def applies(self, dcm_wrp) -> bool: + if not hasattr(dcm_wrp.frames[0], 'MRDiffusionSequence'): + return False + diff_dirs = { + f.MRDiffusionSequence[0].get('DiffusionDirectionality') for f in dcm_wrp.frames + } + if len(diff_dirs) > 1 and 'ISOTROPIC' in diff_dirs: + warnings.warn('Derived images found and removed') + return True + return False + + def keep(self, frame) -> bool: + return frame.MRDiffusionSequence[0].DiffusionDirectionality != 'ISOTROPIC' + + +DEFUALT_FRAME_FILTERS = (FilterMultiStack(), FilterDwiIso()) + + class MultiframeWrapper(Wrapper): """Wrapper for Enhanced MR Storage SOP Class @@ -436,17 +623,20 @@ class MultiframeWrapper(Wrapper): Methods ------- + vendor(self) + frame_order(self) image_shape(self) image_orient_patient(self) voxel_sizes(self) image_position(self) series_signature(self) + scale_factors(self) get_data(self) """ is_multiframe = True - def __init__(self, dcm_data): + def __init__(self, dcm_data, frame_filters=None): """Initializes MultiframeWrapper Parameters @@ -454,10 +644,13 @@ def __init__(self, dcm_data): dcm_data : object object should allow 'get' and '__getitem__' access. Usually this will be a ``dicom.dataset.Dataset`` object resulting from reading a - DICOM file, but a dictionary should also work. + DICOM file. + + frame_filters : Iterable of FrameFilter + defines which frames inside the dataset should be ignored. If None then + `dicomwrappers.DEFAULT_FRAME_FILTERS` will be used. 
""" Wrapper.__init__(self, dcm_data) - self.dcm_data = dcm_data self.frames = dcm_data.get('PerFrameFunctionalGroupsSequence') try: self.frames[0] @@ -467,8 +660,19 @@ def __init__(self, dcm_data): self.shared = dcm_data.get('SharedFunctionalGroupsSequence')[0] except TypeError: raise WrapperError('SharedFunctionalGroupsSequence is empty.') + # Apply frame filters one at a time in the order provided + if frame_filters is None: + frame_filters = DEFUALT_FRAME_FILTERS + frame_filters = [filt for filt in frame_filters if filt.applies(self)] + for filt in frame_filters: + self.frames = [f for f in self.frames if filt.keep(f)] + # Make sure there is only one StackID remaining + first_fcs = self.frames[0].get('FrameContentSequence', (None,))[0] + if first_fcs is not None and hasattr(first_fcs, 'StackID'): + if len({frame.FrameContentSequence[0].StackID for frame in self.frames}) > 1: + raise WrapperError('More than one StackID remains after filtering') # Try to determine slice order and minimal image position patient - self._frame_slc_ord = self._ipp = None + self._frame_slc_ord = self._ipp = self._slice_spacing = None try: frame_ipps = [f.PlanePositionSequence[0].ImagePositionPatient for f in self.frames] except AttributeError: @@ -485,8 +689,29 @@ def __init__(self, dcm_data): val: order for val, order in zip(uniq_slc_pos, np.argsort(uniq_slc_pos)) } self._frame_slc_ord = [pos_ord_map[pos] for pos in rnd_slc_pos] + if len(self._frame_slc_ord) > 1: + self._slice_spacing = ( + frame_slc_pos[self._frame_slc_ord[1]] - frame_slc_pos[self._frame_slc_ord[0]] + ) self._ipp = frame_ipps[np.argmin(frame_slc_pos)] - self._shape = None + self._frame_indices = None + + @cached_property + def vendor(self): + """The vendor of the instrument that produced the DICOM""" + vendor = super().vendor + if vendor is not None: + return vendor + vendor = vendor_from_private(self.shared) + if vendor is not None: + return vendor + return vendor_from_private(self.frames[0]) + + @cached_property + def frame_order(self): + if self._frame_indices is None: + _ = self.image_shape + return np.lexsort(self._frame_indices.T) @cached_property def image_shape(self): @@ -519,68 +744,20 @@ def image_shape(self): rows, cols = self.get('Rows'), self.get('Columns') if None in (rows, cols): raise WrapperError('Rows and/or Columns are empty.') - - # Check number of frames - first_frame = self.frames[0] - n_frames = self.get('NumberOfFrames') - # some Philips may have derived images appended - has_derived = False - if hasattr(first_frame, 'get') and first_frame.get([0x18, 0x9117]): - # DWI image may include derived isotropic, ADC or trace volume - try: - aniso_frames = pydicom.Sequence() - aniso_slc_ord = [] - for slc_ord, frame in zip(self._frame_slc_ord, self.frames): - if frame.MRDiffusionSequence[0].DiffusionDirectionality != 'ISOTROPIC': - aniso_frames.append(frame) - aniso_slc_ord.append(slc_ord) - # Image contains DWI volumes followed by derived images; remove derived images - if len(aniso_frames) != 0: - self.frames = aniso_frames - self._frame_slc_ord = aniso_slc_ord - except IndexError: - # Sequence tag is found but missing items! 
-                raise WrapperError('Diffusion file missing information')
-            except AttributeError:
-                # DiffusionDirectionality tag is not required
-                pass
-            else:
-                if n_frames != len(self.frames):
-                    warnings.warn('Derived images found and removed')
-                    n_frames = len(self.frames)
-                    has_derived = True
-
-        assert len(self.frames) == n_frames
-        frame_indices = np.array(
-            [frame.FrameContentSequence[0].DimensionIndexValues for frame in self.frames]
-        )
-        # Check that there is only one multiframe stack index
-        stack_ids = {frame.FrameContentSequence[0].StackID for frame in self.frames}
-        if len(stack_ids) > 1:
-            raise WrapperError(
-                'File contains more than one StackID. Cannot handle multi-stack files'
+        # Check number of frames, initialize array of frame indices
+        n_frames = len(self.frames)
+        try:
+            frame_indices = np.array(
+                [frame.FrameContentSequence[0].DimensionIndexValues for frame in self.frames]
            )
-        # Determine if one of the dimension indices refers to the stack id
-        dim_seq = [dim.DimensionIndexPointer for dim in self.get('DimensionIndexSequence')]
-        stackid_tag = pydicom.datadict.tag_for_keyword('StackID')
-        # remove the stack id axis if present
-        if stackid_tag in dim_seq:
-            stackid_dim_idx = dim_seq.index(stackid_tag)
-            frame_indices = np.delete(frame_indices, stackid_dim_idx, axis=1)
-            dim_seq.pop(stackid_dim_idx)
-        if has_derived:
-            # derived volume is included
-            derived_tag = pydicom.datadict.tag_for_keyword('DiffusionBValue')
-            if derived_tag not in dim_seq:
-                raise WrapperError('Missing information, cannot remove indices with confidence.')
-            derived_dim_idx = dim_seq.index(derived_tag)
-            frame_indices = np.delete(frame_indices, derived_dim_idx, axis=1)
-            dim_seq.pop(derived_dim_idx)
+        except AttributeError:
+            raise WrapperError("Can't find frame 'DimensionIndexValues'")
        # Determine the shape and which indices to use
        shape = [rows, cols]
        curr_parts = n_frames
        frames_per_part = 1
        del_indices = {}
+        dim_seq = [dim.DimensionIndexPointer for dim in self.get('DimensionIndexSequence')]
        stackpos_tag = pydicom.datadict.tag_for_keyword('InStackPositionNumber')
        slice_dim_idx = dim_seq.index(stackpos_tag)
        for row_idx, row in enumerate(frame_indices.T):
@@ -684,12 +861,15 @@ def voxel_sizes(self):
        except AttributeError:
            raise WrapperError('Not enough data for pixel spacing')
        pix_space = pix_measures.PixelSpacing
-        try:
-            zs = pix_measures.SliceThickness
-        except AttributeError:
-            zs = self.get('SpacingBetweenSlices')
-            if zs is None:
-                raise WrapperError('Not enough data for slice thickness')
+        if self._slice_spacing is not None:
+            zs = self._slice_spacing
+        else:
+            try:
+                zs = pix_measures.SliceThickness
+            except AttributeError:
+                zs = self.get('SpacingBetweenSlices')
+                if zs is None:
+                    raise WrapperError('Not enough data for slice thickness')
        # Ensure values are float rather than Decimal
        return tuple(map(float, list(pix_space) + [zs]))

@@ -710,27 +890,63 @@ def series_signature(self):
        signature['vox'] = (self.voxel_sizes, none_or_close)
        return signature

-    def get_data(self):
+    @cached_property
+    def scale_factors(self):
+        """Return `(N, 2)` array of slope/intercept pairs
+
+        If there is a single global scale factor then `N` will be one, otherwise it will
+        be the number of frames
+        """
+        # Look for shared / global RWV scale factor first
+        shared_scale = self._get_rwv_scale_factor(self.shared)
+        if shared_scale is not None:
+            return np.array([shared_scale])
+        shared_scale = self._get_rwv_scale_factor(self.dcm_data)
+        if shared_scale is not None:
+            return np.array([shared_scale])
+        # Try pulling out
best scale factors from each individual frame + frame_scales = [self._get_best_scale_factor(f) for f in self.frames] + if any(s is not None for s in frame_scales): + if any(s is None for s in frame_scales): + if self.vendor == Vendor.PHILIPS: + warnings.warn( + 'Unable to find Philips private scale factor, cross-series comparisons may be invalid' + ) + frame_scales = [s if s is not None else (1, 0) for s in frame_scales] + if all(s == frame_scales[0] for s in frame_scales[1:]): + return np.array([frame_scales[0]]) + return np.array(frame_scales)[self.frame_order] + # Finally look for shared non-RWV scale factors + shared_scale = self._get_best_scale_factor(self.shared) + if shared_scale is not None: + return np.array([shared_scale]) + shared_scale = self._get_best_scale_factor(self.dcm_data) + if shared_scale is None: + if self.vendor == Vendor.PHILIPS: + warnings.warn( + 'Unable to find Philips private scale factor, cross-series comparisons may be invalid' + ) + shared_scale = (1, 0) + return np.array([shared_scale]) + + def get_unscaled_data(self): shape = self.image_shape if shape is None: raise WrapperError('No valid information for image shape') data = self.get_pixel_array() - # Roll frames axis to last + # Roll frames axis to last and reorder if len(data.shape) > 2: - data = data.transpose((1, 2, 0)) - # Sort frames with first index changing fastest, last slowest - sorted_indices = np.lexsort(self._frame_indices.T) - data = data[..., sorted_indices] - data = data.reshape(shape, order='F') - return self._scale_data(data) + data = data.transpose((1, 2, 0))[..., self.frame_order] + return data.reshape(shape, order='F') def _scale_data(self, data): - pix_trans = getattr(self.frames[0], 'PixelValueTransformationSequence', None) - if pix_trans is None: - return super()._scale_data(data) - scale = float(pix_trans[0].RescaleSlope) - offset = float(pix_trans[0].RescaleIntercept) - return self._apply_scale_offset(data, scale, offset) + scale_factors = self.scale_factors + if scale_factors.shape[0] == 1: + scale, offset = scale_factors[0] + return self._apply_scale_offset(data, scale, offset) + orig_shape = data.shape + data = data.reshape(data.shape[:2] + (len(self.frames),)) + return (data * scale_factors[:, 0] + scale_factors[:, 1]).reshape(orig_shape) class SiemensWrapper(Wrapper): @@ -757,7 +973,7 @@ def __init__(self, dcm_data, csa_header=None): object should allow 'get' and '__getitem__' access. If `csa_header` is None, it should also be possible to extract a CSA header from `dcm_data`. Usually this will be a ``dicom.dataset.Dataset`` object - resulting from reading a DICOM file. A dict should also work. + resulting from reading a DICOM file. csa_header : None or mapping, optional mapping giving values for Siemens CSA image sub-header. If None, we try and read the CSA information from `dcm_data`. 
@@ -773,6 +989,11 @@ def __init__(self, dcm_data, csa_header=None): csa_header = {} self.csa_header = csa_header + @cached_property + def vendor(self): + """The vendor of the instrument that produced the DICOM""" + return Vendor.SIEMENS + @cached_property def slice_normal(self): # The std_slice_normal comes from the cross product of the directions @@ -964,7 +1185,7 @@ def image_position(self): Q = np.fliplr(iop) * pix_spacing return ipp + np.dot(Q, vox_trans_fixes[:, None]).ravel() - def get_data(self): + def get_unscaled_data(self): """Get scaled image data from DICOMs Resorts data block from mosaic to 3D @@ -1007,8 +1228,7 @@ def get_data(self): # pool mosaic-generated dims v3 = v4.reshape((n_slice_rows, n_slice_cols, n_blocks)) # delete any padding slices - v3 = v3[..., :n_mosaic] - return self._scale_data(v3) + return v3[..., :n_mosaic] def none_or_close(val1, val2, rtol=1e-5, atol=1e-6): diff --git a/nibabel/nicom/tests/test_dicomwrappers.py b/nibabel/nicom/tests/test_dicomwrappers.py index e01759c86..0556fc63c 100755 --- a/nibabel/nicom/tests/test_dicomwrappers.py +++ b/nibabel/nicom/tests/test_dicomwrappers.py @@ -1,7 +1,7 @@ """Testing DICOM wrappers""" import gzip -from copy import copy +from copy import deepcopy from decimal import Decimal from hashlib import sha1 from os.path import dirname @@ -11,6 +11,7 @@ import numpy as np import pytest from numpy.testing import assert_array_almost_equal, assert_array_equal +from pydicom.dataset import Dataset from ...tests.nibabel_data import get_nibabel_data, needs_nibabel_data from ...volumeutils import endian_codes @@ -63,8 +64,8 @@ def test_wrappers(): # test direct wrapper calls # first with empty or minimal data multi_minimal = { - 'PerFrameFunctionalGroupsSequence': [None], - 'SharedFunctionalGroupsSequence': [None], + 'PerFrameFunctionalGroupsSequence': [Dataset()], + 'SharedFunctionalGroupsSequence': [Dataset()], } for maker, args in ( (didw.Wrapper, ({},)), @@ -163,10 +164,10 @@ def test_wrapper_from_data(): fake_data['SOPClassUID'] = '1.2.840.10008.5.1.4.1.1.4.1' with pytest.raises(didw.WrapperError): didw.wrapper_from_data(fake_data) - fake_data['PerFrameFunctionalGroupsSequence'] = [None] + fake_data['PerFrameFunctionalGroupsSequence'] = [Dataset()] with pytest.raises(didw.WrapperError): didw.wrapper_from_data(fake_data) - fake_data['SharedFunctionalGroupsSequence'] = [None] + fake_data['SharedFunctionalGroupsSequence'] = [Dataset()] # minimal set should now be met dw = didw.wrapper_from_data(fake_data) assert dw.is_multiframe @@ -384,16 +385,17 @@ def fake_frames(seq_name, field_name, value_seq, frame_seq=None): each element in list is obj.[0]. 
= value_seq[n] for n in range(N) """ - - class Fake: - pass - if frame_seq is None: - frame_seq = [Fake() for _ in range(len(value_seq))] + frame_seq = [Dataset() for _ in range(len(value_seq))] for value, fake_frame in zip(value_seq, frame_seq): - fake_element = Fake() + if value is None: + continue + if hasattr(fake_frame, seq_name): + fake_element = getattr(fake_frame, seq_name)[0] + else: + fake_element = Dataset() + setattr(fake_frame, seq_name, [fake_element]) setattr(fake_element, field_name, value) - setattr(fake_frame, seq_name, [fake_element]) return frame_seq @@ -434,27 +436,32 @@ def __repr__(self): attr_strs.append(f'{attr}={getattr(self, attr)}') return f"{self.__class__.__name__}({', '.join(attr_strs)})" - class DimIdxSeqElem(PrintBase): + class DimIdxSeqElem(Dataset): def __init__(self, dip=(0, 0), fgp=None): + super().__init__() self.DimensionIndexPointer = dip if fgp is not None: self.FunctionalGroupPointer = fgp - class FrmContSeqElem(PrintBase): + class FrmContSeqElem(Dataset): def __init__(self, div, sid): + super().__init__() self.DimensionIndexValues = div self.StackID = sid - class PlnPosSeqElem(PrintBase): + class PlnPosSeqElem(Dataset): def __init__(self, ipp): + super().__init__() self.ImagePositionPatient = ipp - class PlnOrientSeqElem(PrintBase): + class PlnOrientSeqElem(Dataset): def __init__(self, iop): + super().__init__() self.ImageOrientationPatient = iop - class PerFrmFuncGrpSeqElem(PrintBase): + class PerFrmFuncGrpSeqElem(Dataset): def __init__(self, div, sid, ipp, iop): + super().__init__() self.FrameContentSequence = [FrmContSeqElem(div, sid)] self.PlanePositionSequence = [PlnPosSeqElem(ipp)] self.PlaneOrientationSequence = [PlnOrientSeqElem(iop)] @@ -473,7 +480,7 @@ def __init__(self, div, sid, ipp, iop): frame_slc_indices = np.array(div_seq)[:, slice_dim] uniq_slc_indices = np.unique(frame_slc_indices) n_slices = len(uniq_slc_indices) - iop_seq = [(0.0, 1.0, 0.0, 1.0, 0.0, 0.0) for _ in range(num_of_frames)] + iop_seq = [[0.0, 1.0, 0.0, 1.0, 0.0, 0.0] for _ in range(num_of_frames)] if ipp_seq is None: slc_locs = np.linspace(-1.0, 1.0, n_slices) if flip_ipp_idx_corr: @@ -481,7 +488,7 @@ def __init__(self, div, sid, ipp, iop): slc_idx_loc = { div_idx: slc_locs[arr_idx] for arr_idx, div_idx in enumerate(np.sort(uniq_slc_indices)) } - ipp_seq = [(-1.0, -1.0, slc_idx_loc[idx]) for idx in frame_slc_indices] + ipp_seq = [[-1.0, -1.0, slc_idx_loc[idx]] for idx in frame_slc_indices] else: assert flip_ipp_idx_corr is False # caller can flip it themselves assert len(ipp_seq) == num_of_frames @@ -507,38 +514,37 @@ def __init__(self, div, sid, ipp, iop): } +class FakeDataset(Dataset): + pixel_array = None + + class TestMultiFrameWrapper(TestCase): # Test MultiframeWrapper - MINIMAL_MF = { - # Minimal contents of dcm_data for this wrapper - 'PerFrameFunctionalGroupsSequence': [None], - 'SharedFunctionalGroupsSequence': [None], - } + # Minimal contents of dcm_data for this wrapper + MINIMAL_MF = FakeDataset() + MINIMAL_MF.PerFrameFunctionalGroupsSequence = [Dataset()] + MINIMAL_MF.SharedFunctionalGroupsSequence = [Dataset()] WRAPCLASS = didw.MultiframeWrapper @dicom_test def test_shape(self): # Check the shape algorithm - fake_mf = copy(self.MINIMAL_MF) + fake_mf = deepcopy(self.MINIMAL_MF) MFW = self.WRAPCLASS dw = MFW(fake_mf) # No rows, cols, raise WrapperError with pytest.raises(didw.WrapperError): dw.image_shape - fake_mf['Rows'] = 64 + fake_mf.Rows = 64 with pytest.raises(didw.WrapperError): dw.image_shape fake_mf.pop('Rows') - fake_mf['Columns'] = 64 + 
fake_mf.Columns = 64
        with pytest.raises(didw.WrapperError):
            dw.image_shape
-        fake_mf['Rows'] = 32
-        # Missing frame data, raise AssertionError
-        with pytest.raises(AssertionError):
-            dw.image_shape
-        fake_mf['NumberOfFrames'] = 4
-        # PerFrameFunctionalGroupsSequence does not match NumberOfFrames
-        with pytest.raises(AssertionError):
+        fake_mf.Rows = 32
+        # No frame data raises WrapperError
+        with pytest.raises(didw.WrapperError):
            dw.image_shape
        # check 2D shape with StackID index is 0
        div_seq = ((1, 1),)
        fake_mf.update(fake_shape_dependents(div_seq, sid_dim=0))
        assert MFW(fake_mf).image_shape == (32, 64)
@@ -556,11 +562,32 @@ def test_shape(self):
        div_seq = ((1, 1), (1, 2), (1, 3), (1, 4))
        fake_mf.update(fake_shape_dependents(div_seq, sid_dim=0))
        assert MFW(fake_mf).image_shape == (32, 64, 4)
-        # Check stack number matching when StackID index is 0
+        # Check for warning when implicitly dropping stacks
        div_seq = ((1, 1), (1, 2), (1, 3), (2, 4))
        fake_mf.update(fake_shape_dependents(div_seq, sid_dim=0))
+        with pytest.warns(
+            UserWarning,
+            match='A multi-stack file was passed without an explicit filter, just using lowest StackID',
+        ):
+            assert MFW(fake_mf).image_shape == (32, 64, 3)
+        # No warning if we explicitly select that StackID to keep
+        assert MFW(fake_mf, frame_filters=(didw.FilterMultiStack(1),)).image_shape == (32, 64, 3)
+        assert MFW(fake_mf, frame_filters=(didw.FilterMultiStack(2),)).image_shape == (32, 64)
+        # Stack filtering is the same when StackID is not an index
+        div_seq = ((1,), (2,), (3,), (4,))
+        sid_seq = (1, 1, 1, 2)
+        fake_mf.update(fake_shape_dependents(div_seq, sid_seq=sid_seq))
+        with pytest.warns(
+            UserWarning,
+            match='A multi-stack file was passed without an explicit filter, just using lowest StackID',
+        ):
+            assert MFW(fake_mf).image_shape == (32, 64, 3)
+        # No warning if we explicitly select that StackID to keep
+        assert MFW(fake_mf, frame_filters=(didw.FilterMultiStack(1),)).image_shape == (32, 64, 3)
+        assert MFW(fake_mf, frame_filters=(didw.FilterMultiStack(2),)).image_shape == (32, 64)
+        # Check for error when explicitly requested StackID is missing
        with pytest.raises(didw.WrapperError):
-            MFW(fake_mf).image_shape
+            MFW(fake_mf, frame_filters=(didw.FilterMultiStack(3),))
        # Make some fake frame data for 4D when StackID index is 0
        div_seq = ((1, 1, 1), (1, 2, 1), (1, 1, 2), (1, 2, 2), (1, 1, 3), (1, 2, 3))
        fake_mf.update(fake_shape_dependents(div_seq, sid_dim=0))
@@ -568,8 +595,12 @@ def test_shape(self):
        # Check stack number matching for 4D when StackID index is 0
        div_seq = ((1, 1, 1), (1, 2, 1), (1, 1, 2), (1, 2, 2), (1, 1, 3), (2, 2, 3))
        fake_mf.update(fake_shape_dependents(div_seq, sid_dim=0))
-        with pytest.raises(didw.WrapperError):
-            MFW(fake_mf).image_shape
+        with pytest.warns(
+            UserWarning,
+            match='A multi-stack file was passed without an explicit filter, just using lowest StackID',
+        ):
+            with pytest.raises(didw.WrapperError):
+                MFW(fake_mf).image_shape
        # Check indices can be non-contiguous when StackID index is 0
        div_seq = ((1, 1, 1), (1, 2, 1), (1, 1, 3), (1, 2, 3))
        fake_mf.update(fake_shape_dependents(div_seq, sid_dim=0))
@@ -579,7 +610,7 @@ def test_shape(self):
        fake_mf.update(fake_shape_dependents(div_seq, sid_dim=0))
        assert MFW(fake_mf).image_shape == (32, 64, 2, 2)
        # Check number of IPP vals match the number of slices or we raise
-        frames = fake_mf['PerFrameFunctionalGroupsSequence']
+        frames = fake_mf.PerFrameFunctionalGroupsSequence
        for frame in frames[1:]:
            frame.PlanePositionSequence = frames[0].PlanePositionSequence[:]
        with pytest.raises(didw.WrapperError):
@@ -594,12 +625,6 @@ def test_shape(self):
        sid_seq = (1, 1, 1, 1)
fake_mf.update(fake_shape_dependents(div_seq, sid_seq=sid_seq)) assert MFW(fake_mf).image_shape == (32, 64, 4) - # check 3D stack number matching when there is no StackID index - div_seq = ((1,), (2,), (3,), (4,)) - sid_seq = (1, 1, 1, 2) - fake_mf.update(fake_shape_dependents(div_seq, sid_seq=sid_seq)) - with pytest.raises(didw.WrapperError): - MFW(fake_mf).image_shape # check 4D shape when there is no StackID index div_seq = ((1, 1), (2, 1), (1, 2), (2, 2), (1, 3), (2, 3)) sid_seq = (1, 1, 1, 1, 1, 1) @@ -609,8 +634,12 @@ def test_shape(self): div_seq = ((1, 1), (2, 1), (1, 2), (2, 2), (1, 3), (2, 3)) sid_seq = (1, 1, 1, 1, 1, 2) fake_mf.update(fake_shape_dependents(div_seq, sid_seq=sid_seq)) - with pytest.raises(didw.WrapperError): - MFW(fake_mf).image_shape + with pytest.warns( + UserWarning, + match='A multi-stack file was passed without an explicit filter, just using lowest StackID', + ): + with pytest.raises(didw.WrapperError): + MFW(fake_mf).image_shape # check 3D shape when StackID index is 1 div_seq = ((1, 1), (2, 1), (3, 1), (4, 1)) fake_mf.update(fake_shape_dependents(div_seq, sid_dim=1)) @@ -618,8 +647,11 @@ def test_shape(self): # Check stack number matching when StackID index is 1 div_seq = ((1, 1), (2, 1), (3, 2), (4, 1)) fake_mf.update(fake_shape_dependents(div_seq, sid_dim=1)) - with pytest.raises(didw.WrapperError): - MFW(fake_mf).image_shape + with pytest.warns( + UserWarning, + match='A multi-stack file was passed without an explicit filter, just using lowest StackID', + ): + assert MFW(fake_mf).image_shape == (32, 64, 3) # Make some fake frame data for 4D when StackID index is 1 div_seq = ((1, 1, 1), (2, 1, 1), (1, 1, 2), (2, 1, 2), (1, 1, 3), (2, 1, 3)) fake_mf.update(fake_shape_dependents(div_seq, sid_dim=1)) @@ -689,7 +721,7 @@ def test_shape(self): def test_iop(self): # Test Image orient patient for multiframe - fake_mf = copy(self.MINIMAL_MF) + fake_mf = deepcopy(self.MINIMAL_MF) MFW = self.WRAPCLASS dw = MFW(fake_mf) with pytest.raises(didw.WrapperError): @@ -698,56 +730,56 @@ def test_iop(self): fake_frame = fake_frames( 'PlaneOrientationSequence', 'ImageOrientationPatient', [[0, 1, 0, 1, 0, 0]] )[0] - fake_mf['SharedFunctionalGroupsSequence'] = [fake_frame] + fake_mf.SharedFunctionalGroupsSequence = [fake_frame] assert_array_equal(MFW(fake_mf).image_orient_patient, [[0, 1], [1, 0], [0, 0]]) - fake_mf['SharedFunctionalGroupsSequence'] = [None] + fake_mf.SharedFunctionalGroupsSequence = [Dataset()] with pytest.raises(didw.WrapperError): MFW(fake_mf).image_orient_patient - fake_mf['PerFrameFunctionalGroupsSequence'] = [fake_frame] + fake_mf.PerFrameFunctionalGroupsSequence = [fake_frame] assert_array_equal(MFW(fake_mf).image_orient_patient, [[0, 1], [1, 0], [0, 0]]) def test_voxel_sizes(self): # Test voxel size calculation - fake_mf = copy(self.MINIMAL_MF) + fake_mf = deepcopy(self.MINIMAL_MF) MFW = self.WRAPCLASS dw = MFW(fake_mf) with pytest.raises(didw.WrapperError): dw.voxel_sizes # Make a fake frame fake_frame = fake_frames('PixelMeasuresSequence', 'PixelSpacing', [[2.1, 3.2]])[0] - fake_mf['SharedFunctionalGroupsSequence'] = [fake_frame] + fake_mf.SharedFunctionalGroupsSequence = [fake_frame] # Still not enough, we lack information for slice distances with pytest.raises(didw.WrapperError): MFW(fake_mf).voxel_sizes # This can come from SpacingBetweenSlices or frame SliceThickness - fake_mf['SpacingBetweenSlices'] = 4.3 + fake_mf.SpacingBetweenSlices = 4.3 assert_array_equal(MFW(fake_mf).voxel_sizes, [2.1, 3.2, 4.3]) # If both, prefer SliceThickness 
fake_frame.PixelMeasuresSequence[0].SliceThickness = 5.4 assert_array_equal(MFW(fake_mf).voxel_sizes, [2.1, 3.2, 5.4]) # Just SliceThickness is OK - del fake_mf['SpacingBetweenSlices'] + del fake_mf.SpacingBetweenSlices assert_array_equal(MFW(fake_mf).voxel_sizes, [2.1, 3.2, 5.4]) # Removing shared leads to error again - fake_mf['SharedFunctionalGroupsSequence'] = [None] + fake_mf.SharedFunctionalGroupsSequence = [Dataset()] with pytest.raises(didw.WrapperError): MFW(fake_mf).voxel_sizes # Restoring to frames makes it work again - fake_mf['PerFrameFunctionalGroupsSequence'] = [fake_frame] + fake_mf.PerFrameFunctionalGroupsSequence = [fake_frame] assert_array_equal(MFW(fake_mf).voxel_sizes, [2.1, 3.2, 5.4]) # Decimals in any field are OK fake_frame = fake_frames( 'PixelMeasuresSequence', 'PixelSpacing', [[Decimal('2.1'), Decimal('3.2')]] )[0] - fake_mf['SharedFunctionalGroupsSequence'] = [fake_frame] - fake_mf['SpacingBetweenSlices'] = Decimal('4.3') + fake_mf.SharedFunctionalGroupsSequence = [fake_frame] + fake_mf.SpacingBetweenSlices = Decimal('4.3') assert_array_equal(MFW(fake_mf).voxel_sizes, [2.1, 3.2, 4.3]) fake_frame.PixelMeasuresSequence[0].SliceThickness = Decimal('5.4') assert_array_equal(MFW(fake_mf).voxel_sizes, [2.1, 3.2, 5.4]) def test_image_position(self): # Test image_position property for multiframe - fake_mf = copy(self.MINIMAL_MF) + fake_mf = deepcopy(self.MINIMAL_MF) MFW = self.WRAPCLASS dw = MFW(fake_mf) with pytest.raises(didw.WrapperError): @@ -758,12 +790,12 @@ def test_image_position(self): frames = fake_frames( 'PlanePositionSequence', 'ImagePositionPatient', [[-2.0, 3.0, 7]], frames ) - fake_mf['SharedFunctionalGroupsSequence'] = frames + fake_mf.SharedFunctionalGroupsSequence = frames assert_array_equal(MFW(fake_mf).image_position, [-2, 3, 7]) - fake_mf['SharedFunctionalGroupsSequence'] = [None] + fake_mf.SharedFunctionalGroupsSequence = [Dataset()] with pytest.raises(didw.WrapperError): MFW(fake_mf).image_position - fake_mf['PerFrameFunctionalGroupsSequence'] = frames + fake_mf.PerFrameFunctionalGroupsSequence = frames assert_array_equal(MFW(fake_mf).image_position, [-2, 3, 7]) # Check lists of Decimals work frames[0].PlanePositionSequence[0].ImagePositionPatient = [ @@ -775,7 +807,7 @@ def test_image_position(self): frames = fake_frames('PlaneOrientationSequence', 'ImageOrientationPatient', [iop] * 2) ipps = [[-2.0, 3.0, 7], [-2.0, 3.0, 6]] frames = fake_frames('PlanePositionSequence', 'ImagePositionPatient', ipps, frames) - fake_mf['PerFrameFunctionalGroupsSequence'] = frames + fake_mf.PerFrameFunctionalGroupsSequence = frames assert_array_equal(MFW(fake_mf).image_position, [-2, 3, 6]) @dicom_test @@ -809,9 +841,9 @@ def test_slicethickness_fallback(self): def test_data_derived_shape(self): # Test 4D diffusion data with an additional trace volume included # Excludes the trace volume and generates the correct shape - dw = didw.wrapper_from_file(DATA_FILE_4D_DERIVED) with pytest.warns(UserWarning, match='Derived images found and removed'): - assert dw.image_shape == (96, 96, 60, 33) + dw = didw.wrapper_from_file(DATA_FILE_4D_DERIVED) + assert dw.image_shape == (96, 96, 60, 33) @dicom_test @needs_nibabel_data('dcm_qa_xa30') @@ -831,7 +863,7 @@ def test_data_unreadable_private_headers(self): @dicom_test def test_data_fake(self): # Test algorithm for get_data - fake_mf = copy(self.MINIMAL_MF) + fake_mf = deepcopy(self.MINIMAL_MF) MFW = self.WRAPCLASS dw = MFW(fake_mf) # Fails - no shape @@ -843,8 +875,8 @@ def test_data_fake(self): with 
pytest.raises(didw.WrapperError): dw.get_data() # Make shape and indices - fake_mf['Rows'] = 2 - fake_mf['Columns'] = 3 + fake_mf.Rows = 2 + fake_mf.Columns = 3 dim_idxs = ((1, 1), (1, 2), (1, 3), (1, 4)) fake_mf.update(fake_shape_dependents(dim_idxs, sid_dim=0)) assert MFW(fake_mf).image_shape == (2, 3, 4) @@ -854,24 +886,24 @@ def test_data_fake(self): # Add data - 3D data = np.arange(24).reshape((2, 3, 4)) # Frames dim is first for some reason - fake_mf['pixel_array'] = np.rollaxis(data, 2) + object.__setattr__(fake_mf, 'pixel_array', np.rollaxis(data, 2)) # Now it should work dw = MFW(fake_mf) assert_array_equal(dw.get_data(), data) # Test scaling works - fake_mf['RescaleSlope'] = 2.0 - fake_mf['RescaleIntercept'] = -1 + fake_mf.RescaleSlope = 2.0 + fake_mf.RescaleIntercept = -1 assert_array_equal(MFW(fake_mf).get_data(), data * 2.0 - 1) # Check slice sorting dim_idxs = ((1, 4), (1, 2), (1, 3), (1, 1)) fake_mf.update(fake_shape_dependents(dim_idxs, sid_dim=0)) sorted_data = data[..., [3, 1, 2, 0]] - fake_mf['pixel_array'] = np.rollaxis(sorted_data, 2) + fake_mf.pixel_array = np.rollaxis(sorted_data, 2) assert_array_equal(MFW(fake_mf).get_data(), data * 2.0 - 1) # Check slice sorting with negative index / IPP correlation fake_mf.update(fake_shape_dependents(dim_idxs, sid_dim=0, flip_ipp_idx_corr=True)) sorted_data = data[..., [0, 2, 1, 3]] - fake_mf['pixel_array'] = np.rollaxis(sorted_data, 2) + fake_mf.pixel_array = np.rollaxis(sorted_data, 2) assert_array_equal(MFW(fake_mf).get_data(), data * 2.0 - 1) # 5D! dim_idxs = [ @@ -898,28 +930,167 @@ def test_data_fake(self): sorted_data = data.reshape(shape[:2] + (-1,), order='F') order = [11, 9, 10, 8, 3, 1, 2, 0, 15, 13, 14, 12, 7, 5, 6, 4] sorted_data = sorted_data[..., np.argsort(order)] - fake_mf['pixel_array'] = np.rollaxis(sorted_data, 2) + fake_mf.pixel_array = np.rollaxis(sorted_data, 2) assert_array_equal(MFW(fake_mf).get_data(), data * 2.0 - 1) - def test__scale_data(self): + def test_scale_data(self): # Test data scaling - fake_mf = copy(self.MINIMAL_MF) + fake_mf = deepcopy(self.MINIMAL_MF) + fake_mf.Rows = 2 + fake_mf.Columns = 3 + fake_mf.PerFrameFunctionalGroupsSequence = [Dataset() for _ in range(4)] MFW = self.WRAPCLASS - dw = MFW(fake_mf) - data = np.arange(24).reshape((2, 3, 4)) - assert_array_equal(data, dw._scale_data(data)) - fake_mf['RescaleSlope'] = 2.0 - fake_mf['RescaleIntercept'] = -1.0 - assert_array_equal(data * 2 - 1, dw._scale_data(data)) - fake_frame = fake_frames('PixelValueTransformationSequence', 'RescaleSlope', [3.0])[0] - fake_mf['PerFrameFunctionalGroupsSequence'] = [fake_frame] - # Lacking RescaleIntercept -> Error - dw = MFW(fake_mf) - with pytest.raises(AttributeError): - dw._scale_data(data) - fake_frame.PixelValueTransformationSequence[0].RescaleIntercept = -2 - assert_array_equal(data * 3 - 2, dw._scale_data(data)) + data = np.arange(24).reshape((2, 3, 4), order='F') + assert_array_equal(data, MFW(fake_mf)._scale_data(data)) + # Test legacy top-level slope/intercept + fake_mf.RescaleSlope = 2.0 + fake_mf.RescaleIntercept = -1.0 + assert_array_equal(data * 2 - 1, MFW(fake_mf)._scale_data(data)) + # RealWorldValueMapping takes precedence, but only with defined units + fake_mf.RealWorldValueMappingSequence = [Dataset()] + fake_mf.RealWorldValueMappingSequence[0].RealWorldValueSlope = 10.0 + fake_mf.RealWorldValueMappingSequence[0].RealWorldValueIntercept = -5.0 + assert_array_equal(data * 2 - 1, MFW(fake_mf)._scale_data(data)) + fake_mf.RealWorldValueMappingSequence[0].MeasurementUnitsCodeSequence = 
[Dataset()] + fake_mf.RealWorldValueMappingSequence[0].MeasurementUnitsCodeSequence[0].CodeMeaning = '%' + assert_array_equal(data * 10 - 5, MFW(fake_mf)._scale_data(data)) + fake_mf.RealWorldValueMappingSequence[0].MeasurementUnitsCodeSequence[ + 0 + ].CodeMeaning = 'no units' + assert_array_equal(data * 2 - 1, MFW(fake_mf)._scale_data(data)) + # Possible to have more than one RealWorldValueMapping, use first one with defined units + fake_mf.RealWorldValueMappingSequence.append(Dataset()) + fake_mf.RealWorldValueMappingSequence[-1].RealWorldValueSlope = 15.0 + fake_mf.RealWorldValueMappingSequence[-1].RealWorldValueIntercept = -3.0 + fake_mf.RealWorldValueMappingSequence[-1].MeasurementUnitsCodeSequence = [Dataset()] + fake_mf.RealWorldValueMappingSequence[-1].MeasurementUnitsCodeSequence[0].CodeMeaning = '%' + assert_array_equal(data * 15 - 3, MFW(fake_mf)._scale_data(data)) + # A global RWV scale takes precedence over per-frame PixelValueTransformation + div_seq = ((1, 1, 1), (1, 2, 1), (1, 1, 2), (1, 2, 2)) + fake_mf.update(fake_shape_dependents(div_seq, sid_dim=0)) + frames = fake_frames( + 'PixelValueTransformationSequence', + 'RescaleSlope', + [3.0, 3.0, 3.0, 3.0], + fake_mf.PerFrameFunctionalGroupsSequence, + ) + assert_array_equal(data * 15 - 3, MFW(fake_mf)._scale_data(data)) + # The per-frame PixelValueTransformation takes precedence over plain top-level slope / inter + delattr(fake_mf, 'RealWorldValueMappingSequence') + assert_array_equal(data * 3, MFW(fake_mf)._scale_data(data)) + for frame in frames: + frame.PixelValueTransformationSequence[0].RescaleIntercept = -2 + assert_array_equal(data * 3 - 2, MFW(fake_mf)._scale_data(data)) # Decimals are OK - fake_frame.PixelValueTransformationSequence[0].RescaleSlope = Decimal('3') - fake_frame.PixelValueTransformationSequence[0].RescaleIntercept = Decimal('-2') - assert_array_equal(data * 3 - 2, dw._scale_data(data)) + for frame in frames: + frame.PixelValueTransformationSequence[0].RescaleSlope = Decimal('3') + frame.PixelValueTransformationSequence[0].RescaleIntercept = Decimal('-2') + assert_array_equal(data * 3 - 2, MFW(fake_mf)._scale_data(data)) + # A per-frame RWV scaling takes precedence over per-frame PixelValueTransformation + for frame in frames: + frame.RealWorldValueMappingSequence = [Dataset()] + frame.RealWorldValueMappingSequence[0].RealWorldValueSlope = 10.0 + frame.RealWorldValueMappingSequence[0].RealWorldValueIntercept = -5.0 + frame.RealWorldValueMappingSequence[0].MeasurementUnitsCodeSequence = [Dataset()] + frame.RealWorldValueMappingSequence[0].MeasurementUnitsCodeSequence[ + 0 + ].CodeMeaning = '%' + assert_array_equal(data * 10 - 5, MFW(fake_mf)._scale_data(data)) + # Test varying per-frame scale factors + for frame_idx, frame in enumerate(frames): + frame.RealWorldValueMappingSequence[0].RealWorldValueSlope = 2 * (frame_idx + 1) + frame.RealWorldValueMappingSequence[0].RealWorldValueIntercept = -1 * (frame_idx + 1) + assert_array_equal( + data * np.array([2, 4, 6, 8]) + np.array([-1, -2, -3, -4]), + MFW(fake_mf)._scale_data(data), + ) + + def test_philips_scale_data(self): + fake_mf = deepcopy(self.MINIMAL_MF) + fake_mf.Manufacturer = 'Philips' + fake_mf.Rows = 2 + fake_mf.Columns = 3 + fake_mf.PerFrameFunctionalGroupsSequence = [Dataset() for _ in range(4)] + MFW = self.WRAPCLASS + data = np.arange(24).reshape((2, 3, 4), order='F') + # Unlike other manufacturers, public scale factors from Philips without defined + # units should not be used. 
In lieu of this the private scale factor should be + # used, which should always be available (modulo deidentification). If we can't + # find any of these scale factors a warning is issued. + with pytest.warns( + UserWarning, + match='Unable to find Philips private scale factor, cross-series comparisons may be invalid', + ): + assert_array_equal(data, MFW(fake_mf)._scale_data(data)) + fake_mf.RescaleSlope = 2.0 + fake_mf.RescaleIntercept = -1.0 + for rescale_type in (None, '', 'US', 'normalized'): + if rescale_type is not None: + fake_mf.RescaleType = rescale_type + with pytest.warns( + UserWarning, + match='Unable to find Philips private scale factor, cross-series comparisons may be invalid', + ): + assert_array_equal(data, MFW(fake_mf)._scale_data(data)) + # Falling back to private scaling doesn't generate error + priv_block = fake_mf.private_block(0x2005, 'Philips MR Imaging DD 001', create=True) + priv_block.add_new(0xE, 'FL', 3.0) + assert_array_equal(data * 3.0, MFW(fake_mf)._scale_data(data)) + # If the units are defined they take precedence over private scaling + fake_mf.RescaleType = 'mrad' + assert_array_equal(data * 2 - 1, MFW(fake_mf)._scale_data(data)) + # A RWV scale factor with defined units takes precdence + shared = Dataset() + fake_mf.SharedFunctionalGroupsSequence = [shared] + rwv_map = Dataset() + rwv_map.RealWorldValueSlope = 10.0 + rwv_map.RealWorldValueIntercept = -5.0 + rwv_map.MeasurementUnitsCodeSequence = [Dataset()] + rwv_map.MeasurementUnitsCodeSequence[0].CodeMeaning = '%' + shared.RealWorldValueMappingSequence = [rwv_map] + assert_array_equal(data * 10 - 5, MFW(fake_mf)._scale_data(data)) + # Get rid of valid top-level scale factors, test per-frame scale factors + delattr(shared, 'RealWorldValueMappingSequence') + delattr(fake_mf, 'RescaleType') + del fake_mf[priv_block.get_tag(0xE)] + div_seq = ((1, 1, 1), (1, 2, 1), (1, 1, 2), (1, 2, 2)) + fake_mf.update(fake_shape_dependents(div_seq, sid_dim=0)) + # Simplest case is all frames have same (valid) scale factor + for frame in fake_mf.PerFrameFunctionalGroupsSequence: + pix_trans = Dataset() + pix_trans.RescaleSlope = 2.5 + pix_trans.RescaleIntercept = -4 + pix_trans.RescaleType = 'mrad' + frame.PixelValueTransformationSequence = [pix_trans] + assert_array_equal(data * 2.5 - 4, MFW(fake_mf)._scale_data(data)) + # If some frames are missing valid scale factors we should get a warning + for frame in fake_mf.PerFrameFunctionalGroupsSequence[2:]: + delattr(frame.PixelValueTransformationSequence[0], 'RescaleType') + with pytest.warns( + UserWarning, + match='Unable to find Philips private scale factor, cross-series comparisons may be invalid', + ): + assert_array_equal( + data * np.array([2.5, 2.5, 1, 1]) + np.array([-4, -4, 0, 0]), + MFW(fake_mf)._scale_data(data), + ) + # We can fall back to private scale factor on frame-by-frame basis + for frame in fake_mf.PerFrameFunctionalGroupsSequence: + priv_block = frame.private_block(0x2005, 'Philips MR Imaging DD 001', create=True) + priv_block.add_new(0xE, 'FL', 7.0) + assert_array_equal( + data * np.array([2.5, 2.5, 7, 7]) + np.array([-4, -4, 0, 0]), + MFW(fake_mf)._scale_data(data), + ) + # Again RWV scale factors take precedence + for frame_idx, frame in enumerate(fake_mf.PerFrameFunctionalGroupsSequence): + rwv_map = Dataset() + rwv_map.RealWorldValueSlope = 14.0 - frame_idx + rwv_map.RealWorldValueIntercept = 5.0 + rwv_map.MeasurementUnitsCodeSequence = [Dataset()] + rwv_map.MeasurementUnitsCodeSequence[0].CodeMeaning = '%' + frame.RealWorldValueMappingSequence = 
[rwv_map] + assert_array_equal( + data * np.array([14, 13, 12, 11]) + np.array([5, 5, 5, 5]), + MFW(fake_mf)._scale_data(data), + ) diff --git a/nibabel/nicom/utils.py b/nibabel/nicom/utils.py index 24f4afc2f..2c01c9d16 100644 --- a/nibabel/nicom/utils.py +++ b/nibabel/nicom/utils.py @@ -1,5 +1,7 @@ """Utilities for working with DICOM datasets""" +from enum import Enum + def find_private_section(dcm_data, group_no, creator): """Return start element in group `group_no` given creator name `creator` @@ -45,3 +47,55 @@ def find_private_section(dcm_data, group_no, creator): if match_func(val): return elno * 0x100 return None + + +class Vendor(Enum): + SIEMENS = 1 + GE = 2 + PHILIPS = 3 + + +vendor_priv_sections = { + Vendor.SIEMENS: [ + (0x9, 'SIEMENS SYNGO INDEX SERVICE'), + (0x19, 'SIEMENS MR HEADER'), + (0x21, 'SIEMENS MR SDR 01'), + (0x21, 'SIEMENS MR SDS 01'), + (0x21, 'SIEMENS MR SDI 02'), + (0x29, 'SIEMENS CSA HEADER'), + (0x29, 'SIEMENS MEDCOM HEADER2'), + (0x51, 'SIEMENS MR HEADER'), + ], + Vendor.PHILIPS: [ + (0x2001, 'Philips Imaging DD 001'), + (0x2001, 'Philips Imaging DD 002'), + (0x2001, 'Philips Imaging DD 129'), + (0x2005, 'Philips MR Imaging DD 001'), + (0x2005, 'Philips MR Imaging DD 002'), + (0x2005, 'Philips MR Imaging DD 003'), + (0x2005, 'Philips MR Imaging DD 004'), + (0x2005, 'Philips MR Imaging DD 005'), + (0x2005, 'Philips MR Imaging DD 006'), + (0x2005, 'Philips MR Imaging DD 007'), + (0x2005, 'Philips MR Imaging DD 005'), + (0x2005, 'Philips MR Imaging DD 006'), + ], + Vendor.GE: [ + (0x9, 'GEMS_IDEN_01'), + (0x19, 'GEMS_ACQU_01'), + (0x21, 'GEMS_RELA_01'), + (0x23, 'GEMS_STDY_01'), + (0x25, 'GEMS_SERS_01'), + (0x27, 'GEMS_IMAG_01'), + (0x29, 'GEMS_IMPS_01'), + (0x43, 'GEMS_PARM_01'), + ], +} + + +def vendor_from_private(dcm_data): + """Try to determine the vendor by looking for specific private tags""" + for vendor, priv_sections in vendor_priv_sections.items(): + for priv_group, priv_creator in priv_sections: + if find_private_section(dcm_data, priv_group, priv_creator) != None: + return vendor From f0264abbb295e063ea8b66be36d56319a30b2ecb Mon Sep 17 00:00:00 2001 From: Brendan Moloney Date: Mon, 12 Aug 2024 17:14:04 -0700 Subject: [PATCH 2/2] TST: Don't assume pydicom installed in test_dicomwrappers --- nibabel/nicom/tests/test_dicomwrappers.py | 84 +++++++++++++---------- 1 file changed, 48 insertions(+), 36 deletions(-) diff --git a/nibabel/nicom/tests/test_dicomwrappers.py b/nibabel/nicom/tests/test_dicomwrappers.py index 0556fc63c..55c27df50 100755 --- a/nibabel/nicom/tests/test_dicomwrappers.py +++ b/nibabel/nicom/tests/test_dicomwrappers.py @@ -11,7 +11,6 @@ import numpy as np import pytest from numpy.testing import assert_array_almost_equal, assert_array_equal -from pydicom.dataset import Dataset from ...tests.nibabel_data import get_nibabel_data, needs_nibabel_data from ...volumeutils import endian_codes @@ -64,8 +63,8 @@ def test_wrappers(): # test direct wrapper calls # first with empty or minimal data multi_minimal = { - 'PerFrameFunctionalGroupsSequence': [Dataset()], - 'SharedFunctionalGroupsSequence': [Dataset()], + 'PerFrameFunctionalGroupsSequence': [pydicom.Dataset()], + 'SharedFunctionalGroupsSequence': [pydicom.Dataset()], } for maker, args in ( (didw.Wrapper, ({},)), @@ -164,10 +163,10 @@ def test_wrapper_from_data(): fake_data['SOPClassUID'] = '1.2.840.10008.5.1.4.1.1.4.1' with pytest.raises(didw.WrapperError): didw.wrapper_from_data(fake_data) - fake_data['PerFrameFunctionalGroupsSequence'] = [Dataset()] + 
fake_data['PerFrameFunctionalGroupsSequence'] = [pydicom.Dataset()] with pytest.raises(didw.WrapperError): didw.wrapper_from_data(fake_data) - fake_data['SharedFunctionalGroupsSequence'] = [Dataset()] + fake_data['SharedFunctionalGroupsSequence'] = [pydicom.Dataset()] # minimal set should now be met dw = didw.wrapper_from_data(fake_data) assert dw.is_multiframe @@ -386,14 +385,14 @@ def fake_frames(seq_name, field_name, value_seq, frame_seq=None): value_seq[n] for n in range(N) """ if frame_seq is None: - frame_seq = [Dataset() for _ in range(len(value_seq))] + frame_seq = [pydicom.Dataset() for _ in range(len(value_seq))] for value, fake_frame in zip(value_seq, frame_seq): if value is None: continue if hasattr(fake_frame, seq_name): fake_element = getattr(fake_frame, seq_name)[0] else: - fake_element = Dataset() + fake_element = pydicom.Dataset() setattr(fake_frame, seq_name, [fake_element]) setattr(fake_element, field_name, value) return frame_seq @@ -436,30 +435,30 @@ def __repr__(self): attr_strs.append(f'{attr}={getattr(self, attr)}') return f"{self.__class__.__name__}({', '.join(attr_strs)})" - class DimIdxSeqElem(Dataset): + class DimIdxSeqElem(pydicom.Dataset): def __init__(self, dip=(0, 0), fgp=None): super().__init__() self.DimensionIndexPointer = dip if fgp is not None: self.FunctionalGroupPointer = fgp - class FrmContSeqElem(Dataset): + class FrmContSeqElem(pydicom.Dataset): def __init__(self, div, sid): super().__init__() self.DimensionIndexValues = div self.StackID = sid - class PlnPosSeqElem(Dataset): + class PlnPosSeqElem(pydicom.Dataset): def __init__(self, ipp): super().__init__() self.ImagePositionPatient = ipp - class PlnOrientSeqElem(Dataset): + class PlnOrientSeqElem(pydicom.Dataset): def __init__(self, iop): super().__init__() self.ImageOrientationPatient = iop - class PerFrmFuncGrpSeqElem(Dataset): + class PerFrmFuncGrpSeqElem(pydicom.Dataset): def __init__(self, div, sid, ipp, iop): super().__init__() self.FrameContentSequence = [FrmContSeqElem(div, sid)] @@ -514,17 +513,21 @@ def __init__(self, div, sid, ipp, iop): } -class FakeDataset(Dataset): - pixel_array = None +if have_dicom: + + class FakeDataset(pydicom.Dataset): + pixel_array = None class TestMultiFrameWrapper(TestCase): # Test MultiframeWrapper - # Minimal contents of dcm_data for this wrapper - MINIMAL_MF = FakeDataset() - MINIMAL_MF.PerFrameFunctionalGroupsSequence = [Dataset()] - MINIMAL_MF.SharedFunctionalGroupsSequence = [Dataset()] - WRAPCLASS = didw.MultiframeWrapper + + if have_dicom: + # Minimal contents of dcm_data for this wrapper + MINIMAL_MF = FakeDataset() + MINIMAL_MF.PerFrameFunctionalGroupsSequence = [pydicom.Dataset()] + MINIMAL_MF.SharedFunctionalGroupsSequence = [pydicom.Dataset()] + WRAPCLASS = didw.MultiframeWrapper @dicom_test def test_shape(self): @@ -719,6 +722,7 @@ def test_shape(self): fake_mf.update(fake_shape_dependents(div_seq, sid_dim=0)) assert MFW(fake_mf).image_shape == (32, 64, 3) + @dicom_test def test_iop(self): # Test Image orient patient for multiframe fake_mf = deepcopy(self.MINIMAL_MF) @@ -732,12 +736,13 @@ def test_iop(self): )[0] fake_mf.SharedFunctionalGroupsSequence = [fake_frame] assert_array_equal(MFW(fake_mf).image_orient_patient, [[0, 1], [1, 0], [0, 0]]) - fake_mf.SharedFunctionalGroupsSequence = [Dataset()] + fake_mf.SharedFunctionalGroupsSequence = [pydicom.Dataset()] with pytest.raises(didw.WrapperError): MFW(fake_mf).image_orient_patient fake_mf.PerFrameFunctionalGroupsSequence = [fake_frame] assert_array_equal(MFW(fake_mf).image_orient_patient, [[0, 
1], [1, 0], [0, 0]]) + @dicom_test def test_voxel_sizes(self): # Test voxel size calculation fake_mf = deepcopy(self.MINIMAL_MF) @@ -761,7 +766,7 @@ def test_voxel_sizes(self): del fake_mf.SpacingBetweenSlices assert_array_equal(MFW(fake_mf).voxel_sizes, [2.1, 3.2, 5.4]) # Removing shared leads to error again - fake_mf.SharedFunctionalGroupsSequence = [Dataset()] + fake_mf.SharedFunctionalGroupsSequence = [pydicom.Dataset()] with pytest.raises(didw.WrapperError): MFW(fake_mf).voxel_sizes # Restoring to frames makes it work again @@ -777,6 +782,7 @@ def test_voxel_sizes(self): fake_frame.PixelMeasuresSequence[0].SliceThickness = Decimal('5.4') assert_array_equal(MFW(fake_mf).voxel_sizes, [2.1, 3.2, 5.4]) + @dicom_test def test_image_position(self): # Test image_position property for multiframe fake_mf = deepcopy(self.MINIMAL_MF) @@ -792,7 +798,7 @@ def test_image_position(self): ) fake_mf.SharedFunctionalGroupsSequence = frames assert_array_equal(MFW(fake_mf).image_position, [-2, 3, 7]) - fake_mf.SharedFunctionalGroupsSequence = [Dataset()] + fake_mf.SharedFunctionalGroupsSequence = [pydicom.Dataset()] with pytest.raises(didw.WrapperError): MFW(fake_mf).image_position fake_mf.PerFrameFunctionalGroupsSequence = frames @@ -933,12 +939,13 @@ def test_data_fake(self): fake_mf.pixel_array = np.rollaxis(sorted_data, 2) assert_array_equal(MFW(fake_mf).get_data(), data * 2.0 - 1) + @dicom_test def test_scale_data(self): # Test data scaling fake_mf = deepcopy(self.MINIMAL_MF) fake_mf.Rows = 2 fake_mf.Columns = 3 - fake_mf.PerFrameFunctionalGroupsSequence = [Dataset() for _ in range(4)] + fake_mf.PerFrameFunctionalGroupsSequence = [pydicom.Dataset() for _ in range(4)] MFW = self.WRAPCLASS data = np.arange(24).reshape((2, 3, 4), order='F') assert_array_equal(data, MFW(fake_mf)._scale_data(data)) @@ -947,11 +954,11 @@ def test_scale_data(self): fake_mf.RescaleIntercept = -1.0 assert_array_equal(data * 2 - 1, MFW(fake_mf)._scale_data(data)) # RealWorldValueMapping takes precedence, but only with defined units - fake_mf.RealWorldValueMappingSequence = [Dataset()] + fake_mf.RealWorldValueMappingSequence = [pydicom.Dataset()] fake_mf.RealWorldValueMappingSequence[0].RealWorldValueSlope = 10.0 fake_mf.RealWorldValueMappingSequence[0].RealWorldValueIntercept = -5.0 assert_array_equal(data * 2 - 1, MFW(fake_mf)._scale_data(data)) - fake_mf.RealWorldValueMappingSequence[0].MeasurementUnitsCodeSequence = [Dataset()] + fake_mf.RealWorldValueMappingSequence[0].MeasurementUnitsCodeSequence = [pydicom.Dataset()] fake_mf.RealWorldValueMappingSequence[0].MeasurementUnitsCodeSequence[0].CodeMeaning = '%' assert_array_equal(data * 10 - 5, MFW(fake_mf)._scale_data(data)) fake_mf.RealWorldValueMappingSequence[0].MeasurementUnitsCodeSequence[ @@ -959,10 +966,12 @@ def test_scale_data(self): ].CodeMeaning = 'no units' assert_array_equal(data * 2 - 1, MFW(fake_mf)._scale_data(data)) # Possible to have more than one RealWorldValueMapping, use first one with defined units - fake_mf.RealWorldValueMappingSequence.append(Dataset()) + fake_mf.RealWorldValueMappingSequence.append(pydicom.Dataset()) fake_mf.RealWorldValueMappingSequence[-1].RealWorldValueSlope = 15.0 fake_mf.RealWorldValueMappingSequence[-1].RealWorldValueIntercept = -3.0 - fake_mf.RealWorldValueMappingSequence[-1].MeasurementUnitsCodeSequence = [Dataset()] + fake_mf.RealWorldValueMappingSequence[-1].MeasurementUnitsCodeSequence = [ + pydicom.Dataset() + ] fake_mf.RealWorldValueMappingSequence[-1].MeasurementUnitsCodeSequence[0].CodeMeaning = '%' 
assert_array_equal(data * 15 - 3, MFW(fake_mf)._scale_data(data)) # A global RWV scale takes precedence over per-frame PixelValueTransformation @@ -988,10 +997,12 @@ def test_scale_data(self): assert_array_equal(data * 3 - 2, MFW(fake_mf)._scale_data(data)) # A per-frame RWV scaling takes precedence over per-frame PixelValueTransformation for frame in frames: - frame.RealWorldValueMappingSequence = [Dataset()] + frame.RealWorldValueMappingSequence = [pydicom.Dataset()] frame.RealWorldValueMappingSequence[0].RealWorldValueSlope = 10.0 frame.RealWorldValueMappingSequence[0].RealWorldValueIntercept = -5.0 - frame.RealWorldValueMappingSequence[0].MeasurementUnitsCodeSequence = [Dataset()] + frame.RealWorldValueMappingSequence[0].MeasurementUnitsCodeSequence = [ + pydicom.Dataset() + ] frame.RealWorldValueMappingSequence[0].MeasurementUnitsCodeSequence[ 0 ].CodeMeaning = '%' @@ -1005,12 +1016,13 @@ def test_scale_data(self): MFW(fake_mf)._scale_data(data), ) + @dicom_test def test_philips_scale_data(self): fake_mf = deepcopy(self.MINIMAL_MF) fake_mf.Manufacturer = 'Philips' fake_mf.Rows = 2 fake_mf.Columns = 3 - fake_mf.PerFrameFunctionalGroupsSequence = [Dataset() for _ in range(4)] + fake_mf.PerFrameFunctionalGroupsSequence = [pydicom.Dataset() for _ in range(4)] MFW = self.WRAPCLASS data = np.arange(24).reshape((2, 3, 4), order='F') # Unlike other manufacturers, public scale factors from Philips without defined @@ -1040,12 +1052,12 @@ def test_philips_scale_data(self): fake_mf.RescaleType = 'mrad' assert_array_equal(data * 2 - 1, MFW(fake_mf)._scale_data(data)) # A RWV scale factor with defined units takes precdence - shared = Dataset() + shared = pydicom.Dataset() fake_mf.SharedFunctionalGroupsSequence = [shared] - rwv_map = Dataset() + rwv_map = pydicom.Dataset() rwv_map.RealWorldValueSlope = 10.0 rwv_map.RealWorldValueIntercept = -5.0 - rwv_map.MeasurementUnitsCodeSequence = [Dataset()] + rwv_map.MeasurementUnitsCodeSequence = [pydicom.Dataset()] rwv_map.MeasurementUnitsCodeSequence[0].CodeMeaning = '%' shared.RealWorldValueMappingSequence = [rwv_map] assert_array_equal(data * 10 - 5, MFW(fake_mf)._scale_data(data)) @@ -1057,7 +1069,7 @@ def test_philips_scale_data(self): fake_mf.update(fake_shape_dependents(div_seq, sid_dim=0)) # Simplest case is all frames have same (valid) scale factor for frame in fake_mf.PerFrameFunctionalGroupsSequence: - pix_trans = Dataset() + pix_trans = pydicom.Dataset() pix_trans.RescaleSlope = 2.5 pix_trans.RescaleIntercept = -4 pix_trans.RescaleType = 'mrad' @@ -1084,10 +1096,10 @@ def test_philips_scale_data(self): ) # Again RWV scale factors take precedence for frame_idx, frame in enumerate(fake_mf.PerFrameFunctionalGroupsSequence): - rwv_map = Dataset() + rwv_map = pydicom.Dataset() rwv_map.RealWorldValueSlope = 14.0 - frame_idx rwv_map.RealWorldValueIntercept = 5.0 - rwv_map.MeasurementUnitsCodeSequence = [Dataset()] + rwv_map.MeasurementUnitsCodeSequence = [pydicom.Dataset()] rwv_map.MeasurementUnitsCodeSequence[0].CodeMeaning = '%' frame.RealWorldValueMappingSequence = [rwv_map] assert_array_equal(
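A brief usage sketch of the API this series adds (illustrative only, not part of the diff; the
file name is hypothetical and assumes an Enhanced MR multi-frame dataset):

    import pydicom
    from nibabel.nicom import dicomwrappers as didw

    dcm = pydicom.dcmread('enhanced_mr.dcm')   # hypothetical multi-frame file
    # Default frame filters (single StackID, drop derived ISOTROPIC DWI frames) are applied
    dw = didw.wrapper_from_data(dcm)
    print(dw.vendor)                   # e.g. Vendor.PHILIPS, detected from tags / UID prefixes
    print(dw.frame_order)              # order used to sort the frames
    print(dw.scale_factors)            # (N, 2) array of slope/intercept pairs
    unscaled = dw.get_unscaled_data()  # reordered/reshaped data without scaling applied
    scaled = dw.get_data()             # same data with global or per-frame scaling applied

    # Explicitly keep a particular StackID instead of relying on the implicit default
    filters = (didw.FilterMultiStack(2), didw.FilterDwiIso())
    dw2 = didw.wrapper_from_data(dcm, frame_filters=filters)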