1
1
import os
2
2
import sys
3
3
import psutil
4
+ import asyncio
4
5
import subprocess
5
6
6
7
from typing import Any , Dict , List , Tuple , Union
12
13
STDOUT_FILENO = 1
13
14
STDERR_FILENO = 2
14
15
16
+
15
17
class suppress_stdout_stderr (object ):
16
18
# NOTE: these must be "saved" here to avoid exceptions when using
17
19
# this context manager inside of a __del__ method
@@ -88,6 +90,7 @@ def get_cpu_usage(pid) -> float:
88
90
process = psutil .Process (pid )
89
91
return process .cpu_percent ()
90
92
93
+
91
94
def get_ram_usage (pid ) -> float :
92
95
"""
93
96
RAM usage in MiB by the current process.
@@ -97,12 +100,19 @@ def get_ram_usage(pid) -> float:
97
100
ram_usage = ram_info .rss / (1024 * 1024 ) # Convert to MiB
98
101
return ram_usage
99
102
103
+
100
104
def get_gpu_info_by_pid (pid ) -> float :
101
105
"""
102
106
GPU memory usage by the current process (if GPU is available)
103
107
"""
104
108
try :
105
- gpu_info = subprocess .check_output (["nvidia-smi" , "--query-compute-apps=pid,used_memory" , "--format=csv,noheader" ]).decode ("utf-8" )
109
+ gpu_info = subprocess .check_output (
110
+ [
111
+ "nvidia-smi" ,
112
+ "--query-compute-apps=pid,used_memory" ,
113
+ "--format=csv,noheader" ,
114
+ ]
115
+ ).decode ("utf-8" )
106
116
gpu_info = gpu_info .strip ().split ("\n " )
107
117
for info in gpu_info :
108
118
gpu_pid , gpu_ram_usage = info .split (", " )
@@ -112,14 +122,59 @@ def get_gpu_info_by_pid(pid) -> float:
112
122
pass
113
123
return 0.0
114
124
125
+
115
126
def get_gpu_general_info() -> Tuple[float, float, float]:
    """
    Return general GPU statistics for the first GPU, if one is available.

    Queries ``nvidia-smi`` and returns a 3-tuple of floats:
    (gpu utilization, memory used, memory free) — units are whatever
    ``nvidia-smi`` reports (typically % and MiB); only the leading numeric
    token of each CSV field is kept.

    Returns (0.0, 0.0, 0.0) when ``nvidia-smi`` is missing or exits with
    an error (i.e. no usable GPU).
    """
    query = [
        "nvidia-smi",
        "--query-gpu=utilization.gpu,memory.used,memory.free",
        "--format=csv,noheader",
    ]
    try:
        raw = subprocess.check_output(query).decode("utf-8")
        # Only the first GPU's line is consulted.
        first_line = raw.strip().split("\n")[0]
        utilization, mem_used, mem_free = first_line.split(", ")
        # Each field looks like "<number> <unit>"; keep the number only.
        return tuple(
            float(field.split()[0])
            for field in (utilization, mem_used, mem_free)
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        # Best-effort: no GPU / no nvidia-smi is not an error here.
        return 0.0, 0.0, 0.0
148
+
149
+
150
async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]):
    """
    Monitor the asyncio task queue and update *status_dict* in place.

    Records the number of tasks that have not finished and basic metadata
    (task name -> coroutine repr) for every known task, then schedules the
    next monitoring iteration of itself.

    NOTE: There will always be 4 tasks running in the task queue:
        - LifespanOn.main: Main application coroutine
        - Server.serve: Server coroutine
        - monitor_task_queue: Task queue monitoring coroutine
        - RequestReponseCycle.run_asgi: ASGI single cycle coroutine
    Any upcoming requests will be added to the task queue in the form of
    another RequestReponseCycle.run_asgi coroutine.

    :param status_dict: shared mutable dict updated with
        ``running_tasks_count`` (int) and ``running_tasks`` (dict).
    """
    all_tasks = asyncio.all_tasks()

    # Count tasks that are still pending. Use the public Task.done() API
    # rather than the private ``task._state == "PENDING"`` attribute (a
    # CPython implementation detail): a task is PENDING exactly when it is
    # not done (neither finished nor cancelled).
    pending_tasks = [task for task in all_tasks if not task.done()]
    status_dict["running_tasks_count"] = len(pending_tasks)

    # Basic metadata of all known tasks; the coroutine repr is forced to
    # plain ASCII so the value is always safely serializable.
    status_dict["running_tasks"] = {
        task.get_name(): str(task.get_coro())
        .encode("ascii", errors="ignore")
        .strip()
        .decode("ascii")
        for task in all_tasks
    }

    # NOTE(review): there is no ``await asyncio.sleep(...)`` between
    # iterations, so this monitor re-runs as fast as the event loop allows
    # and can busy-spin a core — confirm whether a small sleep is intended.
    asyncio.create_task(
        monitor_task_queue(status_dict)
    )  # pass status_dict to the next task
0 commit comments