diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py
index c54d268ff7..090b31ba4b 100644
--- a/nipype/pipeline/plugins/multiproc.py
+++ b/nipype/pipeline/plugins/multiproc.py
@@ -23,6 +23,7 @@
 from ...utils.profiler import get_system_total_memory_gb
 from ..engine import MapNode
 from .base import DistributedPluginBase
+import json
 
 try:
     from textwrap import indent
@@ -41,7 +42,7 @@ def indent(text, prefix):
 
 
 # Run node
-def run_node(node, updatehash, taskid):
+def run_node(node, updatehash, taskid, devno=None):
     """Function to execute node.run(), catch and log any errors and
     return the result dictionary
 
@@ -54,6 +55,8 @@ def run_node(node, updatehash, taskid):
     taskid : int
         an identifier for this task
 
+    devno : the device id of the GPU to make it the only visible device
+        before submitting a job to it
     Returns
     -------
     result : dictionary
@@ -65,6 +68,11 @@ def run_node(node, updatehash, taskid):
 
     # Try and execute the node via node.run()
    try:
+        # make the assigned GPU the only visible device, if one was given
+        if devno is not None:
+            os.environ['CUDA_VISIBLE_DEVICES'] = str(devno)
+            logger.info('CUDA_VISIBLE_DEVICES=%d', devno)
+
         result['result'] = node.run(updatehash=updatehash)
     except:  # noqa: E722, intendedly catch all here
         result['traceback'] = format_exception(*sys.exc_info())
@@ -148,10 +156,46 @@ def __init__(self, plugin_args=None):
         self.raise_insufficient = self.plugin_args.get('raise_insufficient',
                                                        True)
 
+        self.n_gpus_visible = self.gpu_count()
+        self.n_gpus = self.plugin_args.get('n_gpus', self.n_gpus_visible)
+        self.n_gpu_proc = self.plugin_args.get('ngpuproc', 1)
+
+        # total no. of processes allowed on all GPUs
+        if self.n_gpus > self.n_gpus_visible:
+            logger.info('Total number of GPUs (%d) requested exceeds '
+                        'available number of GPUs (%d). Using all %d GPU(s).',
+                        self.n_gpus, self.n_gpus_visible, self.n_gpus_visible)
+            self.n_gpus = self.n_gpus_visible
+            self.total_gpu_processors = self.n_gpus * self.n_gpu_proc
+        else:
+            # total gpu_processors = no. of GPUs * no. of processes per GPU
+            self.total_gpu_processors = self.n_gpus * self.n_gpu_proc
+
+        # form a GPU queue first
+        gpus = []
+        try:
+            import GPUtil
+            ngpus = GPUtil.getGPUs()
+            gpus = list(range(len(ngpus)))
+        except ImportError:
+            gpus = list(range(self.n_gpus))
+
+        self.gpu_q = {}
+
+        # initialize the queue, set all slots free
+        slotno = 0
+        for gpu in range(len(gpus)):
+            temp = {}
+            for ngp in range(self.n_gpu_proc):
+                slotno += 1
+                temp.update({slotno: 'free'})
+            self.gpu_q.update({gpu: temp})
+
         # Instantiate different thread pools for non-daemon processes
         logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, '
-                     'mem_gb=%0.2f, cwd=%s)', 'non' * int(non_daemon),
-                     self.processors, self.memory_gb, self._cwd)
+                     'mem_gb=%0.2f, ngpus=%d, cwd=%s)',
+                     'non' * int(non_daemon), self.processors, self.memory_gb,
+                     self.n_gpus, self._cwd)
 
         NipypePool = NonDaemonPool if non_daemon else Pool
         try:
@@ -180,7 +224,83 @@ def _get_result(self, taskid):
     def _clear_task(self, taskid):
         del self._task_obj[taskid]
 
-    def _submit_job(self, node, updatehash=False):
+    def gpu_count(self):
+        ngpus = 1
+        try:
+            import GPUtil
+            ngpus = len(GPUtil.getGPUs())
+        except ImportError:
+            pass
+        return ngpus
+
+    def gpu_has_free_slot(self):
+        # if a GPU has a free slot, return True, its device-ID and slot no.
+        free = False
+        devno = None
+        slotno = None
+        for dk in self.gpu_q.keys():
+            devno = dk
+            for sdk in self.gpu_q[dk].keys():
+                if self.gpu_q[dk][sdk] == 'free':
+                    free = True
+                    slotno = sdk
+                    break
+            if free:
+                break
+
+        return free, devno, slotno
+
+    def set_gpu_slot_busy(self, slotno, jobid):
+        # if a GPU has a free slot, book it for a jobid, modify the queue
+        # and set its slotno busy
+        devno = None
+        for dk in self.gpu_q.keys():
+            for sk in self.gpu_q[dk].keys():
+                if sk == slotno:
+                    devno = dk
+                    self.gpu_q[dk][sk] = {'state': 'busy', 'jobid': jobid}
+        return devno
+
+    def set_gpu_slot_free(self, jobid):
+        # if a GPU task is finished, then set the slotno free in the queue
+        devno = None
+        for dk in self.gpu_q.keys():
+            for sdk in self.gpu_q[dk].keys():
+                if isinstance(self.gpu_q[dk][sdk], dict):
+                    if self.gpu_q[dk][sdk]['jobid'] == jobid:
+                        devno = dk
+                        self.gpu_q[dk][sdk] = 'free'
+        return devno
+
+    # override, to set a GPU slot free if the job was a GPU job
+    def _task_finished_cb(self, jobid):
+        """ Extract outputs and assign to inputs of dependent tasks
+
+        This is called when a job is completed.
+        """
+        logger.info('[Job finished] Jobname: %s JobID: %d' %
+                    (self.procs[jobid]._id, jobid))
+        if self._status_callback:
+            self._status_callback(self.procs[jobid], 'end')
+        # Update job and worker queues
+        self.proc_pending[jobid] = False
+        # update the job dependency structure
+        rowview = self.depidx.getrowview(jobid)
+        rowview[rowview.nonzero()] = 0
+        if jobid not in self.mapnodesubids:
+            self.refidx[self.refidx[:, jobid].nonzero()[0], jobid] = 0
+
+        # update queue status
+        was_gpu_job = (
+            hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or
+            hasattr(self.procs[jobid]._interface.inputs, 'use_gpu'))
+        if was_gpu_job:
+            devid = self.set_gpu_slot_free(jobid)
+            if devid is not None:
+                logger.info('Device %d slot set free from jobid %d', devid,
+                            jobid)
+
+    def _submit_job(self, node, devno=None, updatehash=False):
         self._taskid += 1
 
         # Don't allow streaming outputs
@@ -188,7 +308,7 @@ def _submit_job(self, node, updatehash=False):
             node.interface.terminal_output = 'allatonce'
 
         self._task_obj[self._taskid] = self.pool.apply_async(
-            run_node, (node, updatehash, self._taskid),
+            run_node, (node, updatehash, self._taskid, devno),
             callback=self._async_callback)
 
         logger.debug('[MultiProc] Submitted task %s (taskid=%d).',
@@ -199,9 +319,15 @@ def _prerun_check(self, graph):
         """Check if any node exeeds the available resources"""
         tasks_mem_gb = []
         tasks_num_th = []
+        tasks_gpu_th = []
+
         for node in graph.nodes():
             tasks_mem_gb.append(node.mem_gb)
             tasks_num_th.append(node.n_procs)
+            is_gpu_job = (hasattr(node.interface.inputs, 'use_cuda') or
+                          hasattr(node.interface.inputs, 'use_gpu'))
+            if is_gpu_job:
+                tasks_gpu_th.append(node.n_procs)
 
         if np.any(np.array(tasks_mem_gb) > self.memory_gb):
             logger.warning(
@@ -217,6 +343,14 @@ def _prerun_check(self, graph):
             if self.raise_insufficient:
                 raise RuntimeError('Insufficient resources available for job')
 
+        if np.any(np.array(tasks_gpu_th) > self.total_gpu_processors):
+            logger.warning(
+                'Nodes demand more GPU processes than allowed (%d).',
+                self.total_gpu_processors)
+            if self.raise_insufficient:
+                raise RuntimeError('Insufficient GPU resources available for '
+                                   'job')
+
     def _postrun_check(self):
         self.pool.close()
 
@@ -226,11 +360,20 @@ def _check_resources(self, running_tasks):
         """
         free_memory_gb = self.memory_gb
         free_processors = self.processors
+        free_gpu_slots = self.total_gpu_processors
+
         for _, jobid in running_tasks:
+            is_gpu_job = (
+                hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or
+                hasattr(self.procs[jobid]._interface.inputs, 'use_gpu'))
+            if is_gpu_job:
+                free_gpu_slots -= min(self.procs[jobid].n_procs,
+                                      free_gpu_slots)
+
             free_memory_gb -= min(self.procs[jobid].mem_gb, free_memory_gb)
             free_processors -= min(self.procs[jobid].n_procs, free_processors)
 
-        return free_memory_gb, free_processors
+        return free_memory_gb, free_processors, free_gpu_slots
 
     def _send_procs_to_workers(self, updatehash=False, graph=None):
         """
@@ -244,11 +387,13 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
             (self.depidx.sum(axis=0) == 0).__array__())
 
         # Check available resources by summing all threads and memory used
-        free_memory_gb, free_processors = self._check_resources(
-            self.pending_tasks)
+        free_memory_gb, free_processors, free_gpu_slots = \
+            self._check_resources(self.pending_tasks)
 
         stats = (len(self.pending_tasks), len(jobids), free_memory_gb,
-                 self.memory_gb, free_processors, self.processors)
+                 self.memory_gb, free_processors, self.processors,
+                 free_gpu_slots, self.total_gpu_processors)
+
         if self._stats != stats:
             tasks_list_msg = ''
 
@@ -263,9 +408,11 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                     tasks_list_msg = indent(tasks_list_msg, ' ' * 21)
             logger.info(
                 '[MultiProc] Running %d tasks, and %d jobs ready. Free '
-                'memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s',
+                'memory (GB): %0.2f/%0.2f, Free processors: %d/%d, Free '
+                'GPUs: %d/%d.%s',
                 len(self.pending_tasks), len(jobids), free_memory_gb,
                 self.memory_gb, free_processors, self.processors,
+                free_gpu_slots, self.total_gpu_processors,
                 tasks_list_msg)
             self._stats = stats
 
@@ -306,23 +453,50 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
             if not submit:
                 continue
 
+            is_gpu_job = (
+                hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or
+                hasattr(self.procs[jobid]._interface.inputs, 'use_gpu'))
+
             # Check requirements of this job
             next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb)
-            next_job_th = min(self.procs[jobid].n_procs, self.processors)
+            if is_gpu_job:
+                next_job_gpu_th = min(self.procs[jobid].n_procs,
+                                      self.total_gpu_processors)
+                next_job_th = 0  # a GPU job consumes no CPU slots here
+            else:
+                next_job_th = min(self.procs[jobid].n_procs, self.processors)
+                next_job_gpu_th = 0  # a CPU job consumes no GPU slots
+
+            if is_gpu_job and next_job_gpu_th > free_gpu_slots:
-            # If node does not fit, skip at this moment
-            if next_job_th > free_processors or next_job_gb > free_memory_gb:
-                logger.debug('Cannot allocate job %d (%0.2fGB, %d threads).',
-                             jobid, next_job_gb, next_job_th)
+                logger.debug('Cannot allocate job %d on GPU (%d slots).',
+                             jobid, next_job_gpu_th)
+                continue
+
+            elif (not is_gpu_job and next_job_th > free_processors or
+                  next_job_gb > free_memory_gb):
+                # If node does not fit, skip at this moment
+                logger.debug('Cannot allocate job %d (%0.2fGB, %d threads, '
+                             '%d slots).', jobid, next_job_gb, next_job_th,
+                             next_job_gpu_th)
                 continue
 
             free_memory_gb -= next_job_gb
             free_processors -= next_job_th
+            free_gpu_slots -= next_job_gpu_th
+
             logger.debug('Allocating %s ID=%d (%0.2fGB, %d threads). Free: '
                          '%0.2fGB, %d threads.',
                          self.procs[jobid].fullname, jobid,
                          next_job_gb, next_job_th, free_memory_gb,
                          free_processors)
 
+            if is_gpu_job:
+                is_gpu_free, devno, slotno = self.gpu_has_free_slot()
+                if devno is not None and slotno is not None:
+                    self.set_gpu_slot_busy(slotno, jobid)
+                    logger.info(
+                        '[*** GPU ID: %d Running: %s Job-ID: %d, Slot-ID:%d, '
+                        'Queue State:%s', devno, self.procs[jobid]._id, jobid,
+                        slotno, json.dumps(self.gpu_q))
+
             # change job status in appropriate queues
             self.proc_done[jobid] = True
             self.proc_pending[jobid] = True
@@ -377,6 +551,7 @@ def _sort_jobs(self, jobids, scheduler='tsort'):
         if scheduler == 'mem_thread':
             return sorted(
                 jobids,
-                key=lambda item: (self.procs[item].mem_gb, self.procs[item].n_procs)
+                key=lambda item: (self.procs[item].mem_gb,
+                                  self.procs[item].n_procs)
             )
         return jobids
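
Usage note: a minimal sketch of how a workflow would opt in to the GPU scheduling added above, assuming this patch is applied. The workflow name and the resource numbers are hypothetical; 'n_gpus' and 'ngpuproc' are the plugin_args read in __init__, and a node is treated as a GPU job whenever its interface exposes a use_cuda or use_gpu input, as checked in _prerun_check, _check_resources and _send_procs_to_workers.

    import nipype.pipeline.engine as pe

    # hypothetical workflow; add nodes whose interfaces expose a
    # use_cuda / use_gpu input so they are routed to GPU slots
    wf = pe.Workflow(name='gpu_example')
    # ... wf.add_nodes([...]) ...

    wf.run(plugin='MultiProc',
           plugin_args={'n_procs': 8,      # CPU slots
                        'memory_gb': 16,   # soft memory limit (GB)
                        'n_gpus': 2,       # GPUs to schedule on
                        'ngpuproc': 1})    # concurrent jobs per GPU

With two GPUs and one process per GPU, __init__ builds the slot queue as self.gpu_q == {0: {1: 'free'}, 1: {2: 'free'}}; set_gpu_slot_busy() replaces a 'free' entry with {'state': 'busy', 'jobid': ...} when a job is dispatched, and set_gpu_slot_free() restores it once _task_finished_cb fires.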