Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[Messenger] implementation of messenger:consume, which processes messages concurrently #53964

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 7.3
Choose a base branch
Loading
from

Conversation

alli83
Copy link
Contributor

@alli83 alli83 commented Feb 16, 2024

Q A
Branch? 7.3
Bug fix? no
New feature? yes
Deprecations? no
Issues
License MIT

The purpose of this PR is to enable concurrent handling of Symfony Messenger messages. This PR utilizes the amphp/parallel library to achieve this. During the worker configuration, it is possible to define how many processes should be batched in parallel before flushing, meaning accessing the result, acknowledging, or initiating executions in case of errors.

In the child process, a container is cached. Therefore, in each child process, it will be possible to inject services and make requests, etc.

In this initial use case, the decision was made not to reuse the parent connection, such as the Doctrine connection. This is because it could currently be inconvenient for users, as they would need to modify their handlers, which could be cumbersome.

Even without reusing the parent connection, there is a performance gain since operations are performed concurrently for x number of processes. Therefore, even if there is a handler with blocking code, it does not prevent other child processes from proceeding.
for example, in the case we have 2 handlers and one of the handlers has a 4-second pause, and we have, for instance, 40 messages processed concurrently with a batch size of 10 using the ParallelBus, it's approximately 6 times faster between handling the first message and the last message (This observation was made during a test conducted in our development environment)

In order to dispatch messages concurrently, it is necessary to first consider the ParallelMessageBus:
! It works for async mode - It doesn't work with BatchHandler Trait !

Async mode:

$this->bus->dispatch(new UserMessage($user->getId()), [new BusNameStamp('parallel_bus')]);

You can specify how many processes will be executed in parallel.
It will define how many processes will be launched - threaded to then complete them one by one and get the return of the message processing
By default: 10

php bin/console messenger:consume async -p 20

TODO:

  • CHANGELOG:

This feature is being developed by @coopTilleuls and @TradersPost and it has been designed with @jwage and @dunglas

@carsonbot carsonbot added this to the 7.1 milestone Feb 16, 2024
@carsonbot carsonbot changed the title [Messenger] implementation of messenger:consume, which processes messages concurrently [Messenger] implementation of messenger:consume, which processes messages concurrently Feb 16, 2024
@alli83 alli83 force-pushed the messenger-handling-concurrent-messages branch from ea02a2e to 8cf4f65 Compare February 16, 2024 08:03
Copy link
Member

@lyrixx lyrixx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

quick review, I like it

@alli83 alli83 force-pushed the messenger-handling-concurrent-messages branch 2 times, most recently from 8b8e52c to 66200db Compare February 16, 2024 08:16
@alli83 alli83 force-pushed the messenger-handling-concurrent-messages branch 3 times, most recently from c1d082a to 89b22b9 Compare February 16, 2024 08:42
@alli83 alli83 force-pushed the messenger-handling-concurrent-messages branch from 89b22b9 to 885c278 Compare February 16, 2024 13:36
Copy link
Contributor

@jwage jwage left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nicely done! Just a few questions/comments.

@alli83 alli83 force-pushed the messenger-handling-concurrent-messages branch 11 times, most recently from 8fac51d to f35b5ad Compare February 25, 2024 22:34
$envelope = Envelope::wrap($message, $stamps);
$task = new DispatchTask($envelope, $stamps, $this->env, $this->debug, $this->projectdir);

$future = async(function () use ($worker, $task) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can use static function here to avoid holding a reference to $this.

{
}

public function getFuture(): Future

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This leaks the amp/future directly, which is somewhat tricky to use. Ideally, we would just allow getting the value (or waiting for it) instead of leaking the internal implementation.


public function getLogDir(): string
{
return sys_get_temp_dir().'/'.Kernel::VERSION.'/EmptyAppKernel/logs';

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return sys_get_temp_dir().'/'.Kernel::VERSION.'/EmptyAppKernel/logs';
return sys_get_temp_dir().DIRECTORY_SEPARATOR.Kernel::VERSION.DIRECTORY_SEPARATOR.'EmptyAppKernel'.DIRECTORY_SEPARATOR.'logs';

and others in this file?

src/Symfony/Component/Messenger/Worker.php Outdated Show resolved Hide resolved

foreach ($errorsFromAwait as $index => $error) {
try {
$execution = $futuresReceived[$index]->await();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't we already do this on line 347?

src/Symfony/Component/Messenger/Worker.php Outdated Show resolved Hide resolved
$this->preAck($envelope, $transportName, false, null);
}

$futures = array_map(fn ($execution) => $execution->getFuture(), $executions);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be great if we could factor out the multiple layers of futures.


$i = 0;

while ($executions) {
Copy link

@withinboredom withinboredom Aug 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we can take advantage of some amp stuff to make this much more readable.

See: Future::iterate(), for example. I find this entire loop quite confusing. We are iterating over $futures, while also maintaining a copy in $executions. I'm not 100% sure any of this is actually doing anything in parallel.

For example, we set $i = 0, then wait for something to complete, then filter out the completed future from $executions, then reset $futures, then increment i, then wait for all executions to complete, then start all over again.

Like, I'm not sure what the intent is here; some comments would be helpful to explain why you are doing what you are doing. To be frank, it doesn't seem to make any sense at all.

First, it appears that you want to set a processing limit, but all these fibers/processes are already running. You can't set the parallel processing limit this late in the code. Instead, you should set it in src/Symfony/Component/Messenger/ParallelMessageBus.php:38 when you set up the worker pool. That would be where you set the limits by defining how many threads / fibers it will run concurrently. Doing this here doesn't do anything.

Secondly, the entire code can be written differently to take advantage of amp's parallelism.

$futures = self::$futures; // take a copy of futures we care about
self::$futures = []; // any other fiber shouldn't watch these fibers

// loop over all tasks in the background.
async(
  function () use ($futures)
  {
    // iterate over futures in completion order
    foreach(Future::iterate($futures) as $idx => $future) {
      try {
        $val = $future->await();
        // do stuff with val
      } catch (\Throwable $ex) {
        // handle error of $futures[$idx] here.
      }
    }
  }
)

Edit to add:

See Future::map() to attach a callback on completion. Here's a queue handler I wrote awhile ago using amphp that could handle ~5k jobs per second: https://github.com/bottledcode/durable-php/blob/main/src/Processor.php#L150-L185

Copy link
Contributor Author

@alli83 alli83 Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for your feedback.
Instead of calling awaitFirst, I could implement the internal logic of this function in the worker code. This way, I would have access to the index of the Future, which would make it easier to unset and sort what is done or still pending.
But it's wrap in an Executionclass so it would be something like

foreach(Future::iterate($futuresReceived) as $idx => $future) {
            try {
                $execution = $future->await();
                $execution->getFuture()->await();
                $envelope = $execution->getFuture()->await();

                unset($futuresReceived[$idx]);

                $this->preAck($envelope, $transportName, false, null);
            } catch (\Throwable $ex) {
            }
            ```

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the action, when using the parallel message bus the messages seem to be processed concurrently rather than in order. I'll add a small screenshot to show the result compared to a "normal" bus

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the action, when using the parallel message bus the messages seem to be processed concurrently rather than in order. I'll add a small screenshot to show the result compared to a "normal" bus

Yes, the bus is processing concurrently, but you were not checking them concurrently nor setting limits on concurrency.

@OskarStark OskarStark changed the title [Messenger] implementation of messenger:consume, which processes messages concurrently [Messenger] implementation of messenger:consume, which processes messages concurrently Aug 16, 2024
@alli83 alli83 force-pushed the messenger-handling-concurrent-messages branch from e6b8863 to 5d6bae6 Compare August 17, 2024 06:24
@alli83 alli83 marked this pull request as draft August 19, 2024 13:53
@alli83 alli83 force-pushed the messenger-handling-concurrent-messages branch 5 times, most recently from f976458 to 52e9176 Compare August 20, 2024 01:15
@alli83 alli83 force-pushed the messenger-handling-concurrent-messages branch from 52e9176 to 522a316 Compare August 20, 2024 03:56
@alli83 alli83 marked this pull request as ready for review August 20, 2024 21:53
@@ -282,4 +329,37 @@ public function getMetadata(): WorkerMetadata
{
return $this->metadata;
}

public function handleFutures(string $transportName, $parallelProcessLimit): void
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$parallelProcessLimit seems unused

@IndraGunawan
Copy link
Contributor

IndraGunawan commented Oct 4, 2024

@dunglas @alli83 any chance this feature will be included in v7.2 release?

@dunglas
Copy link
Member

dunglas commented Oct 4, 2024

@IndraGunawan I'd love to, but I can't decide that on my own 😅

@fabpot fabpot modified the milestones: 7.2, 7.3 Nov 20, 2024
Copy link
Member

@dunglas dunglas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when conflicts will be resolved

@matyo91
Copy link
Contributor

matyo91 commented Mar 28, 2025

! It works for async mode - It doesn't work with BatchHandler Trait !

It's relevant. BatchHandler will handle processing data synchronously. If I understood well, do you suggest it miss async capabilities? Somehow updating BatchHandler to make it async aware or declaring another BatchAsyncHandler trait could allow processing batch messaging with async mechanisms.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Morty Proxy This is a proxified and sanitized view of the page, visit original site.