support for gpu queue #3642
Merged
Changes from 1 commit (of 14)

Commits:
0720aa1  support for gpu queue (mauriliogenovese)
6c47dc0  gputil requirement (mauriliogenovese)
f1f5d76  Update info.py (mauriliogenovese)
a642430  refactor and fix (mauriliogenovese)
684b9b0  removed hard pin (mauriliogenovese)
8f74c5d  gpu_count refactor (mauriliogenovese)
a307845  more readable (mauriliogenovese)
27448bc  logger argument (mauriliogenovese)
7e57ab9  Merge branch 'master' into enh/cuda_support (mauriliogenovese)
2c2c066  code refactory (mauriliogenovese)
133dc0a  Merge branch 'enh/cuda_support' of https://github.com/mauriliogenoves… (mauriliogenovese)
66d6280  newlines for style check (mauriliogenovese)
610f1cb  newline for code check (mauriliogenovese)
59862c8  Merge branch 'master' into enh/cuda_support (effigies)
support for gpu queue
commit 0720aa1ac9c672e8b7a6ede4828e603e62778b86
@@ -100,6 +100,7 @@ class MultiProcPlugin(DistributedPluginBase):

     - non_daemon: boolean flag to execute as non-daemon processes
     - n_procs: maximum number of threads to be executed in parallel
+    - n_gpu_procs: maximum number of GPU threads to be executed in parallel
     - memory_gb: maximum memory (in GB) that can be used at once.
     - raise_insufficient: raise error if the requested resources for
       a node over the maximum `n_procs` and/or `memory_gb`
@@ -130,10 +131,22 @@ def __init__(self, plugin_args=None):
         )
         self.raise_insufficient = self.plugin_args.get("raise_insufficient", True)

+        # GPU found on system
+        self.n_gpus_visible = MultiProcPlugin.gpu_count()
+        # proc per GPU set by user
+        self.n_gpu_procs = plugin_args.get('n_gpu_procs', self.n_gpus_visible)
+
+        # total no. of processes allowed on all gpus
+        if self.n_gpu_procs > self.n_gpus_visible:
+            logger.info(
+                'Total number of GPUs proc requested (%d) exceeds the available number of GPUs (%d) on the system. Using requested GPU slots at your own risk!' % (
+                    self.n_gpu_procs, self.n_gpus_visible))
+
         # Instantiate different thread pools for non-daemon processes
         logger.debug(
-            "[MultiProc] Starting (n_procs=%d, mem_gb=%0.2f, cwd=%s)",
+            "[MultiProc] Starting (n_procs=%d, n_gpu_procs=%d, mem_gb=%0.2f, cwd=%s)",
             self.processors,
+            self.n_gpu_procs,
             self.memory_gb,
             self._cwd,
         )
@@ -184,9 +197,12 @@ def _prerun_check(self, graph):
         """Check if any node exceeds the available resources"""
         tasks_mem_gb = []
         tasks_num_th = []
+        tasks_gpu_th = []
         for node in graph.nodes():
             tasks_mem_gb.append(node.mem_gb)
             tasks_num_th.append(node.n_procs)
+            if node.is_gpu_node():
+                tasks_gpu_th.append(node.n_procs)

         if np.any(np.array(tasks_mem_gb) > self.memory_gb):
             logger.warning(
@@ -203,6 +219,12 @@ def _prerun_check(self, graph):
             )
             if self.raise_insufficient:
                 raise RuntimeError("Insufficient resources available for job")
+        if np.any(np.array(tasks_gpu_th) > self.n_gpu_procs):
+            logger.warning(
+                'Nodes demand more GPU than allowed (%d).',
+                self.n_gpu_procs)
+            if self.raise_insufficient:
+                raise RuntimeError('Insufficient GPU resources available for job')

     def _postrun_check(self):
         self.pool.shutdown()
@@ -213,11 +235,14 @@ def _check_resources(self, running_tasks):
         """
         free_memory_gb = self.memory_gb
         free_processors = self.processors
+        free_gpu_slots = self.n_gpu_procs
         for _, jobid in running_tasks:
             free_memory_gb -= min(self.procs[jobid].mem_gb, free_memory_gb)
             free_processors -= min(self.procs[jobid].n_procs, free_processors)
+            if self.procs[jobid].is_gpu_node():
+                free_gpu_slots -= min(self.procs[jobid].n_procs, free_gpu_slots)

-        return free_memory_gb, free_processors
+        return free_memory_gb, free_processors, free_gpu_slots

     def _send_procs_to_workers(self, updatehash=False, graph=None):
         """
@@ -232,7 +257,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
         )

         # Check available resources by summing all threads and memory used
-        free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
+        free_memory_gb, free_processors, free_gpu_slots = self._check_resources(self.pending_tasks)

         stats = (
             len(self.pending_tasks),
@@ -241,6 +266,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
             self.memory_gb,
             free_processors,
             self.processors,
+            free_gpu_slots,
+            self.n_gpu_procs
         )
         if self._stats != stats:
             tasks_list_msg = ""
@@ -256,13 +283,15 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                 tasks_list_msg = indent(tasks_list_msg, " " * 21)
             logger.info(
                 "[MultiProc] Running %d tasks, and %d jobs ready. Free "
-                "memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s",
+                "memory (GB): %0.2f/%0.2f, Free processors: %d/%d, Free GPU slot:%d/%d.%s",
                 len(self.pending_tasks),
                 len(jobids),
                 free_memory_gb,
                 self.memory_gb,
                 free_processors,
                 self.processors,
+                free_gpu_slots,
+                self.n_gpu_procs,
                 tasks_list_msg,
             )
             self._stats = stats
@@ -304,28 +333,36 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
             # Check requirements of this job
             next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb)
             next_job_th = min(self.procs[jobid].n_procs, self.processors)
+            next_job_gpu_th = min(self.procs[jobid].n_procs, self.n_gpu_procs)
+
+            is_gpu_node = self.procs[jobid].is_gpu_node()

             # If node does not fit, skip at this moment
-            if next_job_th > free_processors or next_job_gb > free_memory_gb:
+            if (next_job_th > free_processors or next_job_gb > free_memory_gb
+                    or (is_gpu_node and next_job_gpu_th > free_gpu_slots)):
                 logger.debug(
-                    "Cannot allocate job %d (%0.2fGB, %d threads).",
+                    "Cannot allocate job %d (%0.2fGB, %d threads, %d GPU slots).",
                     jobid,
                     next_job_gb,
                     next_job_th,
+                    next_job_gpu_th,
                 )
                 continue

             free_memory_gb -= next_job_gb
             free_processors -= next_job_th
+            if is_gpu_node:
+                free_gpu_slots -= next_job_gpu_th
             logger.debug(
                 "Allocating %s ID=%d (%0.2fGB, %d threads). Free: "
-                "%0.2fGB, %d threads.",
+                "%0.2fGB, %d threads, %d GPU slots.",
                 self.procs[jobid].fullname,
                 jobid,
                 next_job_gb,
                 next_job_th,
                 free_memory_gb,
                 free_processors,
+                free_gpu_slots,
             )

             # change job status in appropriate queues
@@ -352,6 +389,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                 self._remove_node_dirs()
                 free_memory_gb += next_job_gb
                 free_processors += next_job_th
+                if is_gpu_node:
+                    free_gpu_slots -= next_job_gpu_th
Reviewer: Note that this is releasing resource claims that were made around line 356, so the next time through the loop sees available resources.
Suggested change
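The body of the suggestion is not reproduced above; presumably it mirrors the memory and processor bookkeeping on the two preceding lines. A hedged sketch of that reading (an assumption based on the reviewer's note, not the verbatim suggestion):

                if is_gpu_node:
                    # Release the GPU slots claimed for this job earlier in the
                    # loop, rather than subtracting them a second time
                    # (hypothetical fix inferred from the reviewer's note).
                    free_gpu_slots += next_job_gpu_th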

             # Display stats next loop
             self._stats = None
@@ -379,3 +418,12 @@ def _sort_jobs(self, jobids, scheduler="tsort"):
                 key=lambda item: (self.procs[item].mem_gb, self.procs[item].n_procs),
             )
         return jobids
+
+    @staticmethod
+    def gpu_count():
+        n_gpus = 1
+        try:
+            import GPUtil
+            return len(GPUtil.getGPUs())
+        except ImportError:
+            return n_gpus
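The diff above adds an `n_gpu_procs` plugin argument and GPU-aware scheduling in MultiProc. The sketch below illustrates how a user might pass it when running a workflow; it is a hypothetical usage example, not part of the PR, and the node-side flag that makes `is_gpu_node()` return True is defined elsewhere in this pull request rather than in this commit.

# Hypothetical usage sketch (not from this PR): cap concurrent GPU jobs
# with the new 'n_gpu_procs' plugin argument introduced by this change.
from nipype import Workflow

wf = Workflow(name="example_wf", base_dir="/tmp/work")
# ... add nodes here; GPU-using nodes are detected via Node.is_gpu_node(),
# which is defined elsewhere in this PR ...

wf.run(
    plugin="MultiProc",
    plugin_args={
        "n_procs": 8,       # existing option: max CPU threads in parallel
        "memory_gb": 16,    # existing option: max memory (GB) at once
        "n_gpu_procs": 2,   # new option: max GPU processes in parallel
    },
)

If GPUtil is not installed, gpu_count() falls back to assuming a single GPU, so `n_gpu_procs` defaults to 1 unless set explicitly.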
Reviewer: I would expect this to be hit by your test, but coverage shows it's not. Can you look into this?
Author: Maybe I missed that because I never used updatehash=True, but it seems that no test includes that. Should we add a test with that option?
Author: Moreover, that error does not impact "common" use (I have a project that includes this GPU support code).
Author: While I was looking into this I found two errors in the updatehash functionality. I sent pull request #3709 to fix the bigger one. The second is that in the MultiProc plugin every node is executed in the main thread when updatehash=True, so no multiprocessing actually happens. I will try to send a pull request for that too (perhaps after this GPU support is merged, to avoid merge conflicts).
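For reference, a test along the lines discussed above could simply run a small workflow under MultiProc a second time with updatehash=True. The sketch below is hypothetical: the test name, the tmp_path fixture usage, and the trivial `double` function are illustrative and not part of this PR or the existing test suite.

# Hypothetical test sketch: exercise MultiProc with updatehash=True by
# running a tiny one-node workflow once normally, then again with
# updatehash=True so only the hash files should be refreshed.
from nipype import Node, Workflow
from nipype.interfaces.utility import Function


def double(x):
    return x * 2


def test_multiproc_updatehash(tmp_path):
    wf = Workflow(name="updatehash_wf", base_dir=str(tmp_path))
    n1 = Node(
        Function(input_names=["x"], output_names=["out"], function=double),
        name="n1",
    )
    n1.inputs.x = 3
    wf.add_nodes([n1])

    # First run executes the node under the MultiProc plugin.
    wf.run(plugin="MultiProc", plugin_args={"n_procs": 2})
    # Second run with updatehash=True should only refresh hash files,
    # not re-execute the node.
    wf.run(plugin="MultiProc", plugin_args={"n_procs": 2}, updatehash=True)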