From 61e20378a184a3e60ef96d547df8dd136ff7d03f Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 17 Jun 2024 13:11:58 -0700 Subject: [PATCH 1/4] Adds timer to avoid saturating CPU usage --- llama_cpp/_utils.py | 5 +++-- llama_cpp/server/app.py | 14 ++++++-------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/llama_cpp/_utils.py b/llama_cpp/_utils.py index a4b4bbd51..2a10f861a 100644 --- a/llama_cpp/_utils.py +++ b/llama_cpp/_utils.py @@ -153,11 +153,10 @@ async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]): a shared status dictionary with the number of tasks that have not started and the number of tasks that are currently running. It recursively calls itself to continuously monitor the task queue. - NOTE: There will always be 4 tasks running in the task queue: + NOTE: There will always be 3 tasks running in the task queue: - LifespanOn.main: Main application coroutine - Server.serve: Server coroutine - monitor_task_queue: Task queue monitoring coroutine - - RequestReponseCycle.run_asgi: ASGI single cycle coroutine Any upcoming requests will be added to the task queue in the form of another RequestReponseCycle.run_asgi coroutine. """ @@ -175,6 +174,8 @@ async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]): for task in all_tasks } + await asyncio.sleep(5) # adds a delay of 5 seconds to avoid overloading the CPU + asyncio.create_task( monitor_task_queue(status_dict) ) # pass status_dict to the next task diff --git a/llama_cpp/server/app.py b/llama_cpp/server/app.py index 30601b8de..43f36dd8e 100644 --- a/llama_cpp/server/app.py +++ b/llama_cpp/server/app.py @@ -246,22 +246,20 @@ async def authenticate( response_model=HealthMetrics, summary="Server's health", ) -async def check_health( - -): - # 4 running tasks + new scheduled request - if 0 <= task_queue_status.get("running_tasks_count", 0) <= 5: +async def check_health(): + # 3 running tasks + new scheduled request + if 0 <= task_queue_status.get("running_tasks_count", 0) <= 4: return JSONResponse( content={"status": "OK", "task_queue_status": task_queue_status} ) - # 1 - 6 scheduled requests - elif 5 < task_queue_status.get("running_tasks_count", 0) <= 10: + # 2 - 6 scheduled requests + elif 4 < task_queue_status.get("running_tasks_count", 0) < 10: return JSONResponse( content={"status": "Warning", "task_queue_status": task_queue_status} ) # 7+ scheduled requests # TODO: Evaluate if in this case we should manually stop the execution of certain tasks to clear the queue - elif task_queue_status.get("running_tasks_count", 0) > 10: + elif task_queue_status.get("running_tasks_count", 0) >= 10: return JSONResponse( content={"status": "Critical", "task_queue_status": task_queue_status} ) From aab46dac238824a29d1697c8060b97387b1ed614 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 17 Jun 2024 17:11:57 -0700 Subject: [PATCH 2/4] Converts running_tasks into Histogram --- llama_cpp/llama_metrics.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/llama_cpp/llama_metrics.py b/llama_cpp/llama_metrics.py index 0f9b34a42..a7ffc7a09 100644 --- a/llama_cpp/llama_metrics.py +++ b/llama_cpp/llama_metrics.py @@ -342,10 +342,33 @@ def __init__(self): documentation="KV-cache usage. 1 means 100 percent usage", labelnames=self.labels, ) - self._gauge_running_tasks = Gauge( + self._gauge_running_tasks = Histogram( name="llama_cpp_python:running_tasks", documentation="Number of running tasks in the task queue", labelnames=self.labels, + buckets=[ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 12, + 14, + 16, + 18, + 20, + 25, + 30, + 35, + 40, + 45, + 50, + ], ) # Server metadata @@ -399,4 +422,4 @@ def log_queue_metrics(self, metrics: QueueMetrics, labels: Dict[str, str]): """ Log the metrics for the task queue. """ - self._gauge_running_tasks.labels(**labels).set(metrics.running_tasks_count) + self._gauge_running_tasks.labels(**labels).observe(metrics.running_tasks_count) From afd3472806d4b605d64d2f3b188718c6ed18bd6f Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 17 Jun 2024 17:13:45 -0700 Subject: [PATCH 3/4] Decouples MetricsExporter instance from Llama object --- llama_cpp/_utils.py | 27 ++++++++++++++++++++++----- llama_cpp/llama.py | 8 ++------ llama_cpp/server/app.py | 29 ++++++++++++++++++----------- 3 files changed, 42 insertions(+), 22 deletions(-) diff --git a/llama_cpp/_utils.py b/llama_cpp/_utils.py index 2a10f861a..14d0542fc 100644 --- a/llama_cpp/_utils.py +++ b/llama_cpp/_utils.py @@ -6,6 +6,9 @@ from typing import Any, Dict, List, Tuple, Union +from llama_cpp.llama_metrics import QueueMetrics, MetricsExporter + + # Avoid "LookupError: unknown encoding: ascii" when open() called in a destructor outnull_file = open(os.devnull, "w") errnull_file = open(os.devnull, "w") @@ -147,7 +150,9 @@ def get_gpu_general_info() -> Tuple[float, float, float]: return 0.0, 0.0, 0.0 -async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]): +async def monitor_task_queue( + status_dict: Dict[str, Union[int, float]], metrics_exporter: MetricsExporter +): """ An asynchronous function that monitors the task queue and updates a shared status dictionary with the number of tasks that have not @@ -160,6 +165,9 @@ async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]): Any upcoming requests will be added to the task queue in the form of another RequestReponseCycle.run_asgi coroutine. """ + if not isinstance(metrics_exporter, MetricsExporter): + raise ValueError("metrics_exporter must be an instance of MetricsExporter") + all_tasks = asyncio.all_tasks() # Get count of all running tasks @@ -168,14 +176,23 @@ async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]): # Get basic metadata of all running tasks status_dict["running_tasks"] = { task.get_name(): str(task.get_coro()) - .encode("ascii", errors="ignore") - .strip() - .decode("ascii") + .lstrip("\u003C") + .rstrip("\u003E") for task in all_tasks } + assert status_dict is not None + + # Register current running tasks as a Prometheus metric + _labels = { + "service": "general", + "request_type": "health_check", + } + _queue_metrics = QueueMetrics(**status_dict) + metrics_exporter.log_queue_metrics(_queue_metrics, _labels) + await asyncio.sleep(5) # adds a delay of 5 seconds to avoid overloading the CPU asyncio.create_task( - monitor_task_queue(status_dict) + monitor_task_queue(status_dict, metrics_exporter) ) # pass status_dict to the next task diff --git a/llama_cpp/llama.py b/llama_cpp/llama.py index 2d908d9af..ccb2adab5 100644 --- a/llama_cpp/llama.py +++ b/llama_cpp/llama.py @@ -43,7 +43,7 @@ import llama_cpp.llama_cpp as llama_cpp import llama_cpp.llama_chat_format as llama_chat_format -from llama_cpp.llama_metrics import RequestMetrics, MetricsExporter +from llama_cpp.llama_metrics import RequestMetrics from llama_cpp._utils import ( get_cpu_usage, @@ -74,7 +74,6 @@ class Llama: """High-level Python wrapper for a llama.cpp model.""" __backend_initialized = False - __prometheus_metrics = MetricsExporter() def __init__( self, @@ -488,10 +487,7 @@ def __init__( self.chat_format = "llama-2" if self.verbose: print(f"Using fallback chat format: {self.chat_format}", file=sys.stderr) - - # Prometheus metrics - self.metrics = self.__prometheus_metrics - + @property def ctx(self) -> llama_cpp.llama_context_p: assert self._ctx.ctx is not None diff --git a/llama_cpp/server/app.py b/llama_cpp/server/app.py index 43f36dd8e..6af762031 100644 --- a/llama_cpp/server/app.py +++ b/llama_cpp/server/app.py @@ -48,7 +48,7 @@ ) from llama_cpp.server.errors import RouteErrorHandler from llama_cpp._utils import monitor_task_queue -from llama_cpp.llama_metrics import QueueMetrics +from llama_cpp.llama_metrics import MetricsExporter router = APIRouter(route_class=RouteErrorHandler) @@ -102,14 +102,26 @@ def set_ping_message_factory(factory): _ping_message_factory = factory +def set_metrics_exporter(): + global metrics_exporter + try: + metrics_exporter + except NameError: + metrics_exporter = MetricsExporter() + + return metrics_exporter + task_queue_status = {} + @asynccontextmanager async def lifespan(app: FastAPI): """ A context manager that launches tasks to be run during the application's lifespan. """ - await monitor_task_queue(task_queue_status) + metrics_exporter = set_metrics_exporter() + + await monitor_task_queue(task_queue_status, metrics_exporter) yield @@ -514,7 +526,7 @@ async def create_chat_completion( # Adds the ai_service value from the request body to the kwargs # to be passed downstream to the llama_cpp.ChatCompletion object kwargs["ai_service"] = body.ai_service - + llama = llama_proxy(body.model) if body.logit_bias is not None: kwargs["logit_bias"] = ( @@ -523,14 +535,6 @@ async def create_chat_completion( else body.logit_bias ) - # Register current running tasks as a Prometheus metric - _labels = { - "service": "general", - "request_type": "chat/completions", - } - _queue_metrics = QueueMetrics(**task_queue_status) - llama.metrics.log_queue_metrics(_queue_metrics, _labels) - if body.grammar is not None: kwargs["grammar"] = llama_cpp.LlamaGrammar.from_string(body.grammar) @@ -543,6 +547,9 @@ async def create_chat_completion( else: kwargs["logits_processor"].extend(_min_tokens_logits_processor) + # Set the metrics exporter for the llama object + llama.metrics = set_metrics_exporter() + iterator_or_completion: Union[ llama_cpp.ChatCompletion, Iterator[llama_cpp.ChatCompletionChunk] ] = await run_in_threadpool(llama.create_chat_completion, **kwargs) From 89a96e74ee51d4589e89a03c04d0f321791805c4 Mon Sep 17 00:00:00 2001 From: juanroesel Date: Mon, 17 Jun 2024 17:34:13 -0700 Subject: [PATCH 4/4] Updated unit tests with changes to MetricsExporter init --- tests/test_llama.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/tests/test_llama.py b/tests/test_llama.py index aca1745a8..49150c9f2 100644 --- a/tests/test_llama.py +++ b/tests/test_llama.py @@ -5,9 +5,18 @@ from scipy.special import log_softmax import llama_cpp +from llama_cpp.llama_metrics import MetricsExporter MODEL = "./vendor/llama.cpp/models/ggml-vocab-llama-spm.gguf" +def set_test_metrics_exporter(): + global metrics_exporter + try: + metrics_exporter + except NameError: + metrics_exporter = MetricsExporter() + + return metrics_exporter def test_llama_cpp_tokenization(): llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, verbose=False) @@ -156,7 +165,8 @@ def test_llama_patch(mock_llama): ai_service_completion = "test-label-suggestions" ai_service_streaming = "test-acceptance-criteria" llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, n_ctx=n_ctx) - + llama.metrics = set_test_metrics_exporter() + n_vocab = llama_cpp.llama_n_vocab(llama._model.model) assert n_vocab == 32000 @@ -231,6 +241,7 @@ def test_llama_pickle(): def test_utf8(mock_llama): llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, logits_all=True) + llama.metrics = set_test_metrics_exporter() output_text = "😀" ai_service = "label-suggestions"