From 751e0cc72501ae8b6368f150981c3701e3646fce Mon Sep 17 00:00:00 2001 From: schahid Date: Wed, 22 Nov 2017 13:52:28 +0100 Subject: [PATCH 1/4] mixed cpu/gpu scheduling in multiprocessing plugin --- nipype/pipeline/plugins/multiproc.py | 323 ++++++++++++++++++++++----- 1 file changed, 267 insertions(+), 56 deletions(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 595b0e1947..0f1ca03246 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -24,9 +24,14 @@ # Init logger logger = logging.getLogger('workflow') +# GPU stuff +import GPUtil +import json +import os + # Run node -def run_node(node, updatehash, taskid): +def run_node(node, updatehash, taskid, devno=None): """Function to execute node.run(), catch and log any errors and return the result dictionary @@ -37,6 +42,8 @@ def run_node(node, updatehash, taskid): updatehash : boolean flag for updating hash + devno: the device id of the GPU to make it the only visible device before + submitting a job to it Returns ------- result : dictionary @@ -48,6 +55,11 @@ def run_node(node, updatehash, taskid): # Try and execute the node via node.run() try: + #set dev visible if not none + if devno is not None: + os.environ['CUDA_VISIBLE_DEVICES'] = str(devno) + logger.info('CUDA_VISIBLE_DEVICE=%d',devno) + result['result'] = node.run(updatehash=updatehash) except: result['traceback'] = format_exception(*sys.exc_info()) @@ -124,10 +136,51 @@ def __init__(self, plugin_args=None): self.memory_gb = self.plugin_args.get('memory_gb', # Allocate 90% of system memory get_system_total_memory_gb() * 0.9) self.raise_insufficient = self.plugin_args.get('raise_insufficient', True) - + + self.n_gpus_visible = self.gpu_count() + self.n_gpus = self.plugin_args.get('n_gpus', self.n_gpus_visible) + self.n_gpu_proc = self.plugin_args.get('ngpuproc', 1) + + # Check plugin args + if self.plugin_args: + if 'non_daemon' in self.plugin_args: + non_daemon = plugin_args['non_daemon'] + if 'n_procs' in self.plugin_args: + self.processors = self.plugin_args['n_procs'] + if 'memory_gb' in self.plugin_args: + self.memory_gb = self.plugin_args['memory_gb'] + if 'n_gpus' in self.plugin_args: + self.n_gpus = self.plugin_args['n_gpus'] + if 'ngpuproc' in self.plugin_args: + self.n_gpu_proc = self.plugin_args['ngpuproc'] + + + #total no. of processes allowed on all gpus + if self.n_gpus > self.n_gpus_visible: + logger.info('Total number of GPUs (%d) requested exceeds available number of GPUs (%d). 
Using all %d GPU(s).'%(self.n_gpus,self.n_gpus_visible, self.n_gpus_visible)) + self.n_gpus = self.n_gpus_visible + self.total_gpu_processors = self.n_gpus * self.n_gpu_proc + else: + self.total_gpu_processors = self.n_gpus * self.n_gpu_proc + + + gpus=GPUtil.getGPUs() + self.gpu_q={} + + + slotno=0 + for gpu in gpus: + temp={} + for ngp in range(self.n_gpu_proc): + slotno +=1 + temp.update({slotno:'free'}) + self.gpu_q.update({ gpu.id: temp }) + + # Instantiate different thread pools for non-daemon processes - logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)', - 'non' if non_daemon else '', self.processors, self.memory_gb) + logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f', + ' ngpus=%d)', + 'non' if non_daemon else '', self.processors, self.memory_gb, self.n_gpus) NipypePool = NonDaemonPool if non_daemon else Pool try: @@ -138,6 +191,8 @@ def __init__(self, plugin_args=None): self._stats = None + + def _async_callback(self, args): self._taskresult[args['taskid']] = args @@ -146,8 +201,76 @@ def _get_result(self, taskid): def _clear_task(self, taskid): del self._task_obj[taskid] - - def _submit_job(self, node, updatehash=False): + + + def gpu_count(self): + return len(GPUtil.getGPUs()) + + def gpu_has_free_slot(self): + free=False + devno=None + slotno=None + for dk in self.gpu_q.keys(): + devno=dk + for sdk in self.gpu_q[dk].keys(): + if self.gpu_q[dk][sdk]=='free': + free=True + slotno=sdk + break + if free: + break + + return free,devno,slotno + + def set_gpu_slot_busy(self,slotno,jobid): + devno=None + for dk in self.gpu_q.keys(): + for sk in self.gpu_q[dk].keys(): + if sk==slotno: + devno=dk + self.gpu_q[dk][sk]= {'state':'busy','jobid':jobid} + return devno + + + def set_gpu_slot_free(self,jobid): + devno=None + for dk in self.gpu_q.keys(): + for sdk in self.gpu_q[dk].keys(): + if isinstance(self.gpu_q[dk][sdk],dict): + if self.gpu_q[dk][sdk]['jobid'] == jobid: + devno=dk + self.gpu_q[dk][sdk]='free' + return devno + + + #override, to set gpu slot free + def _task_finished_cb(self, jobid): + """ Extract outputs and assign to inputs of dependent tasks + + This is called when a job is completed. 
+ """ + logger.info('[Job finished] Jobname: %s JobID: %d' % + (self.procs[jobid]._id, jobid)) + if self._status_callback: + self._status_callback(self.procs[jobid], 'end') + # Update job and worker queues + self.proc_pending[jobid] = False + # update the job dependency structure + rowview = self.depidx.getrowview(jobid) + rowview[rowview.nonzero()] = 0 + if jobid not in self.mapnodesubids: + self.refidx[self.refidx[:, jobid].nonzero()[0], jobid] = 0 + + #update queue status + was_gpu_job = (hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or \ + hasattr(self.procs[jobid]._interface.inputs, 'use_gpu')) + if was_gpu_job: + devid=self.set_gpu_slot_free(jobid) + if devid is not None: + logger.info('Device %d slot set free from jobid %d' % (devid, jobid) ) + + + def _submit_job(self, node, devno=None, updatehash=False): self._taskid += 1 # Don't allow streaming outputs @@ -155,10 +278,10 @@ def _submit_job(self, node, updatehash=False): node.interface.terminal_output = 'allatonce' self._task_obj[self._taskid] = self.pool.apply_async( - run_node, (node, updatehash, self._taskid), + run_node, (node, updatehash, self._taskid, devno), callback=self._async_callback) - logger.debug('MultiProc submitted task %s (taskid=%d).', + logger.info('MultiProc submitted task %s (taskid=%d).', node.fullname, self._taskid) return self._taskid @@ -166,9 +289,15 @@ def _prerun_check(self, graph): """Check if any node exeeds the available resources""" tasks_mem_gb = [] tasks_num_th = [] + tasks_gpu_th = [] + for node in graph.nodes(): tasks_mem_gb.append(node.mem_gb) tasks_num_th.append(node.n_procs) + is_gpu_job = (hasattr(node.interface.inputs, 'use_cuda') or \ + hasattr(node.interface.inputs, 'use_gpu')) + if is_gpu_job: + tasks_gpu_th.append(node.n_procs) if np.any(np.array(tasks_mem_gb) > self.memory_gb): logger.warning( @@ -183,6 +312,13 @@ def _prerun_check(self, graph): self.processors) if self.raise_insufficient: raise RuntimeError('Insufficient resources available for job') + + if np.any(np.array(tasks_gpu_th) > self.total_gpu_processors): + logger.warning( + 'Nodes demand more processes than allowed (%d).', + self.total_gpu_processors) + if self.raise_insufficient: + raise RuntimeError('Insufficient GPU resources available for job') def _postrun_check(self): self.pool.close() @@ -193,11 +329,19 @@ def _check_resources(self, running_tasks): """ free_memory_gb = self.memory_gb free_processors = self.processors + free_gpu_slots = self.total_gpu_processors + + for _, jobid in running_tasks: + is_gpu_job = (hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or \ + hasattr(self.procs[jobid]._interface.inputs, 'use_gpu')) + if is_gpu_job: + free_gpu_slots -= min(self.procs[jobid].n_procs, free_gpu_slots) + free_memory_gb -= min(self.procs[jobid].mem_gb, free_memory_gb) free_processors -= min(self.procs[jobid].n_procs, free_processors) - return free_memory_gb, free_processors + return free_memory_gb, free_processors, free_gpu_slots def _send_procs_to_workers(self, updatehash=False, graph=None): """ @@ -209,13 +353,14 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): jobids = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1] # Check available system resources by summing all threads and memory used - free_memory_gb, free_processors = self._check_resources(self.pending_tasks) + free_memory_gb, free_processors, free_gpu_slots = self._check_resources(self.pending_tasks) stats = (len(self.pending_tasks), len(jobids), free_memory_gb, - self.memory_gb, free_processors, 
self.processors) + self.memory_gb, free_processors, self.processors, free_gpu_slots, self.total_gpu_processors) + if self._stats != stats: logger.info('Currently running %d tasks, and %d jobs ready. Free ' - 'memory (GB): %0.2f/%0.2f, Free processors: %d/%d', + 'memory (GB): %0.2f/%0.2f, Free processors: %d/%d Free GPU slots %d/%d', *stats) self._stats = stats @@ -246,62 +391,128 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): submit = self._submit_mapnode(jobid) if not submit: continue + + is_gpu_job = (hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or \ + hasattr(self.procs[jobid]._interface.inputs, 'use_gpu')) + # Check requirements of this job next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb) next_job_th = min(self.procs[jobid].n_procs, self.processors) - - # If node does not fit, skip at this moment - if next_job_th > free_processors or next_job_gb > free_memory_gb: - logger.debug('Cannot allocate job %d (%0.2fGB, %d threads).', - jobid, next_job_gb, next_job_th) + next_job_gpu_th = min(self.procs[jobid].n_procs, self.total_gpu_processors) + + if is_gpu_job and next_job_gpu_th > free_gpu_slots: + + logger.debug('Cannot allocate job %d on GPU (%d slots).', + jobid, next_job_gpu_th) + continue + + elif not is_gpu_job and next_job_th > free_processors or next_job_gb > free_memory_gb: + # If node does not fit, skip at this moment + logger.debug('Cannot allocate job %d (%0.2fGB, %d threads, %d slots).', + jobid, next_job_gb, next_job_th, next_job_gpu_th) continue free_memory_gb -= next_job_gb free_processors -= next_job_th - logger.debug('Allocating %s ID=%d (%0.2fGB, %d threads). Free: %0.2fGB, %d threads.', + + if is_gpu_job: + + free_gpu_slots -= next_job_gpu_th + + is_gpu_free,devno,slotno = self.gpu_has_free_slot() + + if devno is not None and slotno is not None: + self.set_gpu_slot_busy(slotno,jobid) + logger.info('[*** GPU ID: %d Running: %s Job-ID: %d, Slot-ID:%d, Queue State:%s' % + (devno, self.procs[jobid]._id, jobid, slotno, json.dumps(self.gpu_q))) + + # change job status in appropriate queues + self.proc_done[jobid] = True + self.proc_pending[jobid] = True + + # If cached just retrieve it, don't run + if self._local_hash_check(jobid, graph): + continue + + if self.procs[jobid].run_without_submitting: + logger.debug('Running node %s on master thread', self.procs[jobid]) + try: + self.procs[jobid].run() + except Exception: + traceback = format_exception(*sys.exc_info()) + self._report_crash(self.procs[jobid], traceback=traceback) + + # Release resources + self._task_finished_cb(jobid) + self._remove_node_dirs() + free_memory_gb += next_job_gb + free_processors += next_job_th + free_gpu_slots += next_job_gpu_th + + # Display stats next loop + self._stats = None + continue + + # Task should be submitted to workers + # Send job to task manager and add to pending tasks + if self._status_callback: + self._status_callback(self.procs[jobid], 'start') + tid = self._submit_job(deepcopy(self.procs[jobid]),devno, + updatehash=updatehash) + + if tid is None: + self.proc_done[jobid] = False + self.proc_pending[jobid] = False + else: + self.pending_tasks.insert(0, (tid, jobid)) + # Display stats next loop + self._stats = None + else: + logger.info('Allocating %s ID=%d (%0.2fGB, %d threads). 
Free: %0.2fGB, %d threads.', self.procs[jobid].fullname, jobid, next_job_gb, next_job_th, free_memory_gb, free_processors) - - # change job status in appropriate queues - self.proc_done[jobid] = True - self.proc_pending[jobid] = True - - # If cached just retrieve it, don't run - if self._local_hash_check(jobid, graph): - continue - - if self.procs[jobid].run_without_submitting: - logger.debug('Running node %s on master thread', - self.procs[jobid]) - try: - self.procs[jobid].run() - except Exception: - traceback = format_exception(*sys.exc_info()) - self._report_crash(self.procs[jobid], traceback=traceback) - - # Release resources - self._task_finished_cb(jobid) - self._remove_node_dirs() - free_memory_gb += next_job_gb - free_processors += next_job_th + # change job status in appropriate queues + self.proc_done[jobid] = True + self.proc_pending[jobid] = True + + # If cached just retrieve it, don't run + if self._local_hash_check(jobid, graph): + continue + + if self.procs[jobid].run_without_submitting: + logger.debug('Running node %s on master thread', self.procs[jobid]) + try: + self.procs[jobid].run() + except Exception: + traceback = format_exception(*sys.exc_info()) + self._report_crash(self.procs[jobid], traceback=traceback) + + # Release resources + self._task_finished_cb(jobid) + self._remove_node_dirs() + free_memory_gb += next_job_gb + free_processors += next_job_th + free_gpu_slots += next_job_gpu_th + + # Display stats next loop + self._stats = None + continue + + # Task should be submitted to workers + # Send job to task manager and add to pending tasks + if self._status_callback: + self._status_callback(self.procs[jobid], 'start') + tid = self._submit_job(deepcopy(self.procs[jobid]),None, + updatehash=updatehash) + + if tid is None: + self.proc_done[jobid] = False + self.proc_pending[jobid] = False + else: + self.pending_tasks.insert(0, (tid, jobid)) # Display stats next loop self._stats = None - continue - - # Task should be submitted to workers - # Send job to task manager and add to pending tasks - if self._status_callback: - self._status_callback(self.procs[jobid], 'start') - tid = self._submit_job(deepcopy(self.procs[jobid]), - updatehash=updatehash) - if tid is None: - self.proc_done[jobid] = False - self.proc_pending[jobid] = False - else: - self.pending_tasks.insert(0, (tid, jobid)) - # Display stats next loop - self._stats = None def _sort_jobs(self, jobids, scheduler='tsort'): if scheduler == 'mem_thread': From 8fc3799624cc5bac2895d2b5e8de3cb913642bf2 Mon Sep 17 00:00:00 2001 From: schahid Date: Wed, 22 Nov 2017 14:07:55 +0100 Subject: [PATCH 2/4] added some comments --- nipype/pipeline/plugins/multiproc.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 0f1ca03246..1e72c117b8 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -161,13 +161,15 @@ def __init__(self, plugin_args=None): self.n_gpus = self.n_gpus_visible self.total_gpu_processors = self.n_gpus * self.n_gpu_proc else: + #total gpu_processors = no.of GPUs * no.of threads per single GPU self.total_gpu_processors = self.n_gpus * self.n_gpu_proc + #form a GPU queue first gpus=GPUtil.getGPUs() self.gpu_q={} - - + + #initialize the queue, set all slots free slotno=0 for gpu in gpus: temp={} @@ -207,6 +209,7 @@ def gpu_count(self): return len(GPUtil.getGPUs()) def gpu_has_free_slot(self): + #if a GPU has free slot, return True,its device-ID and the slot 
no. free=False devno=None slotno=None @@ -223,6 +226,7 @@ def gpu_has_free_slot(self): return free,devno,slotno def set_gpu_slot_busy(self,slotno,jobid): + #if a GPU has free slot, book it for a jobid,modify the queue and set its slotno busy devno=None for dk in self.gpu_q.keys(): for sk in self.gpu_q[dk].keys(): @@ -233,6 +237,7 @@ def set_gpu_slot_busy(self,slotno,jobid): def set_gpu_slot_free(self,jobid): + #if a GPU task is finished, then set the slotno free in the queue devno=None for dk in self.gpu_q.keys(): for sdk in self.gpu_q[dk].keys(): @@ -243,7 +248,7 @@ def set_gpu_slot_free(self,jobid): return devno - #override, to set gpu slot free + #override, to set gpu slot free, if the job was a gpu job def _task_finished_cb(self, jobid): """ Extract outputs and assign to inputs of dependent tasks @@ -281,7 +286,7 @@ def _submit_job(self, node, devno=None, updatehash=False): run_node, (node, updatehash, self._taskid, devno), callback=self._async_callback) - logger.info('MultiProc submitted task %s (taskid=%d).', + logger.debug('MultiProc submitted task %s (taskid=%d).', node.fullname, self._taskid) return self._taskid From 54490f9f822895050891eebc485cca4b3d187c1d Mon Sep 17 00:00:00 2001 From: schahid Date: Wed, 22 Nov 2017 15:25:58 +0100 Subject: [PATCH 3/4] try to use GPUtil only if available, otherwise use one GPU --- nipype/pipeline/plugins/multiproc.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 1e72c117b8..213cce9537 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -25,7 +25,6 @@ logger = logging.getLogger('workflow') # GPU stuff -import GPUtil import json import os @@ -166,17 +165,24 @@ def __init__(self, plugin_args=None): #form a GPU queue first - gpus=GPUtil.getGPUs() + gpus=[] + try: + import GPUtil + ngpus=GPUtil.getGPUs() + gpus=list(range(len(ngpus))) + except ImportError: + gpus=list(range(self.n_gpus)) + self.gpu_q={} #initialize the queue, set all slots free slotno=0 - for gpu in gpus: + for gpu in range(len(gpus)): temp={} for ngp in range(self.n_gpu_proc): slotno +=1 temp.update({slotno:'free'}) - self.gpu_q.update({ gpu.id: temp }) + self.gpu_q.update({ gpu: temp }) # Instantiate different thread pools for non-daemon processes @@ -206,7 +212,12 @@ def _clear_task(self, taskid): def gpu_count(self): - return len(GPUtil.getGPUs()) + ngpus=1 + try: + import GPUtil + return len(GPUtil.getGPUs()) + except ImportError: + return ngpus def gpu_has_free_slot(self): #if a GPU has free slot, return True,its device-ID and the slot no. From 3b23551bb57940aeda92714add1ac4b52c70c1cc Mon Sep 17 00:00:00 2001 From: schahid Date: Mon, 27 Nov 2017 10:22:40 +0100 Subject: [PATCH 4/4] fix for missing command error when calling ants binary --- nipype/interfaces/base.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/nipype/interfaces/base.py b/nipype/interfaces/base.py index facafa5fc9..7e0e349902 100644 --- a/nipype/interfaces/base.py +++ b/nipype/interfaces/base.py @@ -1570,10 +1570,12 @@ def __init__(self, command=None, terminal_output=None, **inputs): super(CommandLine, self).__init__(**inputs) self._environ = None # Set command. 
Input argument takes precedence - self._cmd = command or getattr(self, '_cmd', None) - - if self._cmd is None: + if not hasattr(self, '_cmd'): + self._cmd = None + if self.cmd is None and command is None: raise Exception("Missing command") + if command: + self._cmd = command if terminal_output is not None: self.terminal_output = terminal_output
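Some notes on how the pieces above fit together, with small illustrative sketches; none of the code below is part of the patches themselves.

The central data structure added in PATCH 1/4 is the per-device slot table self.gpu_q: it maps each GPU id to a dict of slot numbers, where a slot is either the string 'free' or a {'state': 'busy', 'jobid': ...} record, and the number of slots per device comes from the ngpuproc plugin argument. The standalone sketch below shows the same book-keeping cycle in isolation; the class name GpuSlotQueue and its method names are illustrative only and do not exist in nipype.

    # Minimal sketch of the slot book-keeping used by the patched MultiProcPlugin.
    # Names are illustrative, not part of the nipype API.
    class GpuSlotQueue(object):
        def __init__(self, n_gpus, slots_per_gpu):
            # gpu_q[device][slot] is 'free' or {'state': 'busy', 'jobid': ...}
            self.gpu_q = {}
            slotno = 0
            for dev in range(n_gpus):
                slots = {}
                for _ in range(slots_per_gpu):
                    slotno += 1
                    slots[slotno] = 'free'
                self.gpu_q[dev] = slots

        def has_free_slot(self):
            """Return (free, device_id, slot_no) for the first free slot, if any."""
            for dev, slots in self.gpu_q.items():
                for slot, state in slots.items():
                    if state == 'free':
                        return True, dev, slot
            return False, None, None

        def set_busy(self, slot_no, jobid):
            """Book slot_no for jobid; return the device that owns the slot."""
            for dev, slots in self.gpu_q.items():
                if slot_no in slots:
                    slots[slot_no] = {'state': 'busy', 'jobid': jobid}
                    return dev
            return None

        def set_free(self, jobid):
            """Release whichever slot jobid occupies; return its device."""
            for dev, slots in self.gpu_q.items():
                for slot, state in slots.items():
                    if isinstance(state, dict) and state['jobid'] == jobid:
                        slots[slot] = 'free'
                        return dev
            return None

    # Example: two GPUs with one slot each
    q = GpuSlotQueue(n_gpus=2, slots_per_gpu=1)
    free, dev, slot = q.has_free_slot()   # (True, 0, 1) on a fresh queue
    q.set_busy(slot, jobid=5)             # device 0, slot 1 now busy
    q.set_free(jobid=5)                   # released when _task_finished_cb fires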
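On the worker side, run_node() now receives the scheduled device id and exports CUDA_VISIBLE_DEVICES before calling node.run(), so any CUDA tool the interface launches sees only that device. The variable has to be set before a CUDA context is created in the worker (or in the child process the interface spawns), which is why the patch sets it at the top of run_node rather than once in the parent; because pool workers are reused, it is re-exported for every GPU task. A minimal sketch of the same pattern outside nipype, where payload() stands in for node.run():

    import os
    from multiprocessing import Pool

    def run_on_device(devno, work, *args):
        # Pin this worker to one GPU before running the payload; anything
        # started from here inherits the restricted device list.
        if devno is not None:
            os.environ['CUDA_VISIBLE_DEVICES'] = str(devno)
        return work(*args)

    def payload(x):
        return x * 2   # placeholder for the real node execution

    if __name__ == '__main__':
        pool = Pool(2)
        jobs = [pool.apply_async(run_on_device, (dev, payload, dev)) for dev in (0, 1)]
        print([j.get() for j in jobs])   # [0, 2]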
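PATCH 3/4 makes GPUtil an optional dependency: gpu_count() returns len(GPUtil.getGPUs()) when the package is importable and otherwise assumes a single GPU, and the queue is then built from plain integer device ids. The same fallback as a free function (detect_gpu_count is a hypothetical name; the default of 1 mirrors the patch):

    def detect_gpu_count(default=1):
        """Count visible GPUs via GPUtil if it is installed, else assume `default`."""
        try:
            import GPUtil
            return len(GPUtil.getGPUs())
        except ImportError:
            return default

Note that an n_gpus value passed through plugin_args is still clamped to the detected count, so without GPUtil the plugin effectively schedules onto one device, as the commit message says.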
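Finally, the scheduler decides which pool a ready job is charged to by duck-typing the node's interface inputs: anything exposing a use_cuda or use_gpu trait (CUDA-enabled FSL interfaces, for example) is treated as a GPU job and counted against the slot pool, everything else against processors and memory_gb as before. Reduced to two helpers (hypothetical names, simplified from the logic in _send_procs_to_workers):

    def is_gpu_job(node):
        # same duck-typing test the patch applies to self.procs[jobid]
        inputs = node.interface.inputs
        return hasattr(inputs, 'use_cuda') or hasattr(inputs, 'use_gpu')

    def can_schedule(node, free_procs, free_mem_gb, free_gpu_slots):
        if is_gpu_job(node):
            return node.n_procs <= free_gpu_slots
        return node.n_procs <= free_procs and node.mem_gb <= free_mem_gb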