-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Add 'run_stream_sync' method #3146
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
d6e179e
to
ccaf87f
Compare
@ajac-zero Thanks for working on this! I do think that streaming means people want to see the value as they come in, even when running sync code -- it'll just block until the next chunk is received, or buffer chunks if sync processing takes longer than the next chunk comes in. Otherwise, there's not much point in using this new method over just |
@DouweM I took a shot at true streaming implementation, and so far so good. What do you think? |
|
||
|
||
@dataclass(init=False) | ||
class SyncStreamedRunResult(StreamedRunResult[AgentDepsT, OutputDataT]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you seen pydantic_ai.direct.StreamedResponseSync
? I wonder if there's some overlap we could factor out.
"""Create a 'SyncStreamedRunResult' from an existing 'StreamedRunResult'.""" | ||
instance = cls.__new__(cls) | ||
|
||
instance._all_messages = streamed_run_result._all_messages |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't love copying the private values here, even if this is a subclass...
Would it help to add a new class, or to add _sync
methods to StreamedRunResult
directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @DouweM. I removed the new class in favor of _sync
methods in StreamedRunResult
. The public API changes a bit more but the code is much cleaner.
Fixes #3005.
This PR tries implementing a possible solution to #3005 by adding a new
run_stream_sync
method that usesrun_stream
under the hood, based on therun_sync
implementation that usesrun
internally.Currently, it uses an 'eager' approach by loading the async iterable upfront and then providing normal iterable to access the stream. This has an impact on latency as it is not true streaming, but it avoids some opaque race conditions that can appear in a 'lazy' implementation.
I'd like to get some feedback on whether this approach is worthwhile or if it would be better to try and get a working 'lazy' implementation. My thinking is that, if latency is a concern, it would probably be best to point users towards the async version regardless, leaving the sync methods as conveniences for certain cases.