Skip to content

Commit

Permalink
Merge pull request #220 from nipy/enh/reenable-parallelization-apply-…
Browse files Browse the repository at this point in the history
…214-parallel

ENH: Parallelize serialized 3D+t transforms
  • Loading branch information
oesteban authored Aug 1, 2024
2 parents 4c06174 + 38bb388 commit 8ba34c9
Showing 1 changed file with 54 additions and 20 deletions.
74 changes: 54 additions & 20 deletions nitransforms/resampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""Resampling utilities."""

from os import cpu_count
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
import numpy as np
from nibabel.loadsave import load as _nbload
Expand All @@ -25,6 +27,25 @@
"""Minimum number of volumes to automatically serialize 4D transforms."""


def _apply_volume(
index,
data,
targets,
order=3,
mode="constant",
cval=0.0,
prefilter=True,
):
return index, ndi.map_coordinates(

Check warning on line 39 in nitransforms/resampling.py

View check run for this annotation

Codecov / codecov/patch

nitransforms/resampling.py#L39

Added line #L39 was not covered by tests
data,
targets,
order=order,
mode=mode,
cval=cval,
prefilter=prefilter,
)


def apply(
transform,
spatialimage,
Expand Down Expand Up @@ -135,34 +156,47 @@ def apply(
else None
)

# Order F ensures individual volumes are contiguous in memory
# Also matches NIfTI, making final save more efficient
resampled = np.zeros(
(len(ref_ndcoords), len(transform)), dtype=input_dtype, order="F"
)
if njobs is None:
njobs = cpu_count()

for t in range(n_resamplings):
xfm_t = transform if n_resamplings == 1 else transform[t]
with ProcessPoolExecutor(max_workers=min(njobs, n_resamplings)) as executor:
results = []
for t in range(n_resamplings):
xfm_t = transform if n_resamplings == 1 else transform[t]

if targets is None:
targets = ImageGrid(spatialimage).index( # data should be an image
_as_homogeneous(xfm_t.map(ref_ndcoords), dim=_ref.ndim)
)
if targets is None:
targets = ImageGrid(spatialimage).index( # data should be an image
_as_homogeneous(xfm_t.map(ref_ndcoords), dim=_ref.ndim)
)

# Interpolate
resampled[..., t] = ndi.map_coordinates(
(
data_t = (
data
if data is not None
else spatialimage.dataobj[..., t].astype(input_dtype, copy=False)
),
targets,
order=order,
mode=mode,
cval=cval,
prefilter=prefilter,
)

results.append(
executor.submit(
_apply_volume,
t,
data_t,
targets,
order=order,
mode=mode,
cval=cval,
prefilter=prefilter,
)
)

# Order F ensures individual volumes are contiguous in memory
# Also matches NIfTI, making final save more efficient
resampled = np.zeros(
(len(ref_ndcoords), len(transform)), dtype=input_dtype, order="F"
)

for future in as_completed(results):
t, resampled_t = future.result()
resampled[..., t] = resampled_t
else:
data = np.asanyarray(spatialimage.dataobj, dtype=input_dtype)

Expand Down

0 comments on commit 8ba34c9

Please sign in to comment.