Enhanced queue monitoring script #7

Merged: 4 commits, merged Jun 18, 2024
llama_cpp/_utils.py: 32 changes (25 additions & 7 deletions)
@@ -6,6 +6,9 @@

from typing import Any, Dict, List, Tuple, Union

from llama_cpp.llama_metrics import QueueMetrics, MetricsExporter


# Avoid "LookupError: unknown encoding: ascii" when open() called in a destructor
outnull_file = open(os.devnull, "w")
errnull_file = open(os.devnull, "w")
@@ -147,20 +150,24 @@ def get_gpu_general_info() -> Tuple[float, float, float]:
return 0.0, 0.0, 0.0


async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]):
async def monitor_task_queue(
status_dict: Dict[str, Union[int, float]], metrics_exporter: MetricsExporter
):
"""
An asynchronous function that monitors the task queue and updates
a shared status dictionary with the number of tasks that have not
started and the number of tasks that are currently running.
It recursively calls itself to continuously monitor the task queue.
NOTE: There will always be 4 tasks running in the task queue:
NOTE: There will always be 3 tasks running in the task queue:
- LifespanOn.main: Main application coroutine
- Server.serve: Server coroutine
- monitor_task_queue: Task queue monitoring coroutine
- RequestReponseCycle.run_asgi: ASGI single cycle coroutine
Any upcoming requests will be added to the task queue in the form of
another RequestReponseCycle.run_asgi coroutine.
"""
if not isinstance(metrics_exporter, MetricsExporter):
raise ValueError("metrics_exporter must be an instance of MetricsExporter")

all_tasks = asyncio.all_tasks()

# Get count of all running tasks
@@ -169,12 +176,23 @@ async def monitor_task_queue(status_dict: Dict[str, Union[int, float]]):
# Get basic metadata of all running tasks
status_dict["running_tasks"] = {
task.get_name(): str(task.get_coro())
.encode("ascii", errors="ignore")
.strip()
.decode("ascii")
.lstrip("\u003C")
.rstrip("\u003E")
for task in all_tasks
}

assert status_dict is not None

# Register current running tasks as a Prometheus metric
_labels = {
"service": "general",
"request_type": "health_check",
}
_queue_metrics = QueueMetrics(**status_dict)
metrics_exporter.log_queue_metrics(_queue_metrics, _labels)

await asyncio.sleep(5) # adds a delay of 5 seconds to avoid overloading the CPU

asyncio.create_task(
monitor_task_queue(status_dict)
monitor_task_queue(status_dict, metrics_exporter)
) # pass status_dict to the next task
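
For context on how the reworked coroutine behaves: it snapshots asyncio.all_tasks(), records the count through the exporter, sleeps about five seconds, and then reschedules itself. Below is a minimal sketch of driving it outside the FastAPI lifespan shown later in app.py; the standalone event loop and the default-constructed MetricsExporter are illustrative assumptions, not part of the PR.

import asyncio

from llama_cpp._utils import monitor_task_queue
from llama_cpp.llama_metrics import MetricsExporter


async def main():
    # Shared dict that monitor_task_queue updates in place on every poll.
    task_queue_status = {}
    exporter = MetricsExporter()

    # The first await fills the dict, logs the queue metric, sleeps ~5 s,
    # then schedules the next poll as a background task before returning.
    await monitor_task_queue(task_queue_status, exporter)

    print(task_queue_status.get("running_tasks_count"))


asyncio.run(main())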
llama_cpp/llama.py: 8 changes (2 additions & 6 deletions)
@@ -43,7 +43,7 @@
import llama_cpp.llama_cpp as llama_cpp
import llama_cpp.llama_chat_format as llama_chat_format

from llama_cpp.llama_metrics import RequestMetrics, MetricsExporter
from llama_cpp.llama_metrics import RequestMetrics

from llama_cpp._utils import (
get_cpu_usage,
@@ -74,7 +74,6 @@ class Llama:
"""High-level Python wrapper for a llama.cpp model."""

__backend_initialized = False
__prometheus_metrics = MetricsExporter()

def __init__(
self,
@@ -488,10 +487,7 @@ def __init__(
self.chat_format = "llama-2"
if self.verbose:
print(f"Using fallback chat format: {self.chat_format}", file=sys.stderr)

# Prometheus metrics
self.metrics = self.__prometheus_metrics


@property
def ctx(self) -> llama_cpp.llama_context_p:
assert self._ctx.ctx is not None
llama_cpp/llama_metrics.py: 27 changes (25 additions & 2 deletions)
@@ -342,10 +342,33 @@ def __init__(self):
documentation="KV-cache usage. 1 means 100 percent usage",
labelnames=self.labels,
)
self._gauge_running_tasks = Gauge(
self._gauge_running_tasks = Histogram(
name="llama_cpp_python:running_tasks",
documentation="Number of running tasks in the task queue",
labelnames=self.labels,
buckets=[
1,
2,
3,
4,
5,
6,
7,
8,
9,
10,
12,
14,
16,
18,
20,
25,
30,
35,
40,
45,
50,
],
)

# Server metadata
Expand Down Expand Up @@ -399,4 +422,4 @@ def log_queue_metrics(self, metrics: QueueMetrics, labels: Dict[str, str]):
"""
Log the metrics for the task queue.
"""
self._gauge_running_tasks.labels(**labels).set(metrics.running_tasks_count)
self._gauge_running_tasks.labels(**labels).observe(metrics.running_tasks_count)
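
Switching the metric from a Gauge to a Histogram means every poll is recorded as an observation in a bucketed distribution (plus _count and _sum series) rather than overwriting a single current value, so queue-depth percentiles can be derived from the exported data. A stand-alone sketch of the same prometheus_client pattern follows; the metric name, label values, and shortened bucket list are made up for illustration.

from prometheus_client import Histogram

# Hypothetical metric mirroring the running-tasks histogram above.
running_tasks = Histogram(
    name="example_app:running_tasks",
    documentation="Number of running tasks in the task queue",
    labelnames=["service", "request_type"],
    buckets=[1, 2, 3, 4, 5, 10, 20, 50],
)

# Each observation updates the bucketed distribution; the exported
# *_bucket series hold cumulative counts per upper bound ("le" label).
running_tasks.labels(service="general", request_type="health_check").observe(3)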
llama_cpp/server/app.py: 43 changes (24 additions & 19 deletions)
@@ -48,7 +48,7 @@
)
from llama_cpp.server.errors import RouteErrorHandler
from llama_cpp._utils import monitor_task_queue
from llama_cpp.llama_metrics import QueueMetrics
from llama_cpp.llama_metrics import MetricsExporter


router = APIRouter(route_class=RouteErrorHandler)
@@ -102,14 +102,26 @@ def set_ping_message_factory(factory):
_ping_message_factory = factory


def set_metrics_exporter():
global metrics_exporter
try:
metrics_exporter
except NameError:
metrics_exporter = MetricsExporter()

return metrics_exporter

task_queue_status = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
"""
A context manager that launches tasks to be run during the application's lifespan.
"""
await monitor_task_queue(task_queue_status)
metrics_exporter = set_metrics_exporter()

await monitor_task_queue(task_queue_status, metrics_exporter)
yield


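set_metrics_exporter() lazily creates a single module-level MetricsExporter, presumably so the underlying Prometheus collectors are registered only once per process (instantiating the exporter twice would normally trip prometheus_client's duplicated-timeseries check). A quick illustration of the intended behaviour, assuming the module can be imported on its own:

from llama_cpp.server.app import set_metrics_exporter

first = set_metrics_exporter()
second = set_metrics_exporter()
assert first is second  # created on the first call, reused afterwards
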
@@ -246,22 +258,20 @@ async def authenticate(
response_model=HealthMetrics,
summary="Server's health",
)
async def check_health(

):
# 4 running tasks + new scheduled request
if 0 <= task_queue_status.get("running_tasks_count", 0) <= 5:
async def check_health():
# 3 running tasks + new scheduled request
if 0 <= task_queue_status.get("running_tasks_count", 0) <= 4:
return JSONResponse(
content={"status": "OK", "task_queue_status": task_queue_status}
)
# 1 - 6 scheduled requests
elif 5 < task_queue_status.get("running_tasks_count", 0) <= 10:
# 2 - 6 scheduled requests
elif 4 < task_queue_status.get("running_tasks_count", 0) < 10:
return JSONResponse(
content={"status": "Warning", "task_queue_status": task_queue_status}
)
# 7+ scheduled requests
# TODO: Evaluate if in this case we should manually stop the execution of certain tasks to clear the queue
elif task_queue_status.get("running_tasks_count", 0) > 10:
elif task_queue_status.get("running_tasks_count", 0) >= 10:
return JSONResponse(
content={"status": "Critical", "task_queue_status": task_queue_status}
)
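
Taken together with the three baseline coroutines noted in monitor_task_queue's docstring, the branches above map queue depth to a status roughly as in this plain-Python restatement (illustrative only, not code from the PR):

def classify_queue_depth(running_tasks_count: int) -> str:
    # 3 baseline coroutines plus at most one in-flight request.
    if 0 <= running_tasks_count <= 4:
        return "OK"
    # Roughly 2-6 additional requests on top of the baseline.
    if 4 < running_tasks_count < 10:
        return "Warning"
    # Ten or more tasks: the queue is considered saturated.
    if running_tasks_count >= 10:
        return "Critical"
    return "Unknown"  # negative counts are not expected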
@@ -516,7 +526,7 @@ async def create_chat_completion(
# Adds the ai_service value from the request body to the kwargs
# to be passed downstream to the llama_cpp.ChatCompletion object
kwargs["ai_service"] = body.ai_service

llama = llama_proxy(body.model)
if body.logit_bias is not None:
kwargs["logit_bias"] = (
@@ -525,14 +535,6 @@
else body.logit_bias
)

# Register current running tasks as a Prometheus metric
_labels = {
"service": "general",
"request_type": "chat/completions",
}
_queue_metrics = QueueMetrics(**task_queue_status)
llama.metrics.log_queue_metrics(_queue_metrics, _labels)

if body.grammar is not None:
kwargs["grammar"] = llama_cpp.LlamaGrammar.from_string(body.grammar)

@@ -545,6 +547,9 @@
else:
kwargs["logits_processor"].extend(_min_tokens_logits_processor)

# Set the metrics exporter for the llama object
llama.metrics = set_metrics_exporter()

iterator_or_completion: Union[
llama_cpp.ChatCompletion, Iterator[llama_cpp.ChatCompletionChunk]
] = await run_in_threadpool(llama.create_chat_completion, **kwargs)
tests/test_llama.py: 13 changes (12 additions & 1 deletion)
@@ -5,9 +5,18 @@
from scipy.special import log_softmax

import llama_cpp
from llama_cpp.llama_metrics import MetricsExporter

MODEL = "./vendor/llama.cpp/models/ggml-vocab-llama-spm.gguf"

def set_test_metrics_exporter():
global metrics_exporter
try:
metrics_exporter
except NameError:
metrics_exporter = MetricsExporter()

return metrics_exporter

def test_llama_cpp_tokenization():
llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, verbose=False)
@@ -156,7 +165,8 @@ def test_llama_patch(mock_llama):
ai_service_completion = "test-label-suggestions"
ai_service_streaming = "test-acceptance-criteria"
llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, n_ctx=n_ctx)

llama.metrics = set_test_metrics_exporter()

n_vocab = llama_cpp.llama_n_vocab(llama._model.model)
assert n_vocab == 32000

@@ -231,6 +241,7 @@ def test_llama_pickle():

def test_utf8(mock_llama):
llama = llama_cpp.Llama(model_path=MODEL, vocab_only=True, logits_all=True)
llama.metrics = set_test_metrics_exporter()

output_text = "😀"
ai_service = "label-suggestions"