From 751e0cc72501ae8b6368f150981c3701e3646fce Mon Sep 17 00:00:00 2001 From: schahid Date: Wed, 22 Nov 2017 13:52:28 +0100 Subject: [PATCH 1/7] mixed cpu/gpu scheduling in multiprocessing plugin --- nipype/pipeline/plugins/multiproc.py | 323 ++++++++++++++++++++++----- 1 file changed, 267 insertions(+), 56 deletions(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 595b0e1947..0f1ca03246 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -24,9 +24,14 @@ # Init logger logger = logging.getLogger('workflow') +# GPU stuff +import GPUtil +import json +import os + # Run node -def run_node(node, updatehash, taskid): +def run_node(node, updatehash, taskid, devno=None): """Function to execute node.run(), catch and log any errors and return the result dictionary @@ -37,6 +42,8 @@ def run_node(node, updatehash, taskid): updatehash : boolean flag for updating hash + devno: the device id of the GPU to make it the only visible device before + submitting a job to it Returns ------- result : dictionary @@ -48,6 +55,11 @@ def run_node(node, updatehash, taskid): # Try and execute the node via node.run() try: + #set dev visible if not none + if devno is not None: + os.environ['CUDA_VISIBLE_DEVICES'] = str(devno) + logger.info('CUDA_VISIBLE_DEVICE=%d',devno) + result['result'] = node.run(updatehash=updatehash) except: result['traceback'] = format_exception(*sys.exc_info()) @@ -124,10 +136,51 @@ def __init__(self, plugin_args=None): self.memory_gb = self.plugin_args.get('memory_gb', # Allocate 90% of system memory get_system_total_memory_gb() * 0.9) self.raise_insufficient = self.plugin_args.get('raise_insufficient', True) - + + self.n_gpus_visible = self.gpu_count() + self.n_gpus = self.plugin_args.get('n_gpus', self.n_gpus_visible) + self.n_gpu_proc = self.plugin_args.get('ngpuproc', 1) + + # Check plugin args + if self.plugin_args: + if 'non_daemon' in self.plugin_args: + non_daemon = 
plugin_args['non_daemon'] + if 'n_procs' in self.plugin_args: + self.processors = self.plugin_args['n_procs'] + if 'memory_gb' in self.plugin_args: + self.memory_gb = self.plugin_args['memory_gb'] + if 'n_gpus' in self.plugin_args: + self.n_gpus = self.plugin_args['n_gpus'] + if 'ngpuproc' in self.plugin_args: + self.n_gpu_proc = self.plugin_args['ngpuproc'] + + + #total no. of processes allowed on all gpus + if self.n_gpus > self.n_gpus_visible: + logger.info('Total number of GPUs (%d) requested exceeds available number of GPUs (%d). Using all %d GPU(s).'%(self.n_gpus,self.n_gpus_visible, self.n_gpus_visible)) + self.n_gpus = self.n_gpus_visible + self.total_gpu_processors = self.n_gpus * self.n_gpu_proc + else: + self.total_gpu_processors = self.n_gpus * self.n_gpu_proc + + + gpus=GPUtil.getGPUs() + self.gpu_q={} + + + slotno=0 + for gpu in gpus: + temp={} + for ngp in range(self.n_gpu_proc): + slotno +=1 + temp.update({slotno:'free'}) + self.gpu_q.update({ gpu.id: temp }) + + # Instantiate different thread pools for non-daemon processes - logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)', - 'non' if non_daemon else '', self.processors, self.memory_gb) + logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f', + ' ngpus=%d)', + 'non' if non_daemon else '', self.processors, self.memory_gb, self.n_gpus) NipypePool = NonDaemonPool if non_daemon else Pool try: @@ -138,6 +191,8 @@ def __init__(self, plugin_args=None): self._stats = None + + def _async_callback(self, args): self._taskresult[args['taskid']] = args @@ -146,8 +201,76 @@ def _get_result(self, taskid): def _clear_task(self, taskid): del self._task_obj[taskid] - - def _submit_job(self, node, updatehash=False): + + + def gpu_count(self): + return len(GPUtil.getGPUs()) + + def gpu_has_free_slot(self): + free=False + devno=None + slotno=None + for dk in self.gpu_q.keys(): + devno=dk + for sdk in self.gpu_q[dk].keys(): + if 
self.gpu_q[dk][sdk]=='free': + free=True + slotno=sdk + break + if free: + break + + return free,devno,slotno + + def set_gpu_slot_busy(self,slotno,jobid): + devno=None + for dk in self.gpu_q.keys(): + for sk in self.gpu_q[dk].keys(): + if sk==slotno: + devno=dk + self.gpu_q[dk][sk]= {'state':'busy','jobid':jobid} + return devno + + + def set_gpu_slot_free(self,jobid): + devno=None + for dk in self.gpu_q.keys(): + for sdk in self.gpu_q[dk].keys(): + if isinstance(self.gpu_q[dk][sdk],dict): + if self.gpu_q[dk][sdk]['jobid'] == jobid: + devno=dk + self.gpu_q[dk][sdk]='free' + return devno + + + #override, to set gpu slot free + def _task_finished_cb(self, jobid): + """ Extract outputs and assign to inputs of dependent tasks + + This is called when a job is completed. + """ + logger.info('[Job finished] Jobname: %s JobID: %d' % + (self.procs[jobid]._id, jobid)) + if self._status_callback: + self._status_callback(self.procs[jobid], 'end') + # Update job and worker queues + self.proc_pending[jobid] = False + # update the job dependency structure + rowview = self.depidx.getrowview(jobid) + rowview[rowview.nonzero()] = 0 + if jobid not in self.mapnodesubids: + self.refidx[self.refidx[:, jobid].nonzero()[0], jobid] = 0 + + #update queue status + was_gpu_job = (hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or \ + hasattr(self.procs[jobid]._interface.inputs, 'use_gpu')) + if was_gpu_job: + devid=self.set_gpu_slot_free(jobid) + if devid is not None: + logger.info('Device %d slot set free from jobid %d' % (devid, jobid) ) + + + def _submit_job(self, node, devno=None, updatehash=False): self._taskid += 1 # Don't allow streaming outputs @@ -155,10 +278,10 @@ def _submit_job(self, node, updatehash=False): node.interface.terminal_output = 'allatonce' self._task_obj[self._taskid] = self.pool.apply_async( - run_node, (node, updatehash, self._taskid), + run_node, (node, updatehash, self._taskid, devno), callback=self._async_callback) - logger.debug('MultiProc submitted 
task %s (taskid=%d).', + logger.info('MultiProc submitted task %s (taskid=%d).', node.fullname, self._taskid) return self._taskid @@ -166,9 +289,15 @@ def _prerun_check(self, graph): """Check if any node exeeds the available resources""" tasks_mem_gb = [] tasks_num_th = [] + tasks_gpu_th = [] + for node in graph.nodes(): tasks_mem_gb.append(node.mem_gb) tasks_num_th.append(node.n_procs) + is_gpu_job = (hasattr(node.interface.inputs, 'use_cuda') or \ + hasattr(node.interface.inputs, 'use_gpu')) + if is_gpu_job: + tasks_gpu_th.append(node.n_procs) if np.any(np.array(tasks_mem_gb) > self.memory_gb): logger.warning( @@ -183,6 +312,13 @@ def _prerun_check(self, graph): self.processors) if self.raise_insufficient: raise RuntimeError('Insufficient resources available for job') + + if np.any(np.array(tasks_gpu_th) > self.total_gpu_processors): + logger.warning( + 'Nodes demand more processes than allowed (%d).', + self.total_gpu_processors) + if self.raise_insufficient: + raise RuntimeError('Insufficient GPU resources available for job') def _postrun_check(self): self.pool.close() @@ -193,11 +329,19 @@ def _check_resources(self, running_tasks): """ free_memory_gb = self.memory_gb free_processors = self.processors + free_gpu_slots = self.total_gpu_processors + + for _, jobid in running_tasks: + is_gpu_job = (hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or \ + hasattr(self.procs[jobid]._interface.inputs, 'use_gpu')) + if is_gpu_job: + free_gpu_slots -= min(self.procs[jobid].n_procs, free_gpu_slots) + free_memory_gb -= min(self.procs[jobid].mem_gb, free_memory_gb) free_processors -= min(self.procs[jobid].n_procs, free_processors) - return free_memory_gb, free_processors + return free_memory_gb, free_processors, free_gpu_slots def _send_procs_to_workers(self, updatehash=False, graph=None): """ @@ -209,13 +353,14 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): jobids = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1] # Check available 
system resources by summing all threads and memory used - free_memory_gb, free_processors = self._check_resources(self.pending_tasks) + free_memory_gb, free_processors, free_gpu_slots = self._check_resources(self.pending_tasks) stats = (len(self.pending_tasks), len(jobids), free_memory_gb, - self.memory_gb, free_processors, self.processors) + self.memory_gb, free_processors, self.processors, free_gpu_slots, self.total_gpu_processors) + if self._stats != stats: logger.info('Currently running %d tasks, and %d jobs ready. Free ' - 'memory (GB): %0.2f/%0.2f, Free processors: %d/%d', + 'memory (GB): %0.2f/%0.2f, Free processors: %d/%d Free GPU slots %d/%d', *stats) self._stats = stats @@ -246,62 +391,128 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): submit = self._submit_mapnode(jobid) if not submit: continue + + is_gpu_job = (hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or \ + hasattr(self.procs[jobid]._interface.inputs, 'use_gpu')) + # Check requirements of this job next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb) next_job_th = min(self.procs[jobid].n_procs, self.processors) - - # If node does not fit, skip at this moment - if next_job_th > free_processors or next_job_gb > free_memory_gb: - logger.debug('Cannot allocate job %d (%0.2fGB, %d threads).', - jobid, next_job_gb, next_job_th) + next_job_gpu_th = min(self.procs[jobid].n_procs, self.total_gpu_processors) + + if is_gpu_job and next_job_gpu_th > free_gpu_slots: + + logger.debug('Cannot allocate job %d on GPU (%d slots).', + jobid, next_job_gpu_th) + continue + + elif not is_gpu_job and next_job_th > free_processors or next_job_gb > free_memory_gb: + # If node does not fit, skip at this moment + logger.debug('Cannot allocate job %d (%0.2fGB, %d threads, %d slots).', + jobid, next_job_gb, next_job_th, next_job_gpu_th) continue free_memory_gb -= next_job_gb free_processors -= next_job_th - logger.debug('Allocating %s ID=%d (%0.2fGB, %d threads). 
Free: %0.2fGB, %d threads.', + + if is_gpu_job: + + free_gpu_slots -= next_job_gpu_th + + is_gpu_free,devno,slotno = self.gpu_has_free_slot() + + if devno is not None and slotno is not None: + self.set_gpu_slot_busy(slotno,jobid) + logger.info('[*** GPU ID: %d Running: %s Job-ID: %d, Slot-ID:%d, Queue State:%s' % + (devno, self.procs[jobid]._id, jobid, slotno, json.dumps(self.gpu_q))) + + # change job status in appropriate queues + self.proc_done[jobid] = True + self.proc_pending[jobid] = True + + # If cached just retrieve it, don't run + if self._local_hash_check(jobid, graph): + continue + + if self.procs[jobid].run_without_submitting: + logger.debug('Running node %s on master thread', self.procs[jobid]) + try: + self.procs[jobid].run() + except Exception: + traceback = format_exception(*sys.exc_info()) + self._report_crash(self.procs[jobid], traceback=traceback) + + # Release resources + self._task_finished_cb(jobid) + self._remove_node_dirs() + free_memory_gb += next_job_gb + free_processors += next_job_th + free_gpu_slots += next_job_gpu_th + + # Display stats next loop + self._stats = None + continue + + # Task should be submitted to workers + # Send job to task manager and add to pending tasks + if self._status_callback: + self._status_callback(self.procs[jobid], 'start') + tid = self._submit_job(deepcopy(self.procs[jobid]),devno, + updatehash=updatehash) + + if tid is None: + self.proc_done[jobid] = False + self.proc_pending[jobid] = False + else: + self.pending_tasks.insert(0, (tid, jobid)) + # Display stats next loop + self._stats = None + else: + logger.info('Allocating %s ID=%d (%0.2fGB, %d threads). 
Free: %0.2fGB, %d threads.', self.procs[jobid].fullname, jobid, next_job_gb, next_job_th, free_memory_gb, free_processors) - - # change job status in appropriate queues - self.proc_done[jobid] = True - self.proc_pending[jobid] = True - - # If cached just retrieve it, don't run - if self._local_hash_check(jobid, graph): - continue - - if self.procs[jobid].run_without_submitting: - logger.debug('Running node %s on master thread', - self.procs[jobid]) - try: - self.procs[jobid].run() - except Exception: - traceback = format_exception(*sys.exc_info()) - self._report_crash(self.procs[jobid], traceback=traceback) - - # Release resources - self._task_finished_cb(jobid) - self._remove_node_dirs() - free_memory_gb += next_job_gb - free_processors += next_job_th + # change job status in appropriate queues + self.proc_done[jobid] = True + self.proc_pending[jobid] = True + + # If cached just retrieve it, don't run + if self._local_hash_check(jobid, graph): + continue + + if self.procs[jobid].run_without_submitting: + logger.debug('Running node %s on master thread', self.procs[jobid]) + try: + self.procs[jobid].run() + except Exception: + traceback = format_exception(*sys.exc_info()) + self._report_crash(self.procs[jobid], traceback=traceback) + + # Release resources + self._task_finished_cb(jobid) + self._remove_node_dirs() + free_memory_gb += next_job_gb + free_processors += next_job_th + free_gpu_slots += next_job_gpu_th + + # Display stats next loop + self._stats = None + continue + + # Task should be submitted to workers + # Send job to task manager and add to pending tasks + if self._status_callback: + self._status_callback(self.procs[jobid], 'start') + tid = self._submit_job(deepcopy(self.procs[jobid]),None, + updatehash=updatehash) + + if tid is None: + self.proc_done[jobid] = False + self.proc_pending[jobid] = False + else: + self.pending_tasks.insert(0, (tid, jobid)) # Display stats next loop self._stats = None - continue - - # Task should be submitted to workers - # 
Send job to task manager and add to pending tasks - if self._status_callback: - self._status_callback(self.procs[jobid], 'start') - tid = self._submit_job(deepcopy(self.procs[jobid]), - updatehash=updatehash) - if tid is None: - self.proc_done[jobid] = False - self.proc_pending[jobid] = False - else: - self.pending_tasks.insert(0, (tid, jobid)) - # Display stats next loop - self._stats = None def _sort_jobs(self, jobids, scheduler='tsort'): if scheduler == 'mem_thread': From 8fc3799624cc5bac2895d2b5e8de3cb913642bf2 Mon Sep 17 00:00:00 2001 From: schahid Date: Wed, 22 Nov 2017 14:07:55 +0100 Subject: [PATCH 2/7] added some comments --- nipype/pipeline/plugins/multiproc.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 0f1ca03246..1e72c117b8 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -161,13 +161,15 @@ def __init__(self, plugin_args=None): self.n_gpus = self.n_gpus_visible self.total_gpu_processors = self.n_gpus * self.n_gpu_proc else: + #total gpu_processors = no.of GPUs * no.of threads per single GPU self.total_gpu_processors = self.n_gpus * self.n_gpu_proc + #form a GPU queue first gpus=GPUtil.getGPUs() self.gpu_q={} - - + + #initialize the queue, set all slots free slotno=0 for gpu in gpus: temp={} @@ -207,6 +209,7 @@ def gpu_count(self): return len(GPUtil.getGPUs()) def gpu_has_free_slot(self): + #if a GPU has free slot, return True,its device-ID and the slot no. 
free=False devno=None slotno=None @@ -223,6 +226,7 @@ def gpu_has_free_slot(self): return free,devno,slotno def set_gpu_slot_busy(self,slotno,jobid): + #if a GPU has free slot, book it for a jobid,modify the queue and set its slotno busy devno=None for dk in self.gpu_q.keys(): for sk in self.gpu_q[dk].keys(): @@ -233,6 +237,7 @@ def set_gpu_slot_busy(self,slotno,jobid): def set_gpu_slot_free(self,jobid): + #if a GPU task is finished, then set the slotno free in the queue devno=None for dk in self.gpu_q.keys(): for sdk in self.gpu_q[dk].keys(): @@ -243,7 +248,7 @@ def set_gpu_slot_free(self,jobid): return devno - #override, to set gpu slot free + #override, to set gpu slot free, if the job was a gpu job def _task_finished_cb(self, jobid): """ Extract outputs and assign to inputs of dependent tasks @@ -281,7 +286,7 @@ def _submit_job(self, node, devno=None, updatehash=False): run_node, (node, updatehash, self._taskid, devno), callback=self._async_callback) - logger.info('MultiProc submitted task %s (taskid=%d).', + logger.debug('MultiProc submitted task %s (taskid=%d).', node.fullname, self._taskid) return self._taskid From 54490f9f822895050891eebc485cca4b3d187c1d Mon Sep 17 00:00:00 2001 From: schahid Date: Wed, 22 Nov 2017 15:25:58 +0100 Subject: [PATCH 3/7] try to use GPUtil only if available, otherwise use one GPU --- nipype/pipeline/plugins/multiproc.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 1e72c117b8..213cce9537 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -25,7 +25,6 @@ logger = logging.getLogger('workflow') # GPU stuff -import GPUtil import json import os @@ -166,17 +165,24 @@ def __init__(self, plugin_args=None): #form a GPU queue first - gpus=GPUtil.getGPUs() + gpus=[] + try: + import GPUtil + ngpus=GPUtil.getGPUs() + gpus=list(range(len(ngpus))) + except ImportError: + 
gpus=list(range(self.n_gpus)) + self.gpu_q={} #initialize the queue, set all slots free slotno=0 - for gpu in gpus: + for gpu in range(len(gpus)): temp={} for ngp in range(self.n_gpu_proc): slotno +=1 temp.update({slotno:'free'}) - self.gpu_q.update({ gpu.id: temp }) + self.gpu_q.update({ gpu: temp }) # Instantiate different thread pools for non-daemon processes @@ -206,7 +212,12 @@ def _clear_task(self, taskid): def gpu_count(self): - return len(GPUtil.getGPUs()) + ngpus=1 + try: + import GPUtil + return len(GPUtil.getGPUs()) + except ImportError: + return ngpus def gpu_has_free_slot(self): #if a GPU has free slot, return True,its device-ID and the slot no. From 3b23551bb57940aeda92714add1ac4b52c70c1cc Mon Sep 17 00:00:00 2001 From: schahid Date: Mon, 27 Nov 2017 10:22:40 +0100 Subject: [PATCH 4/7] fix for missing command error when calling ants binary --- nipype/interfaces/base.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/nipype/interfaces/base.py b/nipype/interfaces/base.py index facafa5fc9..7e0e349902 100644 --- a/nipype/interfaces/base.py +++ b/nipype/interfaces/base.py @@ -1570,10 +1570,12 @@ def __init__(self, command=None, terminal_output=None, **inputs): super(CommandLine, self).__init__(**inputs) self._environ = None # Set command. 
Input argument takes precedence - self._cmd = command or getattr(self, '_cmd', None) - - if self._cmd is None: + if not hasattr(self, '_cmd'): + self._cmd = None + if self.cmd is None and command is None: raise Exception("Missing command") + if command: + self._cmd = command if terminal_output is not None: self.terminal_output = terminal_output From 93c97b3bc7a7615bc941a3ce27c7ee558bacf30e Mon Sep 17 00:00:00 2001 From: schahid Date: Fri, 25 May 2018 16:14:35 +0200 Subject: [PATCH 5/7] working copy of multiproc plugin --- nipype/pipeline/plugins/multiproc.py | 468 +++++++++++++-------------- 1 file changed, 224 insertions(+), 244 deletions(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 090b31ba4b..213cce9537 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -6,40 +6,28 @@ Support for child processes running as non-daemons based on http://stackoverflow.com/a/8963618/1183453 """ -from __future__ import (print_function, division, unicode_literals, - absolute_import) +from __future__ import print_function, division, unicode_literals, absolute_import # Import packages -import os from multiprocessing import Process, Pool, cpu_count, pool from traceback import format_exception import sys -from logging import INFO -import gc from copy import deepcopy import numpy as np + from ... 
import logging from ...utils.profiler import get_system_total_memory_gb from ..engine import MapNode from .base import DistributedPluginBase -import json - -try: - from textwrap import indent -except ImportError: - - def indent(text, prefix): - """ A textwrap.indent replacement for Python < 3.3 """ - if not prefix: - return text - splittext = text.splitlines(True) - return prefix + prefix.join(splittext) - # Init logger logger = logging.getLogger('workflow') +# GPU stuff +import json +import os + # Run node def run_node(node, updatehash, taskid, devno=None): @@ -52,10 +40,8 @@ def run_node(node, updatehash, taskid, devno=None): the node to run updatehash : boolean flag for updating hash - taskid : int - an identifier for this task - devno: the device id of the GPU to make it the only visible device before + devno: the device id of the GPU to make it the only visible device before submitting a job to it Returns ------- @@ -68,13 +54,13 @@ def run_node(node, updatehash, taskid, devno=None): # Try and execute the node via node.run() try: - # set dev visible if not none + #set dev visible if not none if devno is not None: os.environ['CUDA_VISIBLE_DEVICES'] = str(devno) - logger.info('CUDA_VISIBLE_DEVICE=%d', devno) - + logger.info('CUDA_VISIBLE_DEVICE=%d',devno) + result['result'] = node.run(updatehash=updatehash) - except: # noqa: E722, intendedly catch all here + except: result['traceback'] = format_exception(*sys.exc_info()) result['result'] = node.result @@ -142,80 +128,80 @@ def __init__(self, plugin_args=None): self._task_obj = {} self._taskid = 0 - # Cache current working directory and make sure we - # change to it when workers are set up - self._cwd = os.getcwd() - # Read in options or set defaults. 
non_daemon = self.plugin_args.get('non_daemon', True) maxtasks = self.plugin_args.get('maxtasksperchild', 10) self.processors = self.plugin_args.get('n_procs', cpu_count()) - self.memory_gb = self.plugin_args.get( - 'memory_gb', # Allocate 90% of system memory - get_system_total_memory_gb() * 0.9) - self.raise_insufficient = self.plugin_args.get('raise_insufficient', - True) - + self.memory_gb = self.plugin_args.get('memory_gb', # Allocate 90% of system memory + get_system_total_memory_gb() * 0.9) + self.raise_insufficient = self.plugin_args.get('raise_insufficient', True) + self.n_gpus_visible = self.gpu_count() self.n_gpus = self.plugin_args.get('n_gpus', self.n_gpus_visible) self.n_gpu_proc = self.plugin_args.get('ngpuproc', 1) - - # total no. of processes allowed on all gpus + + # Check plugin args + if self.plugin_args: + if 'non_daemon' in self.plugin_args: + non_daemon = plugin_args['non_daemon'] + if 'n_procs' in self.plugin_args: + self.processors = self.plugin_args['n_procs'] + if 'memory_gb' in self.plugin_args: + self.memory_gb = self.plugin_args['memory_gb'] + if 'n_gpus' in self.plugin_args: + self.n_gpus = self.plugin_args['n_gpus'] + if 'ngpuproc' in self.plugin_args: + self.n_gpu_proc = self.plugin_args['ngpuproc'] + + + #total no. of processes allowed on all gpus if self.n_gpus > self.n_gpus_visible: - logger.info('Total number of GPUs (%d) requested exceeds ' - 'available number of GPUs (%d). Using all %d GPU(s).', - self.n_gpus, self.n_gpus_visible, self.n_gpus_visible) + logger.info('Total number of GPUs (%d) requested exceeds available number of GPUs (%d). 
Using all %d GPU(s).'%(self.n_gpus,self.n_gpus_visible, self.n_gpus_visible)) self.n_gpus = self.n_gpus_visible self.total_gpu_processors = self.n_gpus * self.n_gpu_proc else: - # total gpu_processors = no.of GPUs * no.of threads per single GPU + #total gpu_processors = no.of GPUs * no.of threads per single GPU self.total_gpu_processors = self.n_gpus * self.n_gpu_proc + - # form a GPU queue first - gpus = [] + #form a GPU queue first + gpus=[] try: import GPUtil - ngpus = GPUtil.getGPUs() - gpus = list(range(len(ngpus))) + ngpus=GPUtil.getGPUs() + gpus=list(range(len(ngpus))) except ImportError: - gpus = list(range(self.n_gpus)) + gpus=list(range(self.n_gpus)) - self.gpu_q = {} - - # initialize the queue, set all slots free - slotno = 0 + self.gpu_q={} + + #initialize the queue, set all slots free + slotno=0 for gpu in range(len(gpus)): - temp = {} + temp={} for ngp in range(self.n_gpu_proc): - slotno += 1 - temp.update({slotno: 'free'}) - self.gpu_q.update({gpu: temp}) - + slotno +=1 + temp.update({slotno:'free'}) + self.gpu_q.update({ gpu: temp }) + + # Instantiate different thread pools for non-daemon processes - logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, ' - 'mem_gb=%0.2f, ngpus=%d, cwd=%s)', - 'non' * int(non_daemon), self.processors, self.memory_gb, - self.n_gpus, self._cwd) + logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f', + ' ngpus=%d)', + 'non' if non_daemon else '', self.processors, self.memory_gb, self.n_gpus) NipypePool = NonDaemonPool if non_daemon else Pool try: - self.pool = NipypePool( - processes=self.processors, - maxtasksperchild=maxtasks, - initializer=os.chdir, - initargs=(self._cwd,) - ) + self.pool = NipypePool(processes=self.processors, + maxtasksperchild=maxtasks) except TypeError: - # Python < 3.2 does not have maxtasksperchild - # When maxtasksperchild is not set, initializer is not to be - # called self.pool = NipypePool(processes=self.processors) self._stats = None + + def 
_async_callback(self, args): - # Make sure runtime is not left at a dubious working directory - os.chdir(self._cwd) self._taskresult[args['taskid']] = args def _get_result(self, taskid): @@ -223,56 +209,57 @@ def _get_result(self, taskid): def _clear_task(self, taskid): del self._task_obj[taskid] - + + def gpu_count(self): - ngpus = 1 + ngpus=1 try: - import GPUtil - ngpus = len(GPUtil.getGPUs()) + import GPUtil + return len(GPUtil.getGPUs()) except ImportError: - pass - return ngpus - + return ngpus + def gpu_has_free_slot(self): - # if a GPU has free slot, return True,its device-ID and the slot no. - free = False - devno = None - slotno = None + #if a GPU has free slot, return True,its device-ID and the slot no. + free=False + devno=None + slotno=None for dk in self.gpu_q.keys(): - devno = dk + devno=dk for sdk in self.gpu_q[dk].keys(): - if self.gpu_q[dk][sdk] == 'free': - free = True - slotno = sdk - break + if self.gpu_q[dk][sdk]=='free': + free=True + slotno=sdk + break if free: break - return free, devno, slotno - - def set_gpu_slot_busy(self, slotno, jobid): - # if a GPU has free slot, book it for a jobid,modify the queue and set - # its slotno busy - devno = None + return free,devno,slotno + + def set_gpu_slot_busy(self,slotno,jobid): + #if a GPU has free slot, book it for a jobid,modify the queue and set its slotno busy + devno=None for dk in self.gpu_q.keys(): for sk in self.gpu_q[dk].keys(): - if sk == slotno: - devno = dk - self.gpu_q[dk][sk] = {'state': 'busy', 'jobid': jobid} + if sk==slotno: + devno=dk + self.gpu_q[dk][sk]= {'state':'busy','jobid':jobid} return devno - def set_gpu_slot_free(self, jobid): - # if a GPU task is finished, then set the slotno free in the queue - devno = None + + def set_gpu_slot_free(self,jobid): + #if a GPU task is finished, then set the slotno free in the queue + devno=None for dk in self.gpu_q.keys(): for sdk in self.gpu_q[dk].keys(): - if isinstance(self.gpu_q[dk][sdk], dict): + if 
isinstance(self.gpu_q[dk][sdk],dict): if self.gpu_q[dk][sdk]['jobid'] == jobid: - devno = dk - self.gpu_q[dk][sdk] = 'free' + devno=dk + self.gpu_q[dk][sdk]='free' return devno - # override, to set gpu slot free, if the job was a gpu job + + #override, to set gpu slot free, if the job was a gpu job def _task_finished_cb(self, jobid): """ Extract outputs and assign to inputs of dependent tasks @@ -290,15 +277,14 @@ def _task_finished_cb(self, jobid): if jobid not in self.mapnodesubids: self.refidx[self.refidx[:, jobid].nonzero()[0], jobid] = 0 - # update queue status - was_gpu_job = ( - hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or - hasattr(self.procs[jobid]._interface.inputs, 'use_gpu')) + #update queue status + was_gpu_job = (hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or \ + hasattr(self.procs[jobid]._interface.inputs, 'use_gpu')) if was_gpu_job: - devid = self.set_gpu_slot_free(jobid) + devid=self.set_gpu_slot_free(jobid) if devid is not None: - logger.info('Device %d slot set free from jobid %d', devid, - jobid) + logger.info('Device %d slot set free from jobid %d' % (devid, jobid) ) + def _submit_job(self, node, devno=None, updatehash=False): self._taskid += 1 @@ -311,7 +297,7 @@ def _submit_job(self, node, devno=None, updatehash=False): run_node, (node, updatehash, self._taskid, devno), callback=self._async_callback) - logger.debug('[MultiProc] Submitted task %s (taskid=%d).', + logger.debug('MultiProc submitted task %s (taskid=%d).', node.fullname, self._taskid) return self._taskid @@ -320,11 +306,11 @@ def _prerun_check(self, graph): tasks_mem_gb = [] tasks_num_th = [] tasks_gpu_th = [] - + for node in graph.nodes(): tasks_mem_gb.append(node.mem_gb) tasks_num_th.append(node.n_procs) - is_gpu_job = (hasattr(node.interface.inputs, 'use_cuda') or + is_gpu_job = (hasattr(node.interface.inputs, 'use_cuda') or \ hasattr(node.interface.inputs, 'use_gpu')) if is_gpu_job: tasks_gpu_th.append(node.n_procs) @@ -342,14 +328,13 @@ def 
_prerun_check(self, graph): self.processors) if self.raise_insufficient: raise RuntimeError('Insufficient resources available for job') - + if np.any(np.array(tasks_gpu_th) > self.total_gpu_processors): logger.warning( 'Nodes demand more processes than allowed (%d).', self.total_gpu_processors) if self.raise_insufficient: - raise RuntimeError('Insufficient GPU resources available for ' - 'job') + raise RuntimeError('Insufficient GPU resources available for job') def _postrun_check(self): self.pool.close() @@ -361,15 +346,14 @@ def _check_resources(self, running_tasks): free_memory_gb = self.memory_gb free_processors = self.processors free_gpu_slots = self.total_gpu_processors - + + for _, jobid in running_tasks: - is_gpu_job = ( - hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or - hasattr(self.procs[jobid]._interface.inputs, 'use_gpu')) + is_gpu_job = (hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or \ + hasattr(self.procs[jobid]._interface.inputs, 'use_gpu')) if is_gpu_job: - free_gpu_slots -= min(self.procs[jobid].n_procs, - free_gpu_slots) - + free_gpu_slots -= min(self.procs[jobid].n_procs, free_gpu_slots) + free_memory_gb -= min(self.procs[jobid].mem_gb, free_memory_gb) free_processors -= min(self.procs[jobid].n_procs, free_processors) @@ -380,40 +364,20 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): Sends jobs to workers when system resources are available. 
""" - # Check to see if a job is available (jobs with all dependencies run) + # Check to see if a job is available (jobs without dependencies not run) # See https://github.com/nipy/nipype/pull/2200#discussion_r141605722 - # See also https://github.com/nipy/nipype/issues/2372 - jobids = np.flatnonzero(~self.proc_done & - (self.depidx.sum(axis=0) == 0).__array__()) + jobids = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1] - # Check available resources by summing all threads and memory used - free_memory_gb, free_processors, free_gpu_slots = \ - self._check_resources(self.pending_tasks) + # Check available system resources by summing all threads and memory used + free_memory_gb, free_processors, free_gpu_slots = self._check_resources(self.pending_tasks) stats = (len(self.pending_tasks), len(jobids), free_memory_gb, - self.memory_gb, free_processors, self.processors, - free_gpu_slots, self.total_gpu_processors) - + self.memory_gb, free_processors, self.processors, free_gpu_slots, self.total_gpu_processors) + if self._stats != stats: - tasks_list_msg = '' - - if logger.level <= INFO: - running_tasks = [ - ' * %s' % self.procs[jobid].fullname - for _, jobid in self.pending_tasks - ] - if running_tasks: - tasks_list_msg = '\nCurrently running:\n' - tasks_list_msg += '\n'.join(running_tasks) - tasks_list_msg = indent(tasks_list_msg, ' ' * 21) - logger.info( - '[MultiProc] Running %d tasks, and %d jobs ready. Free ' - 'memory (GB): %0.2f/%0.2f, Free processors: %d/%d, Free ' - 'GPUs: %d/%d.%s', - len(self.pending_tasks), len(jobids), free_memory_gb, - self.memory_gb, free_processors, self.processors, - free_gpu_slots, self.total_gpu_processors, - tasks_list_msg) + logger.info('Currently running %d tasks, and %d jobs ready. 
Free ' + 'memory (GB): %0.2f/%0.2f, Free processors: %d/%d Free GPU slots %d/%d', + *stats) self._stats = stats if free_memory_gb < 0.01 or free_processors == 0: @@ -425,11 +389,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): 'be submitted to the queue. Potential deadlock') return - jobids = self._sort_jobs( - jobids, scheduler=self.plugin_args.get('scheduler')) - - # Run garbage collector before potentially submitting jobs - gc.collect() + jobids = self._sort_jobs(jobids, scheduler=self.plugin_args.get('scheduler')) # Submit jobs for jobid in jobids: @@ -439,119 +399,139 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): num_subnodes = self.procs[jobid].num_subnodes() except Exception: traceback = format_exception(*sys.exc_info()) - self._clean_queue( - jobid, - graph, - result={ - 'result': None, - 'traceback': traceback - }) + self._report_crash(self.procs[jobid], traceback=traceback) + self._clean_queue(jobid, graph) self.proc_pending[jobid] = False continue if num_subnodes > 1: submit = self._submit_mapnode(jobid) if not submit: continue + + is_gpu_job = (hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or \ + hasattr(self.procs[jobid]._interface.inputs, 'use_gpu')) - is_gpu_job = ( - hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or - hasattr(self.procs[jobid]._interface.inputs, 'use_gpu')) # Check requirements of this job next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb) - if is_gpu_job: - next_job_gpu_th = min(self.procs[jobid].n_procs, - self.total_gpu_processors) - else: - next_job_th = min(self.procs[jobid].n_procs, self.processors) - + next_job_th = min(self.procs[jobid].n_procs, self.processors) + next_job_gpu_th = min(self.procs[jobid].n_procs, self.total_gpu_processors) + if is_gpu_job and next_job_gpu_th > free_gpu_slots: - + logger.debug('Cannot allocate job %d on GPU (%d slots).', jobid, next_job_gpu_th) continue - - elif (not is_gpu_job and next_job_th > free_processors or - 
next_job_gb > free_memory_gb): + + elif not is_gpu_job and next_job_th > free_processors or next_job_gb > free_memory_gb: # If node does not fit, skip at this moment - logger.debug('Cannot allocate job %d (%0.2fGB, %d threads, ' - '%d slots).', jobid, next_job_gb, next_job_th, - next_job_gpu_th) + logger.debug('Cannot allocate job %d (%0.2fGB, %d threads, %d slots).', + jobid, next_job_gb, next_job_th, next_job_gpu_th) continue free_memory_gb -= next_job_gb free_processors -= next_job_th - free_gpu_slots -= next_job_gpu_th - - logger.debug('Allocating %s ID=%d (%0.2fGB, %d threads). Free: ' - '%0.2fGB, %d threads.', self.procs[jobid].fullname, - jobid, next_job_gb, next_job_th, free_memory_gb, - free_processors) - + if is_gpu_job: - is_gpu_free, devno, slotno = self.gpu_has_free_slot() + + free_gpu_slots -= next_job_gpu_th + + is_gpu_free,devno,slotno = self.gpu_has_free_slot() + if devno is not None and slotno is not None: - self.set_gpu_slot_busy(slotno, jobid) - logger.info( - '[*** GPU ID: %d Running: %s Job-ID: %d, Slot-ID:%d, ' - 'Queue State:%s', devno, self.procs[jobid]._id, jobid, - slotno, json.dumps(self.gpu_q)) - - # change job status in appropriate queues - self.proc_done[jobid] = True - self.proc_pending[jobid] = True - - # If cached and up-to-date just retrieve it, don't run - if self._local_hash_check(jobid, graph): - continue - - # updatehash and run_without_submitting are also run locally - if updatehash or self.procs[jobid].run_without_submitting: - logger.debug('Running node %s on master thread', - self.procs[jobid]) - try: - self.procs[jobid].run(updatehash=updatehash) - except Exception: - traceback = format_exception(*sys.exc_info()) - self._clean_queue( - jobid, - graph, - result={ - 'result': None, - 'traceback': traceback - }) - - # Release resources - self._task_finished_cb(jobid) - self._remove_node_dirs() - free_memory_gb += next_job_gb - free_processors += next_job_th + self.set_gpu_slot_busy(slotno,jobid) + logger.info('[*** GPU ID: 
%d Running: %s Job-ID: %d, Slot-ID:%d, Queue State:%s' % + (devno, self.procs[jobid]._id, jobid, slotno, json.dumps(self.gpu_q))) + + # change job status in appropriate queues + self.proc_done[jobid] = True + self.proc_pending[jobid] = True + + # If cached just retrieve it, don't run + if self._local_hash_check(jobid, graph): + continue + + if self.procs[jobid].run_without_submitting: + logger.debug('Running node %s on master thread', self.procs[jobid]) + try: + self.procs[jobid].run() + except Exception: + traceback = format_exception(*sys.exc_info()) + self._report_crash(self.procs[jobid], traceback=traceback) + + # Release resources + self._task_finished_cb(jobid) + self._remove_node_dirs() + free_memory_gb += next_job_gb + free_processors += next_job_th + free_gpu_slots += next_job_gpu_th + + # Display stats next loop + self._stats = None + continue + + # Task should be submitted to workers + # Send job to task manager and add to pending tasks + if self._status_callback: + self._status_callback(self.procs[jobid], 'start') + tid = self._submit_job(deepcopy(self.procs[jobid]),devno, + updatehash=updatehash) + + if tid is None: + self.proc_done[jobid] = False + self.proc_pending[jobid] = False + else: + self.pending_tasks.insert(0, (tid, jobid)) # Display stats next loop self._stats = None - - # Clean up any debris from running node in main process - gc.collect() - continue - - # Task should be submitted to workers - # Send job to task manager and add to pending tasks - if self._status_callback: - self._status_callback(self.procs[jobid], 'start') - tid = self._submit_job( - deepcopy(self.procs[jobid]), updatehash=updatehash) - if tid is None: - self.proc_done[jobid] = False - self.proc_pending[jobid] = False else: - self.pending_tasks.insert(0, (tid, jobid)) - # Display stats next loop - self._stats = None + logger.info('Allocating %s ID=%d (%0.2fGB, %d threads). 
Free: %0.2fGB, %d threads.', + self.procs[jobid].fullname, jobid, next_job_gb, next_job_th, + free_memory_gb, free_processors) + # change job status in appropriate queues + self.proc_done[jobid] = True + self.proc_pending[jobid] = True + + # If cached just retrieve it, don't run + if self._local_hash_check(jobid, graph): + continue + + if self.procs[jobid].run_without_submitting: + logger.debug('Running node %s on master thread', self.procs[jobid]) + try: + self.procs[jobid].run() + except Exception: + traceback = format_exception(*sys.exc_info()) + self._report_crash(self.procs[jobid], traceback=traceback) + + # Release resources + self._task_finished_cb(jobid) + self._remove_node_dirs() + free_memory_gb += next_job_gb + free_processors += next_job_th + free_gpu_slots += next_job_gpu_th + + # Display stats next loop + self._stats = None + continue + + # Task should be submitted to workers + # Send job to task manager and add to pending tasks + if self._status_callback: + self._status_callback(self.procs[jobid], 'start') + tid = self._submit_job(deepcopy(self.procs[jobid]),None, + updatehash=updatehash) + + if tid is None: + self.proc_done[jobid] = False + self.proc_pending[jobid] = False + else: + self.pending_tasks.insert(0, (tid, jobid)) + # Display stats next loop + self._stats = None def _sort_jobs(self, jobids, scheduler='tsort'): if scheduler == 'mem_thread': - return sorted( - jobids, - key=lambda item: (self.procs[item].mem_gb, - self.procs[item].n_procs) - ) + return sorted(jobids, key=lambda item: ( + self.procs[item].mem_gb, self.procs[item].n_procs)) return jobids From e92e0c33593ec0b612680b5868b798fd458ced08 Mon Sep 17 00:00:00 2001 From: schahid Date: Sun, 27 May 2018 19:39:43 +0200 Subject: [PATCH 6/7] chache arg in base's task_finish_cb method --- nipype/pipeline/plugins/multiproc.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 
213cce9537..ba2b77c01d 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -258,15 +258,16 @@ def set_gpu_slot_free(self,jobid): self.gpu_q[dk][sdk]='free' return devno - + #override, to set gpu slot free, if the job was a gpu job - def _task_finished_cb(self, jobid): + def _task_finished_cb(self, jobid, cached=False): """ Extract outputs and assign to inputs of dependent tasks This is called when a job is completed. """ - logger.info('[Job finished] Jobname: %s JobID: %d' % - (self.procs[jobid]._id, jobid)) + logger.info('[Job %d] %s (%s).', jobid, 'Cached' + if cached else 'Completed', self.procs[jobid]) + if self._status_callback: self._status_callback(self.procs[jobid], 'end') # Update job and worker queues From 1e1adc8ec03f76718638e81a2d149d4a9bacfe8d Mon Sep 17 00:00:00 2001 From: schahid Date: Thu, 21 Mar 2019 12:29:55 +0100 Subject: [PATCH 7/7] allow to control single gpu processes using n_procs of the node running on gpu. --- nipype/pipeline/plugins/multiproc.py | 97 ++++++++++++++++++++++++---- 1 file changed, 83 insertions(+), 14 deletions(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index ba2b77c01d..9c4561087b 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -61,6 +61,10 @@ def run_node(node, updatehash, taskid, devno=None): result['result'] = node.run(updatehash=updatehash) except: + if devno is not None: + os.environ['CUDA_VISIBLE_DEVICES'] = str(devno) + logger.info('EXC: CUDA_VISIBLE_DEVICE=%d',devno) + result['traceback'] = format_exception(*sys.exc_info()) result['result'] = node.result @@ -156,8 +160,8 @@ def __init__(self, plugin_args=None): #total no. of processes allowed on all gpus if self.n_gpus > self.n_gpus_visible: - logger.info('Total number of GPUs (%d) requested exceeds available number of GPUs (%d). 
Using all %d GPU(s).'%(self.n_gpus,self.n_gpus_visible, self.n_gpus_visible)) - self.n_gpus = self.n_gpus_visible + logger.info('Total number of GPUs (%d) requested exceeds the available number of GPUs (%d) on the system. Using requested %d GPU(s) (!!!at your own risk!!!).'%(self.n_gpus,self.n_gpus_visible, self.n_gpus)) + #self.n_gpus = self.n_gpus_visible self.total_gpu_processors = self.n_gpus * self.n_gpu_proc else: #total gpu_processors = no.of GPUs * no.of threads per single GPU @@ -218,7 +222,27 @@ def gpu_count(self): return len(GPUtil.getGPUs()) except ImportError: return ngpus - + + def gpu_has_free_slots(self,nproc): + #if a single GPU has enough slots for nproc + free=False + devno=None + slotnos=None + + for dk in self.gpu_q.keys(): + devno=dk + slotnos=[] + for sdk in self.gpu_q[dk].keys(): + if self.gpu_q[dk][sdk]=='free': + slotnos.append(sdk) + if len(slotnos) == nproc: + free=True + break + if free: + break + + return free,devno,slotnos + def gpu_has_free_slot(self): #if a GPU has free slot, return True,its device-ID and the slot no. 
free=False @@ -235,7 +259,20 @@ def gpu_has_free_slot(self): break return free,devno,slotno + + + def set_gpu_slots_busy(self,slotnos,jobid): + #if a GPU has free slots, book all for that single jobid + devno=None + for dk in self.gpu_q.keys(): + for sk in self.gpu_q[dk].keys(): + for slotno in slotnos: + if sk==slotno: + devno=dk + self.gpu_q[dk][sk]= {'state':'busy','jobid':jobid} + return devno + def set_gpu_slot_busy(self,slotno,jobid): #if a GPU has free slot, book it for a jobid,modify the queue and set its slotno busy devno=None @@ -258,6 +295,17 @@ def set_gpu_slot_free(self,jobid): self.gpu_q[dk][sdk]='free' return devno + def set_gpu_slots_free(self,jobid): + #if a GPU task is finished, then set the slots free in the queue + devno=None + for dk in self.gpu_q.keys(): + for sdk in self.gpu_q[dk].keys(): + if isinstance(self.gpu_q[dk][sdk],dict): + if self.gpu_q[dk][sdk]['jobid'] == jobid: + devno=dk + self.gpu_q[dk][sdk]='free' + return devno + #override, to set gpu slot free, if the job was a gpu job def _task_finished_cb(self, jobid, cached=False): @@ -282,9 +330,14 @@ def _task_finished_cb(self, jobid, cached=False): was_gpu_job = (hasattr(self.procs[jobid]._interface.inputs, 'use_cuda') or \ hasattr(self.procs[jobid]._interface.inputs, 'use_gpu')) if was_gpu_job: - devid=self.set_gpu_slot_free(jobid) - if devid is not None: - logger.info('Device %d slot set free from jobid %d' % (devid, jobid) ) + if self.procs[jobid].n_procs > 1: + devid=self.set_gpu_slots_free(jobid) + if devid is not None: + logger.info('GPU Device no %d slots set free from jobid %d' % (devid,jobid) ) + else: + devid=self.set_gpu_slot_free(jobid) + if devid is not None: + logger.info('GPU Device no %d slot set free from jobid %d' % (devid, jobid) ) def _submit_job(self, node, devno=None, updatehash=False): @@ -386,7 +439,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): return if len(jobids) + len(self.pending_tasks) == 0: - logger.debug('No tasks are being run, 
and no jobs can ' + logger.info('**** ATTENTION ****: No tasks are being run, and no jobs can ' 'be submitted to the queue. Potential deadlock') return @@ -398,6 +451,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): if isinstance(self.procs[jobid], MapNode): try: num_subnodes = self.procs[jobid].num_subnodes() + logger.info('\n***ATTENTION:%d is a MapNode with %d sub-nodes' % (jobid,num_subnodes)) except Exception: traceback = format_exception(*sys.exc_info()) self._report_crash(self.procs[jobid], traceback=traceback) @@ -417,7 +471,13 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb) next_job_th = min(self.procs[jobid].n_procs, self.processors) next_job_gpu_th = min(self.procs[jobid].n_procs, self.total_gpu_processors) - + + is_gpu_free,devno,slotnos = self.gpu_has_free_slots(next_job_gpu_th) + + if is_gpu_job and (slotnos is None or next_job_gpu_th > len(slotnos)): + logger.debug('Cannot allocate slots, insufficient slots for this job %d', jobid) + continue + if is_gpu_job and next_job_gpu_th > free_gpu_slots: logger.debug('Cannot allocate job %d on GPU (%d slots).', @@ -436,12 +496,21 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): if is_gpu_job: free_gpu_slots -= next_job_gpu_th - - is_gpu_free,devno,slotno = self.gpu_has_free_slot() - - if devno is not None and slotno is not None: - self.set_gpu_slot_busy(slotno,jobid) - logger.info('[*** GPU ID: %d Running: %s Job-ID: %d, Slot-ID:%d, Queue State:%s' % + if next_job_gpu_th > 1: + is_gpu_free,devno,slotnos = self.gpu_has_free_slots(next_job_gpu_th) + if is_gpu_free and next_job_gpu_th <= len(slotnos): + self.set_gpu_slots_busy(slotnos,jobid) + logger.info('[*** GPU ID: %d Running Job: %s Job-ID: %d on multiple slots:%s' % + (devno, self.procs[jobid]._id, jobid, slotnos)) + logger.debug('[*** GPU ID: %d Running Job: %s Job-ID: %d on multiple slots:%s, Queue State:%s' % + (devno, self.procs[jobid]._id, jobid, 
slotnos, json.dumps(self.gpu_q))) + else: + is_gpu_free,devno,slotno = self.gpu_has_free_slot() + if is_gpu_free and slotno is not None: + self.set_gpu_slot_busy(slotno,jobid) + logger.info('[*** GPU ID: %d Running Job: %s Job-ID: %d on single slot-ID:%d' % + (devno, self.procs[jobid]._id, jobid, slotno)) + logger.debug('[*** GPU ID: %d Running Job: %s Job-ID: %d on single slot-ID:%d, Queue State:%s' % (devno, self.procs[jobid]._id, jobid, slotno, json.dumps(self.gpu_q))) # change job status in appropriate queues