-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] implementation of messenger:consume
, which processes messages concurrently
#53964
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 7.3
Are you sure you want to change the base?
Conversation
ea02a2e
to
8cf4f65
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
quick review, I like it
src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php
Outdated
Show resolved
Hide resolved
8b8e52c
to
66200db
Compare
c1d082a
to
89b22b9
Compare
src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php
Outdated
Show resolved
Hide resolved
89b22b9
to
885c278
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nicely done! Just a few questions/comments.
src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
Outdated
Show resolved
Hide resolved
8fac51d
to
f35b5ad
Compare
$envelope = Envelope::wrap($message, $stamps); | ||
$task = new DispatchTask($envelope, $stamps, $this->env, $this->debug, $this->projectdir); | ||
|
||
$future = async(function () use ($worker, $task) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can use static function
here to avoid holding a reference to $this
.
{ | ||
} | ||
|
||
public function getFuture(): Future |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This leaks the amp/future
directly, which is somewhat tricky to use. Ideally, we would just allow getting the value (or waiting for it) instead of leaking the internal implementation.
|
||
public function getLogDir(): string | ||
{ | ||
return sys_get_temp_dir().'/'.Kernel::VERSION.'/EmptyAppKernel/logs'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return sys_get_temp_dir().'/'.Kernel::VERSION.'/EmptyAppKernel/logs'; | |
return sys_get_temp_dir().DIRECTORY_SEPARATOR.Kernel::VERSION.DIRECTORY_SEPARATOR.'EmptyAppKernel'.DIRECTORY_SEPARATOR.'logs'; |
and others in this file?
|
||
foreach ($errorsFromAwait as $index => $error) { | ||
try { | ||
$execution = $futuresReceived[$index]->await(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't we already do this on line 347?
$this->preAck($envelope, $transportName, false, null); | ||
} | ||
|
||
$futures = array_map(fn ($execution) => $execution->getFuture(), $executions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'd be great if we could factor out the multiple layers of futures.
|
||
$i = 0; | ||
|
||
while ($executions) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like we can take advantage of some amp stuff to make this much more readable.
See: Future::iterate()
, for example. I find this entire loop quite confusing. We are iterating over $futures
, while also maintaining a copy in $executions
. I'm not 100% sure any of this is actually doing anything in parallel.
For example, we set $i = 0, then wait for something to complete, then filter out the completed future from $executions
, then reset $futures
, then increment i
, then wait for all executions to complete, then start all over again.
Like, I'm not sure what the intent is here; some comments would be helpful to explain why you are doing what you are doing. To be frank, it doesn't seem to make any sense at all.
First, it appears that you want to set a processing limit, but all these fibers/processes are already running. You can't set the parallel processing limit this late in the code. Instead, you should set it in src/Symfony/Component/Messenger/ParallelMessageBus.php:38
when you set up the worker pool. That would be where you set the limits by defining how many threads / fibers it will run concurrently. Doing this here doesn't do anything.
Secondly, the entire code can be written differently to take advantage of amp's parallelism.
$futures = self::$futures; // take a copy of futures we care about
self::$futures = []; // any other fiber shouldn't watch these fibers
// loop over all tasks in the background.
async(
function () use ($futures)
{
// iterate over futures in completion order
foreach(Future::iterate($futures) as $idx => $future) {
try {
$val = $future->await();
// do stuff with val
} catch (\Throwable $ex) {
// handle error of $futures[$idx] here.
}
}
}
)
Edit to add:
See Future::map()
to attach a callback on completion. Here's a queue handler I wrote awhile ago using amphp that could handle ~5k jobs per second: https://github.com/bottledcode/durable-php/blob/main/src/Processor.php#L150-L185
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for your feedback.
Instead of calling awaitFirst
, I could implement the internal logic of this function in the worker code. This way, I would have access to the index of the Future, which would make it easier to unset and sort what is done or still pending.
But it's wrap in an Execution
class so it would be something like
foreach(Future::iterate($futuresReceived) as $idx => $future) {
try {
$execution = $future->await();
$execution->getFuture()->await();
$envelope = $execution->getFuture()->await();
unset($futuresReceived[$idx]);
$this->preAck($envelope, $transportName, false, null);
} catch (\Throwable $ex) {
}
```
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding the action, when using the parallel message bus the messages seem to be processed concurrently rather than in order. I'll add a small screenshot to show the result compared to a "normal" bus
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding the action, when using the parallel message bus the messages seem to be processed concurrently rather than in order. I'll add a small screenshot to show the result compared to a "normal" bus
Yes, the bus is processing concurrently, but you were not checking them concurrently nor setting limits on concurrency.
messenger:consume
, which processes messages concurrently
src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Tests/ParallelMessageBusTest.php
Outdated
Show resolved
Hide resolved
e6b8863
to
5d6bae6
Compare
f976458
to
52e9176
Compare
52e9176
to
522a316
Compare
@@ -282,4 +329,37 @@ public function getMetadata(): WorkerMetadata | ||
{ | ||
return $this->metadata; | ||
} | ||
|
||
public function handleFutures(string $transportName, $parallelProcessLimit): void |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
$parallelProcessLimit
seems unused
@IndraGunawan I'd love to, but I can't decide that on my own 😅 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when conflicts will be resolved
It's relevant. BatchHandler will handle processing data synchronously. If I understood well, do you suggest it miss async capabilities? Somehow updating BatchHandler to make it async aware or declaring another BatchAsyncHandler trait could allow processing batch messaging with async mechanisms. |
The purpose of this PR is to enable concurrent handling of Symfony Messenger messages. This PR utilizes the amphp/parallel library to achieve this. During the worker configuration, it is possible to define how many processes should be batched in parallel before flushing, meaning accessing the result, acknowledging, or initiating executions in case of errors.
In the child process, a container is cached. Therefore, in each child process, it will be possible to inject services and make requests, etc.
In this initial use case, the decision was made not to reuse the parent connection, such as the Doctrine connection. This is because it could currently be inconvenient for users, as they would need to modify their handlers, which could be cumbersome.
Even without reusing the parent connection, there is a performance gain since operations are performed concurrently for x number of processes. Therefore, even if there is a handler with blocking code, it does not prevent other child processes from proceeding.
for example, in the case we have 2 handlers and one of the handlers has a 4-second pause, and we have, for instance, 40 messages processed concurrently with a batch size of 10 using the ParallelBus, it's approximately 6 times faster between handling the first message and the last message (This observation was made during a test conducted in our development environment)
In order to dispatch messages concurrently, it is necessary to first consider the ParallelMessageBus:
! It works for async mode - It doesn't work with BatchHandler Trait !
Async mode:
You can specify how many processes will be executed in parallel.
It will define how many processes will be launched - threaded to then complete them one by one and get the return of the message processing
By default: 10
TODO:
This feature is being developed by @coopTilleuls and @TradersPost and it has been designed with @jwage and @dunglas