Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: Remove accidental MRIQC dependency, allow app config to be passed to workflow plugin #876

Merged
merged 3 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions niworkflows/engine/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#
"""A lightweight NiPype MultiProc execution plugin."""

# Import packages
import os
import sys
from copy import deepcopy
Expand All @@ -32,6 +31,8 @@
from traceback import format_exception
import gc

from nipype.utils.misc import str2bool


# Run node
def run_node(node, updatehash, taskid):
Expand Down Expand Up @@ -239,8 +240,6 @@ def _clear_task(self, taskid):
raise NotImplementedError

def _clean_queue(self, jobid, graph, result=None):
from mriqc import config

if self._status_callback:
self._status_callback(self.procs[jobid], "exception")
if result is None:
Expand All @@ -250,7 +249,7 @@ def _clean_queue(self, jobid, graph, result=None):
}

crashfile = self._report_crash(self.procs[jobid], result=result)
if config.nipype.stop_on_first_crash:
if str2bool(self._config["execution"]["stop_on_first_crash"]):
raise RuntimeError("".join(result["traceback"]))
if jobid in self.mapnodesubids:
# remove current jobid
Expand Down Expand Up @@ -292,9 +291,7 @@ def _submit_mapnode(self, jobid):
return False

def _local_hash_check(self, jobid, graph):
from mriqc import config

if not config.nipype.local_hash_check:
if not str2bool(self.procs[jobid].config["execution"]["local_hash_check"]):
return False

try:
Expand Down Expand Up @@ -368,9 +365,8 @@ def _remove_node_dirs(self):
"""Remove directories whose outputs have already been used up."""
import numpy as np
from shutil import rmtree
from mriqc import config

if config.nipype.remove_node_directories:
if str2bool(self._config["execution"]["remove_node_directories"]):
indices = np.nonzero((self.refidx.sum(axis=1) == 0).__array__())[0]
for idx in indices:
if idx in self.mapnodesubids:
Expand Down Expand Up @@ -413,8 +409,6 @@ def __init__(self, pool=None, plugin_args=None):
A Nipype-compatible dictionary of settings.

"""
from mriqc import config

super().__init__(plugin_args=plugin_args)
self._taskresult = {}
self._task_obj = {}
Expand All @@ -424,6 +418,24 @@ def __init__(self, pool=None, plugin_args=None):
# change to it when workers are set up
self._cwd = os.getcwd()

# Retrieve a nipreps-style configuration object
try:
config = plugin_args["app_config"]
except (KeyError, TypeError):
from types import SimpleNamespace
from nipype.utils.profiler import get_system_total_memory_gb

config = SimpleNamespace(
environment=SimpleNamespace(
# Nipype default
total_memory=get_system_total_memory_gb()
),
# concurrent.futures default
_process_initializer=None,
# Just needs to exist
file_path=None,
)

# Read in options or set defaults.
self.processors = self.plugin_args.get("n_procs", mp.cpu_count())
self.memory_gb = self.plugin_args.get(
Expand Down
92 changes: 92 additions & 0 deletions niworkflows/engine/tests/test_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import logging
from types import SimpleNamespace

import pytest
from nipype.pipeline import engine as pe
from nipype.interfaces import utility as niu

from ..plugin import MultiProcPlugin


def add(x, y):
return x + y


def addall(inlist):
import time

time.sleep(0.2) # Simulate some work
return sum(inlist)


@pytest.fixture
def workflow(tmp_path):
workflow = pe.Workflow(name="test_wf", base_dir=tmp_path)

inputnode = pe.Node(niu.IdentityInterface(fields=["x", "y"]), name="inputnode")
outputnode = pe.Node(niu.IdentityInterface(fields=["z"]), name="outputnode")

# Generate many nodes and claim a lot of memory
add_nd = pe.MapNode(
niu.Function(function=add, input_names=["x", "y"], output_names=["z"]),
name="add",
iterfield=["x"],
mem_gb=0.8,
)

# Regular node
sum_nd = pe.Node(niu.Function(function=addall, input_names=["inlist"]), name="sum")

# Run without submitting is another code path
add_more_nd = pe.Node(
niu.Function(function=add, input_names=["x", "y"], output_names=["z"]),
name="add_more",
run_without_submitting=True,
)

workflow.connect(
[
(inputnode, add_nd, [("x", "x"), ("y", "y")]),
(add_nd, sum_nd, [("z", "inlist")]),
(sum_nd, add_more_nd, [("out", "x")]),
(inputnode, add_more_nd, [("y", "y")]),
(add_more_nd, outputnode, [("z", "z")]),
]
)

inputnode.inputs.x = list(range(30))
inputnode.inputs.y = 4

# Avoid unnecessary sleeps
workflow.config["execution"]["poll_sleep_duration"] = 0

return workflow


def test_plugin_defaults(workflow, caplog):
"""Test the plugin works without any arguments."""
caplog.set_level(logging.CRITICAL, logger="nipype.workflow")
workflow.run(plugin=MultiProcPlugin())


def test_plugin_args_noconfig(workflow, caplog):
"""Test the plugin works with typical nipype arguments."""
caplog.set_level(logging.CRITICAL, logger="nipype.workflow")
workflow.run(
plugin=MultiProcPlugin(),
plugin_args={"n_procs": 2, "memory_gb": 0.1},
)


def test_plugin_app_config(workflow, caplog):
"""Test the plugin works with a nipreps-style configuration."""
app_config = SimpleNamespace(
environment=SimpleNamespace(total_memory_gb=1),
_process_initializer=lambda x: None,
mgxd marked this conversation as resolved.
Show resolved Hide resolved
file_path='/does/not/need/to/exist/for/testing',
)
caplog.set_level(logging.CRITICAL, logger="nipype.workflow")
workflow.run(
plugin=MultiProcPlugin(),
plugin_args={"n_procs": 2, "app_config": app_config},
)
Loading