Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

support for gpu queue #3642

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
support for gpu queue
  • Loading branch information
mauriliogenovese committed Mar 22, 2024
commit 0720aa1ac9c672e8b7a6ede4828e603e62778b86
4 changes: 4 additions & 0 deletions 4 nipype/pipeline/engine/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,10 @@ def update(self, **opts):
"""Update inputs"""
self.inputs.update(**opts)

def is_gpu_node(self):
return ((hasattr(self.inputs, 'use_cuda') and self.inputs.use_cuda)
or (hasattr(self.inputs, 'use_gpu') and self.inputs.use_gpu))


class JoinNode(Node):
"""Wraps interface objects that join inputs into a list.
Expand Down
62 changes: 55 additions & 7 deletions 62 nipype/pipeline/plugins/multiproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class MultiProcPlugin(DistributedPluginBase):

- non_daemon: boolean flag to execute as non-daemon processes
- n_procs: maximum number of threads to be executed in parallel
- n_gpu_procs: maximum number of GPU threads to be executed in parallel
- memory_gb: maximum memory (in GB) that can be used at once.
- raise_insufficient: raise error if the requested resources for
a node over the maximum `n_procs` and/or `memory_gb`
Expand Down Expand Up @@ -130,10 +131,22 @@ def __init__(self, plugin_args=None):
)
self.raise_insufficient = self.plugin_args.get("raise_insufficient", True)

# GPU found on system
self.n_gpus_visible = MultiProcPlugin.gpu_count()
# proc per GPU set by user
self.n_gpu_procs = plugin_args.get('n_gpu_procs', self.n_gpus_visible)

# total no. of processes allowed on all gpus
if self.n_gpu_procs > self.n_gpus_visible:
logger.info(
'Total number of GPUs proc requested (%d) exceeds the available number of GPUs (%d) on the system. Using requested GPU slots at your own risk!' % (
self.n_gpu_procs, self.n_gpus_visible))

# Instantiate different thread pools for non-daemon processes
logger.debug(
"[MultiProc] Starting (n_procs=%d, mem_gb=%0.2f, cwd=%s)",
"[MultiProc] Starting (n_procs=%d, n_gpu_procs=%d, mem_gb=%0.2f, cwd=%s)",
self.processors,
self.n_gpu_procs,
self.memory_gb,
self._cwd,
)
Expand Down Expand Up @@ -184,9 +197,12 @@ def _prerun_check(self, graph):
"""Check if any node exceeds the available resources"""
tasks_mem_gb = []
tasks_num_th = []
tasks_gpu_th = []
for node in graph.nodes():
tasks_mem_gb.append(node.mem_gb)
tasks_num_th.append(node.n_procs)
if node.is_gpu_node():
tasks_gpu_th.append(node.n_procs)

if np.any(np.array(tasks_mem_gb) > self.memory_gb):
logger.warning(
Expand All @@ -203,6 +219,12 @@ def _prerun_check(self, graph):
)
if self.raise_insufficient:
raise RuntimeError("Insufficient resources available for job")
if np.any(np.array(tasks_gpu_th) > self.n_gpu_procs):
logger.warning(
'Nodes demand more GPU than allowed (%d).',
self.n_gpu_procs)
if self.raise_insufficient:
raise RuntimeError('Insufficient GPU resources available for job')

def _postrun_check(self):
self.pool.shutdown()
Expand All @@ -213,11 +235,14 @@ def _check_resources(self, running_tasks):
"""
free_memory_gb = self.memory_gb
free_processors = self.processors
free_gpu_slots = self.n_gpu_procs
for _, jobid in running_tasks:
free_memory_gb -= min(self.procs[jobid].mem_gb, free_memory_gb)
free_processors -= min(self.procs[jobid].n_procs, free_processors)
if self.procs[jobid].is_gpu_node():
free_gpu_slots -= min(self.procs[jobid].n_procs, free_gpu_slots)

return free_memory_gb, free_processors
return free_memory_gb, free_processors, free_gpu_slots

def _send_procs_to_workers(self, updatehash=False, graph=None):
"""
Expand All @@ -232,7 +257,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
)

# Check available resources by summing all threads and memory used
free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
free_memory_gb, free_processors, free_gpu_slots = self._check_resources(self.pending_tasks)

stats = (
len(self.pending_tasks),
Expand All @@ -241,6 +266,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
self.memory_gb,
free_processors,
self.processors,
free_gpu_slots,
self.n_gpu_procs
)
if self._stats != stats:
tasks_list_msg = ""
Expand All @@ -256,13 +283,15 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
tasks_list_msg = indent(tasks_list_msg, " " * 21)
logger.info(
"[MultiProc] Running %d tasks, and %d jobs ready. Free "
"memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s",
"memory (GB): %0.2f/%0.2f, Free processors: %d/%d, Free GPU slot:%d/%d.%s",
len(self.pending_tasks),
len(jobids),
free_memory_gb,
self.memory_gb,
free_processors,
self.processors,
free_gpu_slots,
self.n_gpu_procs,
tasks_list_msg,
)
self._stats = stats
Expand Down Expand Up @@ -304,28 +333,36 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
# Check requirements of this job
next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb)
next_job_th = min(self.procs[jobid].n_procs, self.processors)
next_job_gpu_th = min(self.procs[jobid].n_procs, self.n_gpu_procs)

is_gpu_node = self.procs[jobid].is_gpu_node()

# If node does not fit, skip at this moment
if next_job_th > free_processors or next_job_gb > free_memory_gb:
if (next_job_th > free_processors or next_job_gb > free_memory_gb
or (is_gpu_node and next_job_gpu_th > free_gpu_slots)):
logger.debug(
"Cannot allocate job %d (%0.2fGB, %d threads).",
"Cannot allocate job %d (%0.2fGB, %d threads, %d GPU slots).",
jobid,
next_job_gb,
next_job_th,
next_job_gpu_th,
)
continue

free_memory_gb -= next_job_gb
free_processors -= next_job_th
if is_gpu_node:
free_gpu_slots -= next_job_gpu_th
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect this to be hit by your test, but coverage shows it's not. Can you look into this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I missed that because I never used updatedhash=True, but it seems that no test includes that. Should we add a test with that option?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moreover that error does not impact "common" use (I have a project including this gpu support code)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I was looking into this I found two error about updatehash functionality. I sent a pull request #3709 to fix the biggest.
The second is that in multiproc plugin EVERY node will be executed in main thread if updatehash=True, so no multi process is enabled. I will try to send a pull request for that too (maybe after this gpu support is merged to avoid to handle merge conflicts)

logger.debug(
"Allocating %s ID=%d (%0.2fGB, %d threads). Free: "
"%0.2fGB, %d threads.",
"%0.2fGB, %d threads, %d GPU slots.",
self.procs[jobid].fullname,
jobid,
next_job_gb,
next_job_th,
free_memory_gb,
free_processors,
free_gpu_slots,
)

# change job status in appropriate queues
Expand All @@ -352,6 +389,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
self._remove_node_dirs()
free_memory_gb += next_job_gb
free_processors += next_job_th
if is_gpu_node:
free_gpu_slots -= next_job_gpu_th
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this is releasing resource claims that were made around line 356 so the next time through the loop sees available resources.

Suggested change
if is_gpu_node:
free_gpu_slots -= next_job_gpu_th
if is_gpu_node:
free_gpu_slots += next_job_gpu_th

# Display stats next loop
self._stats = None

Expand Down Expand Up @@ -379,3 +418,12 @@ def _sort_jobs(self, jobids, scheduler="tsort"):
key=lambda item: (self.procs[item].mem_gb, self.procs[item].n_procs),
)
return jobids

@staticmethod
def gpu_count():
n_gpus = 1
try:
import GPUtil
return len(GPUtil.getGPUs())
except ImportError:
return n_gpus
15 changes: 15 additions & 0 deletions 15 nipype/pipeline/plugins/tests/test_multiproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def test_run_multiproc(tmpdir):
class InputSpecSingleNode(nib.TraitedSpec):
input1 = nib.traits.Int(desc="a random int")
input2 = nib.traits.Int(desc="a random int")
use_gpu = nib.traits.Bool(False, mandatory = False, desc="boolean for GPU nodes")


class OutputSpecSingleNode(nib.TraitedSpec):
Expand Down Expand Up @@ -116,6 +117,20 @@ def test_no_more_threads_than_specified(tmpdir):
with pytest.raises(RuntimeError):
pipe.run(plugin="MultiProc", plugin_args={"n_procs": max_threads})

def test_no_more_gpu_threads_than_specified(tmpdir):
tmpdir.chdir()

pipe = pe.Workflow(name="pipe")
n1 = pe.Node(SingleNodeTestInterface(), name="n1", n_procs=2)
n1.inputs.use_gpu = True
n1.inputs.input1 = 4
pipe.add_nodes([n1])

max_threads = 2
max_gpu = 1
with pytest.raises(RuntimeError):
pipe.run(plugin="MultiProc", plugin_args={"n_procs": max_threads, 'n_gpu_procs': max_gpu})


@pytest.mark.skipif(
sys.version_info >= (3, 8), reason="multiprocessing issues in Python 3.8"
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.