Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 903355f

Browse filesBrowse files
Nyholmogizanagi
andcommitted
Support for handling messages after current bus is finished
Co-authored-by: Maxime Steinhausser <ogizanagi@users.noreply.github.com>
1 parent 2ff8c19 commit 903355f
Copy full SHA for 903355f

File tree

8 files changed

+368
-2
lines changed
Filter options

8 files changed

+368
-2
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1608,7 +1608,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16081608
}
16091609

16101610
$defaultMiddleware = [
1611-
'before' => [],
1611+
'before' => [['id' => 'dispatch_after_current_bus']],
16121612
'after' => [['id' => 'send_message'], ['id' => 'handle_message']],
16131613
];
16141614
foreach ($config['buses'] as $busId => $bus) {

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
</call>
4040
</service>
4141

42+
<service id="messenger.middleware.dispatch_after_current_bus" class="Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware" />
43+
4244
<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">
4345
<argument type="service" id="validator" />
4446
</service>

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,12 +702,14 @@ public function testMessengerWithMultipleBuses()
702702
$this->assertTrue($container->has('messenger.bus.commands'));
703703
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
704704
$this->assertEquals([
705+
['id' => 'dispatch_after_current_bus'],
705706
['id' => 'send_message'],
706707
['id' => 'handle_message'],
707708
], $container->getParameter('messenger.bus.commands.middleware'));
708709
$this->assertTrue($container->has('messenger.bus.events'));
709710
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
710711
$this->assertEquals([
712+
['id' => 'dispatch_after_current_bus'],
711713
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
712714
['id' => 'send_message'],
713715
['id' => 'handle_message'],

‎src/Symfony/Bundle/FrameworkBundle/composer.json

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/composer.json
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
"symfony/form": "^4.3",
4444
"symfony/expression-language": "~3.4|~4.0",
4545
"symfony/http-client": "^4.3",
46-
"symfony/messenger": "^4.2",
46+
"symfony/messenger": "^4.3",
4747
"symfony/mime": "^4.3",
4848
"symfony/process": "~3.4|~4.0",
4949
"symfony/security-core": "~3.4|~4.0",
+48Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Exception;
13+
14+
/**
15+
* When handling queued messages from {@link DispatchAfterCurrentBusMiddleware},
16+
* some handlers caused an exception. This exception contains all those handler exceptions.
17+
*
18+
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
19+
*/
20+
class DelayedMessageHandlingException extends \RuntimeException implements ExceptionInterface
21+
{
22+
private $exceptions;
23+
24+
public function __construct(array $exceptions)
25+
{
26+
$exceptionMessages = implode(", \n", array_map(
27+
function (\Throwable $e) {
28+
return \get_class($e).': '.$e->getMessage();
29+
},
30+
$exceptions
31+
));
32+
33+
if (1 === \count($exceptions)) {
34+
$message = sprintf("A delayed message handler threw an exception: \n\n%s", $exceptionMessages);
35+
} else {
36+
$message = sprintf("Some delayed message handlers threw an exception: \n\n%s", $exceptionMessages);
37+
}
38+
39+
$this->exceptions = $exceptions;
40+
41+
parent::__construct($message, 0, $exceptions[0]);
42+
}
43+
44+
public function getExceptions(): array
45+
{
46+
return $this->exceptions;
47+
}
48+
}
+128Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException;
16+
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
17+
18+
/**
19+
* Allow to configure messages to be handled after the current bus is finished.
20+
*
21+
* I.e, messages dispatched from a handler with a DispatchAfterCurrentBus stamp
22+
* will actually be handled once the current message being dispatched is fully
23+
* handled.
24+
*
25+
* For instance, using this middleware before the DoctrineTransactionMiddleware
26+
* means sub-dispatched messages with a DispatchAfterCurrentBus stamp would be
27+
* handled after the Doctrine transaction has been committed.
28+
*
29+
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
30+
*/
31+
class DispatchAfterCurrentBusMiddleware implements MiddlewareInterface
32+
{
33+
/**
34+
* @var QueuedEnvelope[] A queue of messages and next middleware
35+
*/
36+
private $queue = [];
37+
38+
/**
39+
* @var bool this property is used to signal if we are inside a the first/root call to
40+
* MessageBusInterface::dispatch() or if dispatch has been called inside a message handler
41+
*/
42+
private $isRootDispatchCallRunning = false;
43+
44+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
45+
{
46+
if (null !== $envelope->last(DispatchAfterCurrentBusStamp::class)) {
47+
if (!$this->isRootDispatchCallRunning) {
48+
throw new \LogicException(sprintf('You can only use a "%s" stamp in the context of a message handler.', DispatchAfterCurrentBusStamp::class));
49+
}
50+
$this->queue[] = new QueuedEnvelope($envelope, $stack);
51+
52+
return $envelope;
53+
}
54+
55+
if ($this->isRootDispatchCallRunning) {
56+
/*
57+
* A call to MessageBusInterface::dispatch() was made from inside the main bus handling,
58+
* but the message does not have the stamp. So, process it like normal.
59+
*/
60+
return $stack->next()->handle($envelope, $stack);
61+
}
62+
63+
// First time we get here, mark as inside a "root dispatch" call:
64+
$this->isRootDispatchCallRunning = true;
65+
try {
66+
// Execute the whole middleware stack & message handling for main dispatch:
67+
$returnedEnvelope = $stack->next()->handle($envelope, $stack);
68+
} catch (\Throwable $exception) {
69+
/*
70+
* Whenever an exception occurs while handling a message that has
71+
* queued other messages, we drop the queued ones.
72+
* This is intentional since the queued commands were likely dependent
73+
* on the preceding command.
74+
*/
75+
$this->queue = [];
76+
$this->isRootDispatchCallRunning = false;
77+
78+
throw $exception;
79+
}
80+
81+
// "Root dispatch" call is finished, dispatch stored messages.
82+
$exceptions = [];
83+
while (null !== $queueItem = array_shift($this->queue)) {
84+
try {
85+
// Execute the stored messages
86+
$queueItem->getStack()->next()->handle($queueItem->getEnvelope(), $queueItem->getStack());
87+
} catch (\Exception $exception) {
88+
// Gather all exceptions
89+
$exceptions[] = $exception;
90+
}
91+
}
92+
93+
$this->isRootDispatchCallRunning = false;
94+
if (\count($exceptions) > 0) {
95+
throw new DelayedMessageHandlingException($exceptions);
96+
}
97+
98+
return $returnedEnvelope;
99+
}
100+
}
101+
102+
/**
103+
* @internal
104+
*/
105+
final class QueuedEnvelope
106+
{
107+
/** @var Envelope */
108+
private $envelope;
109+
110+
/** @var StackInterface */
111+
private $stack;
112+
113+
public function __construct(Envelope $envelope, StackInterface $stack)
114+
{
115+
$this->envelope = $envelope;
116+
$this->stack = $stack;
117+
}
118+
119+
public function getEnvelope(): Envelope
120+
{
121+
return $this->envelope;
122+
}
123+
124+
public function getStack(): StackInterface
125+
{
126+
return $this->stack;
127+
}
128+
}
+25Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace Symfony\Component\Messenger\Stamp;
15+
16+
/**
17+
* Marker item to tell this message should be handled in after the current bus has finished.
18+
*
19+
* @see \Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware
20+
*
21+
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
22+
*/
23+
class DispatchAfterCurrentBusStamp implements StampInterface
24+
{
25+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.