Nipype multiproc plugin modification to use GPU(s) as resources. #2298


Closed
wants to merge 9 commits into from
allow to control single gpu processes using n_procs of the node running on gpu.
schahid committed Mar 21, 2019
commit 1e1adc8ec03f76718638e81a2d149d4a9bacfe8d
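For orientation, the commit message above corresponds roughly to the usage sketch below. It is illustrative only and not part of the commit: the plugin_args keys 'n_gpus' and 'n_gpu_proc' are assumptions inferred from the attribute names in the diff, and fsl.Eddy merely stands in for any interface exposing a use_cuda/use_gpu input.

# Illustrative sketch, not part of this commit; key names 'n_gpus' and
# 'n_gpu_proc' are assumed from the plugin attributes shown in the diff.
import nipype.pipeline.engine as pe
from nipype.interfaces import fsl

eddy = pe.Node(fsl.Eddy(), name='eddy_gpu')   # any interface with a use_cuda/use_gpu input
eddy.inputs.use_cuda = True                   # marks the node as a GPU job for the plugin
eddy.n_procs = 2                              # with this commit: book 2 slots on one GPU

wf = pe.Workflow(name='gpu_wf')
wf.add_nodes([eddy])
wf.run(plugin='MultiProc',
       plugin_args={'n_procs': 8,       # CPU worker limit
                    'n_gpus': 1,        # assumed key: number of GPUs to schedule on
                    'n_gpu_proc': 4})   # assumed key: concurrent slots per GPU

A GPU node flagged this way with n_procs > 1 is booked onto multiple slots of a single GPU, rather than one slot per job.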
97 changes: 83 additions & 14 deletions — nipype/pipeline/plugins/multiproc.py
@@ -61,6 +61,10 @@ def run_node(node, updatehash, taskid, devno=None):

        result['result'] = node.run(updatehash=updatehash)
    except:
        if devno is not None:
            os.environ['CUDA_VISIBLE_DEVICES'] = str(devno)
            logger.info('EXC: CUDA_VISIBLE_DEVICES=%d', devno)

        result['traceback'] = format_exception(*sys.exc_info())
        result['result'] = node.result

@@ -156,8 +160,8 @@ def __init__(self, plugin_args=None):

        # total no. of processes allowed on all gpus
        if self.n_gpus > self.n_gpus_visible:
            logger.info('Total number of GPUs (%d) requested exceeds available number of GPUs (%d). Using all %d GPU(s).' % (self.n_gpus, self.n_gpus_visible, self.n_gpus_visible))
            self.n_gpus = self.n_gpus_visible
            logger.info('Total number of GPUs (%d) requested exceeds the available number of GPUs (%d) on the system. Using the requested %d GPU(s) at your own risk.' % (self.n_gpus, self.n_gpus_visible, self.n_gpus))
            #self.n_gpus = self.n_gpus_visible
            self.total_gpu_processors = self.n_gpus * self.n_gpu_proc
        else:
            # total gpu_processors = no. of GPUs * no. of threads per single GPU
@@ -218,7 +222,27 @@ def gpu_count(self):
            return len(GPUtil.getGPUs())
        except ImportError:
            return ngpus


    def gpu_has_free_slots(self, nproc):
        # if a single GPU has enough free slots for nproc, return True, its device-ID and the slot nos.
        free = False
        devno = None
        slotnos = None

        for dk in self.gpu_q.keys():
            devno = dk
            slotnos = []
            for sdk in self.gpu_q[dk].keys():
                if self.gpu_q[dk][sdk] == 'free':
                    slotnos.append(sdk)
                    if len(slotnos) == nproc:
                        free = True
                        break
            if free:
                break

        return free, devno, slotnos
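
    # Illustrative sketch, not part of the diff: the queue layout assumed by
    # gpu_has_free_slots() above. One entry per GPU device, one entry per slot;
    # a slot is either the string 'free' or a dict recording the booking job.
    # Slot numbers are assumed unique across devices, matching how
    # set_gpu_slots_busy() looks slots up without a device argument.
    #
    #   self.gpu_q = {
    #       0: {0: 'free', 1: {'state': 'busy', 'jobid': 5}},
    #       1: {2: 'free', 3: 'free'},
    #   }
    #
    # With that state, gpu_has_free_slots(2) would return (True, 1, [2, 3]):
    # device 1 is the first GPU offering two free slots.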

    def gpu_has_free_slot(self):
        # if a GPU has a free slot, return True, its device-ID and the slot no.
        free = False
@@ -235,7 +259,20 @@ def gpu_has_free_slot(self):
                break

        return free, devno, slotno


    def set_gpu_slots_busy(self, slotnos, jobid):
        # if a GPU has free slots, book all of them for that single jobid
        devno = None

        for dk in self.gpu_q.keys():
            for sk in self.gpu_q[dk].keys():
                for slotno in slotnos:
                    if sk == slotno:
                        devno = dk
                        self.gpu_q[dk][sk] = {'state': 'busy', 'jobid': jobid}
        return devno

    def set_gpu_slot_busy(self, slotno, jobid):
        # if a GPU has a free slot, book it for a jobid: modify the queue and set its slotno busy
        devno = None
@@ -258,6 +295,17 @@ def set_gpu_slot_free(self, jobid):
                        self.gpu_q[dk][sdk] = 'free'
        return devno

    def set_gpu_slots_free(self, jobid):
        # if a multi-slot GPU task is finished, set all of its slots free in the queue
        devno = None
        for dk in self.gpu_q.keys():
            for sdk in self.gpu_q[dk].keys():
                if isinstance(self.gpu_q[dk][sdk], dict):
                    if self.gpu_q[dk][sdk]['jobid'] == jobid:
                        devno = dk
                        self.gpu_q[dk][sdk] = 'free'
        return devno


    # override, to set gpu slot free, if the job was a gpu job
    def _task_finished_cb(self, jobid, cached=False):
@@ -282,9 +330,14 @@ def _task_finished_cb(self, jobid, cached=False):
        was_gpu_job = (hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or \
                       hasattr(self.procs[jobid]._interface.inputs, 'use_gpu'))
        if was_gpu_job:
            devid = self.set_gpu_slot_free(jobid)
            if devid is not None:
                logger.info('Device %d slot set free from jobid %d' % (devid, jobid))
            if self.procs[jobid].n_procs > 1:
                devid = self.set_gpu_slots_free(jobid)
                if devid is not None:
                    logger.info('GPU Device no %d slots set free from jobid %d' % (devid, jobid))
            else:
                devid = self.set_gpu_slot_free(jobid)
                if devid is not None:
                    logger.info('GPU Device no %d slot set free from jobid %d' % (devid, jobid))
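
        # Illustrative sketch, not part of the diff: assumed bookkeeping
        # round-trip for a 2-slot GPU job with jobid 7 on device 0.
        #   at submission : gpu_q == {0: {0: 'free', 1: 'free'}}
        #   set_gpu_slots_busy([0, 1], 7)
        #       -> {0: {0: {'state': 'busy', 'jobid': 7},
        #               1: {'state': 'busy', 'jobid': 7}}}
        #   set_gpu_slots_free(7)   # done here, once the job has finished
        #       -> {0: {0: 'free', 1: 'free'}}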


    def _submit_job(self, node, devno=None, updatehash=False):
@@ -386,7 +439,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
            return

        if len(jobids) + len(self.pending_tasks) == 0:
            logger.debug('No tasks are being run, and no jobs can '
            logger.info('**** ATTENTION ****: No tasks are being run, and no jobs can '
                        'be submitted to the queue. Potential deadlock')
            return

@@ -398,6 +451,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
            if isinstance(self.procs[jobid], MapNode):
                try:
                    num_subnodes = self.procs[jobid].num_subnodes()
                    logger.info('***ATTENTION: %d is a MapNode with %d sub-nodes' % (jobid, num_subnodes))
                except Exception:
                    traceback = format_exception(*sys.exc_info())
                    self._report_crash(self.procs[jobid], traceback=traceback)
@@ -417,7 +471,13 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
            next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb)
            next_job_th = min(self.procs[jobid].n_procs, self.processors)
            next_job_gpu_th = min(self.procs[jobid].n_procs, self.total_gpu_processors)

            is_gpu_free, devno, slotnos = self.gpu_has_free_slots(next_job_gpu_th)

            if is_gpu_job and (slotnos is None or next_job_gpu_th > len(slotnos)):
                logger.debug('Cannot allocate job %d: no single GPU has enough free slots.', jobid)
                continue

            if is_gpu_job and next_job_gpu_th > free_gpu_slots:

                logger.debug('Cannot allocate job %d on GPU (%d slots).',
@@ -436,12 +496,21 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
            if is_gpu_job:

                free_gpu_slots -= next_job_gpu_th

                is_gpu_free, devno, slotno = self.gpu_has_free_slot()

                if devno is not None and slotno is not None:
                    self.set_gpu_slot_busy(slotno, jobid)
                    logger.info('[*** GPU ID: %d Running: %s Job-ID: %d, Slot-ID:%d, Queue State:%s' %
                if next_job_gpu_th > 1:
                    is_gpu_free, devno, slotnos = self.gpu_has_free_slots(next_job_gpu_th)
                    if is_gpu_free and next_job_gpu_th <= len(slotnos):
                        self.set_gpu_slots_busy(slotnos, jobid)
                        logger.info('[*** GPU ID: %d Running Job: %s Job-ID: %d on multiple slots:%s' %
                                    (devno, self.procs[jobid]._id, jobid, slotnos))
                        logger.debug('[*** GPU ID: %d Running Job: %s Job-ID: %d on multiple slots:%s, Queue State:%s' %
                                     (devno, self.procs[jobid]._id, jobid, slotnos, json.dumps(self.gpu_q)))
                else:
                    is_gpu_free, devno, slotno = self.gpu_has_free_slot()
                    if is_gpu_free and slotno is not None:
                        self.set_gpu_slot_busy(slotno, jobid)
                        logger.info('[*** GPU ID: %d Running Job: %s Job-ID: %d on single slot-ID:%d' %
                                    (devno, self.procs[jobid]._id, jobid, slotno))
                        logger.debug('[*** GPU ID: %d Running Job: %s Job-ID: %d on single slot-ID:%d, Queue State:%s' %
                                     (devno, self.procs[jobid]._id, jobid, slotno, json.dumps(self.gpu_q)))

            # change job status in appropriate queues