Tracking interruption point during large language model output streaming using FastAPI StreamingResponse #13707

Answered by YuriiMotov
kingmming asked this question in Questions

First Check

  • I added a very descriptive title here.
  • I used the GitHub search to find a similar question and didn't find it.
  • I searched the FastAPI documentation, with the integrated search.
  • I already searched in Google "How to X in FastAPI" and didn't find any information.
  • I already read and followed all the tutorial in the docs and didn't find an answer.
  • I already checked if it is not related to FastAPI but to Pydantic.
  • I already checked if it is not related to FastAPI but to Swagger UI.
  • I already checked if it is not related to FastAPI but to ReDoc.

Commit to Help

  • I commit to help with one of those options 👆

Example Code

from fastapi import FastAPI, Request
from starlette.responses import StreamingResponse
import asyncio

app = FastAPI()

@app.get("/stream")
async def stream(request: Request):
    async def event_generator():
        try:
            for i in range(100):  # Simulating a long output from a large model
                await asyncio.sleep(0.1)  # Simulate generation delay between chunks
                yield f"data: Line {i}\n\n"
        except Exception as e:
            print(f"Error occurred: {e}")

    return StreamingResponse(event_generator(), media_type="text/event-stream")

Description

I'm using FastAPI's StreamingResponse to stream the output of a large language model to the client. I need to track the exact point at which the user interrupts the streaming process. This is crucial for logging and monitoring purposes, as it helps me understand where users typically stop the output and potentially optimize the model's behavior or user experience.

Scenario

  • I have a FastAPI application that streams the output of a large language model to the client in real-time.
  • The output can be quite long, and users may want to interrupt the streaming at any point.
  • I want to record the exact position (e.g., the line number or token index) where the user interrupted the streaming.

Operating System

Linux

Operating System Details

No response

FastAPI Version

0.115.7

Pydantic Version

2.10.6

Python Version

Python 3.10.14

Additional Context

No response

Replies: 1 comment · 6 replies

Not sure it's the easiest way, but the following code works:

import asyncio

from fastapi import FastAPI, Request
from starlette.responses import StreamingResponse

app = FastAPI()

@app.get("/stream")
async def stream(request: Request):
    async def event_generator(state: dict[str, int | bool]):
        try:
            for i in range(20):  # Simulating a long output from a large model
                state["step"] = i
                await asyncio.sleep(0.5)
                yield f"data: Line {i}\n\n"
        except Exception as e:
            print(f"Error occurred: {e}")
        state["finished"] = True

    state = {"step": 0}  # Shared state object

    async def watch_disconnect(request: Request):
        while True:
            if await request.is_disconnected():
                # Grace period: if the stream finished just as the client
                # disconnected, let the generator set "finished" first.
                await asyncio.sleep(0.1)
                if not state.get("finished", False):
                    print(f"Client disconnected at step #{state['step']}")
                break
            # Polling interval so the loop doesn't busy-spin between checks.
            # It can be increased (e.g. to 1 s) if an immediate reaction
            # isn't needed.
            await asyncio.sleep(0.1)

    asyncio.create_task(watch_disconnect(request))

    return StreamingResponse(event_generator(state), media_type="text/event-stream")

Here, before returning the StreamingResponse, we run a watcher task in the background and pass the shared state object to it. We also pass the same state object to the generator function.

When the client disconnects, StreamingResponse stops iterating through the generator function, but its last state remains stored in the shared state object. The watch_disconnect function handles the client disconnect and reports whether the client disconnected before the response finished sending.
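
To try this locally, you can run the app with uvicorn, request the endpoint with something like curl -N http://localhost:8000/stream, and press Ctrl-C partway through; the server should then log the step at which the client disconnected (the command and port are just an example setup).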

6 replies
@YuriiMotov

I don't see any significant potential problems with this approach. It would be nice to hear the opinions of others.

You can adjust the sleep time in watch_disconnect. Since you don't need an immediate reaction, you can increase it to 1 second or even higher.

@kingmming

Alright, I will deploy it in the production environment and observe whether there are any issues.

@kingmming

Hi @YuriiMotov! I still need to capture disconnections caused by network errors while the server is streaming output to the client. How can I distinguish those in your code?

@YuriiMotov

As far as I understand, there is no way for an ASGI app to distinguish an intentional client disconnection from a disconnection caused by a network error.
https://asgi.readthedocs.io/en/latest/specs/www.html#disconnect-receive-event

Possible solutions are:

  • Use a WebSocket connection instead of an HTTP StreamingResponse. This way, the client can send a message before closing the connection, and you can handle it (see the sketch below).
  • Handle the client's intentional disconnect on the frontend side and then send this info to the backend (a separate request to another endpoint); a sketch of this also follows.
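
A minimal sketch of the WebSocket option, assuming the client sends a literal "stop" text message before closing; the /stream-ws path, the message format, and the timings are invented for illustration:

import asyncio

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/stream-ws")
async def stream_ws(websocket: WebSocket):
    await websocket.accept()
    step = 0
    try:
        for i in range(20):  # Simulating a long output from a large model
            step = i
            await websocket.send_text(f"Line {i}")
            try:
                # Briefly check for a "stop" message without blocking the stream.
                msg = await asyncio.wait_for(websocket.receive_text(), timeout=0.01)
                if msg == "stop":
                    print(f"Client intentionally stopped at step #{step}")
                    return
            except asyncio.TimeoutError:
                pass
            await asyncio.sleep(0.5)
    except WebSocketDisconnect:
        # The connection closed without a prior "stop" message, which
        # suggests a network error or an abrupt close.
        print(f"Connection dropped at step #{step}")

And a sketch of the second option, where the frontend reports the interruption itself; the endpoint path and payload shape are likewise hypothetical. Any disconnect that is never followed by such a report can then be treated as a network issue:

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class InterruptReport(BaseModel):
    stream_id: str  # Hypothetical ID correlating the report with a stream
    step: int       # Last step the user saw before pressing "stop"

@app.post("/report-interrupt")
async def report_interrupt(report: InterruptReport):
    # Called by the frontend when the user intentionally stops the stream,
    # so the backend can tell it apart from a network error.
    print(f"Stream {report.stream_id} intentionally stopped at step #{report.step}")
    return {"status": "recorded"}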
@kingmming

Hi! @YuriiMotov Thank you very much for your detailed explanation and suggestions! I really appreciate your insights and the possible solutions you provided. They are very helpful.

Answer selected by kingmming