diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
index 673c89f9fe16a..ef6a2147e36f5 100644
--- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
@@ -1608,7 +1608,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
}
$defaultMiddleware = [
- 'before' => [],
+ 'before' => [['id' => 'dispatch_after_current_bus']],
'after' => [['id' => 'send_message'], ['id' => 'handle_message']],
];
foreach ($config['buses'] as $busId => $bus) {
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
index 5e3c042f4752e..d50d13bde287f 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
@@ -39,6 +39,8 @@
+
+
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
index 6a98f7c1841ee..0c0b5cc6c3333 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
+++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
@@ -702,12 +702,14 @@ public function testMessengerWithMultipleBuses()
$this->assertTrue($container->has('messenger.bus.commands'));
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
$this->assertEquals([
+ ['id' => 'dispatch_after_current_bus'],
['id' => 'send_message'],
['id' => 'handle_message'],
], $container->getParameter('messenger.bus.commands.middleware'));
$this->assertTrue($container->has('messenger.bus.events'));
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
$this->assertEquals([
+ ['id' => 'dispatch_after_current_bus'],
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
['id' => 'send_message'],
['id' => 'handle_message'],
diff --git a/src/Symfony/Bundle/FrameworkBundle/composer.json b/src/Symfony/Bundle/FrameworkBundle/composer.json
index 06d16c65d46fd..f065dafc66adb 100644
--- a/src/Symfony/Bundle/FrameworkBundle/composer.json
+++ b/src/Symfony/Bundle/FrameworkBundle/composer.json
@@ -43,7 +43,7 @@
"symfony/form": "^4.3",
"symfony/expression-language": "~3.4|~4.0",
"symfony/http-client": "^4.3",
- "symfony/messenger": "^4.2",
+ "symfony/messenger": "^4.3",
"symfony/mime": "^4.3",
"symfony/process": "~3.4|~4.0",
"symfony/security-core": "~3.4|~4.0",
diff --git a/src/Symfony/Component/Messenger/Exception/DelayedMessageHandlingException.php b/src/Symfony/Component/Messenger/Exception/DelayedMessageHandlingException.php
new file mode 100644
index 0000000000000..313d6f672c123
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Exception/DelayedMessageHandlingException.php
@@ -0,0 +1,48 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Exception;
+
+/**
+ * When handling queued messages from {@link DispatchAfterCurrentBusMiddleware},
+ * some handlers caused an exception. This exception contains all those handler exceptions.
+ *
+ * @author Tobias Nyholm
+ */
+class DelayedMessageHandlingException extends \RuntimeException implements ExceptionInterface
+{
+ private $exceptions;
+
+ public function __construct(array $exceptions)
+ {
+ $exceptionMessages = implode(", \n", array_map(
+ function (\Throwable $e) {
+ return \get_class($e).': '.$e->getMessage();
+ },
+ $exceptions
+ ));
+
+ if (1 === \count($exceptions)) {
+ $message = sprintf("A delayed message handler threw an exception: \n\n%s", $exceptionMessages);
+ } else {
+ $message = sprintf("Some delayed message handlers threw an exception: \n\n%s", $exceptionMessages);
+ }
+
+ $this->exceptions = $exceptions;
+
+ parent::__construct($message, 0, $exceptions[0]);
+ }
+
+ public function getExceptions(): array
+ {
+ return $this->exceptions;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Middleware/DispatchAfterCurrentBusMiddleware.php b/src/Symfony/Component/Messenger/Middleware/DispatchAfterCurrentBusMiddleware.php
new file mode 100644
index 0000000000000..4c098c79b7281
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Middleware/DispatchAfterCurrentBusMiddleware.php
@@ -0,0 +1,128 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Middleware;
+
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException;
+use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
+
+/**
+ * Allow to configure messages to be handled after the current bus is finished.
+ *
+ * I.e, messages dispatched from a handler with a DispatchAfterCurrentBus stamp
+ * will actually be handled once the current message being dispatched is fully
+ * handled.
+ *
+ * For instance, using this middleware before the DoctrineTransactionMiddleware
+ * means sub-dispatched messages with a DispatchAfterCurrentBus stamp would be
+ * handled after the Doctrine transaction has been committed.
+ *
+ * @author Tobias Nyholm
+ */
+class DispatchAfterCurrentBusMiddleware implements MiddlewareInterface
+{
+ /**
+ * @var QueuedEnvelope[] A queue of messages and next middleware
+ */
+ private $queue = [];
+
+ /**
+ * @var bool this property is used to signal if we are inside a the first/root call to
+ * MessageBusInterface::dispatch() or if dispatch has been called inside a message handler
+ */
+ private $isRootDispatchCallRunning = false;
+
+ public function handle(Envelope $envelope, StackInterface $stack): Envelope
+ {
+ if (null !== $envelope->last(DispatchAfterCurrentBusStamp::class)) {
+ if (!$this->isRootDispatchCallRunning) {
+ throw new \LogicException(sprintf('You can only use a "%s" stamp in the context of a message handler.', DispatchAfterCurrentBusStamp::class));
+ }
+ $this->queue[] = new QueuedEnvelope($envelope, $stack);
+
+ return $envelope;
+ }
+
+ if ($this->isRootDispatchCallRunning) {
+ /*
+ * A call to MessageBusInterface::dispatch() was made from inside the main bus handling,
+ * but the message does not have the stamp. So, process it like normal.
+ */
+ return $stack->next()->handle($envelope, $stack);
+ }
+
+ // First time we get here, mark as inside a "root dispatch" call:
+ $this->isRootDispatchCallRunning = true;
+ try {
+ // Execute the whole middleware stack & message handling for main dispatch:
+ $returnedEnvelope = $stack->next()->handle($envelope, $stack);
+ } catch (\Throwable $exception) {
+ /*
+ * Whenever an exception occurs while handling a message that has
+ * queued other messages, we drop the queued ones.
+ * This is intentional since the queued commands were likely dependent
+ * on the preceding command.
+ */
+ $this->queue = [];
+ $this->isRootDispatchCallRunning = false;
+
+ throw $exception;
+ }
+
+ // "Root dispatch" call is finished, dispatch stored messages.
+ $exceptions = [];
+ while (null !== $queueItem = array_shift($this->queue)) {
+ try {
+ // Execute the stored messages
+ $queueItem->getStack()->next()->handle($queueItem->getEnvelope(), $queueItem->getStack());
+ } catch (\Exception $exception) {
+ // Gather all exceptions
+ $exceptions[] = $exception;
+ }
+ }
+
+ $this->isRootDispatchCallRunning = false;
+ if (\count($exceptions) > 0) {
+ throw new DelayedMessageHandlingException($exceptions);
+ }
+
+ return $returnedEnvelope;
+ }
+}
+
+/**
+ * @internal
+ */
+final class QueuedEnvelope
+{
+ /** @var Envelope */
+ private $envelope;
+
+ /** @var StackInterface */
+ private $stack;
+
+ public function __construct(Envelope $envelope, StackInterface $stack)
+ {
+ $this->envelope = $envelope;
+ $this->stack = $stack;
+ }
+
+ public function getEnvelope(): Envelope
+ {
+ return $this->envelope;
+ }
+
+ public function getStack(): StackInterface
+ {
+ return $this->stack;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Stamp/DispatchAfterCurrentBusStamp.php b/src/Symfony/Component/Messenger/Stamp/DispatchAfterCurrentBusStamp.php
new file mode 100644
index 0000000000000..38222cbc3b76e
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Stamp/DispatchAfterCurrentBusStamp.php
@@ -0,0 +1,25 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+declare(strict_types=1);
+
+namespace Symfony\Component\Messenger\Stamp;
+
+/**
+ * Marker item to tell this message should be handled in after the current bus has finished.
+ *
+ * @see \Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware
+ *
+ * @author Tobias Nyholm
+ */
+class DispatchAfterCurrentBusStamp implements StampInterface
+{
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/DispatchAfterCurrentBusMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/DispatchAfterCurrentBusMiddlewareTest.php
new file mode 100644
index 0000000000000..6b6d99c4e11fb
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Middleware/DispatchAfterCurrentBusMiddlewareTest.php
@@ -0,0 +1,161 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Middleware;
+
+use PHPUnit\Framework\MockObject\MockObject;
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException;
+use Symfony\Component\Messenger\MessageBus;
+use Symfony\Component\Messenger\MessageBusInterface;
+use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware;
+use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
+use Symfony\Component\Messenger\Middleware\StackInterface;
+use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
+use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
+
+class DispatchAfterCurrentBusMiddlewareTest extends TestCase
+{
+ public function testEventsInNewTransactionAreHandledAfterMainMessage()
+ {
+ $message = new DummyMessage('Hello');
+
+ $firstEvent = new DummyEvent('First event');
+ $secondEvent = new DummyEvent('Second event');
+ $thirdEvent = new DummyEvent('Third event');
+
+ $middleware = new DispatchAfterCurrentBusMiddleware();
+ $handlingMiddleware = $this->createMock(MiddlewareInterface::class);
+
+ $eventBus = new MessageBus([
+ $middleware,
+ $handlingMiddleware,
+ ]);
+
+ $messageBus = new MessageBus([
+ $middleware,
+ new DispatchingMiddleware($eventBus, [
+ new Envelope($firstEvent, new DispatchAfterCurrentBusStamp()),
+ new Envelope($secondEvent, new DispatchAfterCurrentBusStamp()),
+ $thirdEvent, // Not in a new transaction
+ ]),
+ $handlingMiddleware,
+ ]);
+
+ // Third event is dispatch within main dispatch, but before its handling:
+ $this->expectHandledMessage($handlingMiddleware, 0, $thirdEvent);
+ // Then expect main dispatched message to be handled first:
+ $this->expectHandledMessage($handlingMiddleware, 1, $message);
+ // Then, expect events in new transaction to be handled next, in dispatched order:
+ $this->expectHandledMessage($handlingMiddleware, 2, $firstEvent);
+ $this->expectHandledMessage($handlingMiddleware, 3, $secondEvent);
+
+ $messageBus->dispatch($message);
+ }
+
+ public function testThrowingEventsHandlingWontStopExecution()
+ {
+ $message = new DummyMessage('Hello');
+
+ $firstEvent = new DummyEvent('First event');
+ $secondEvent = new DummyEvent('Second event');
+
+ $middleware = new DispatchAfterCurrentBusMiddleware();
+ $handlingMiddleware = $this->createMock(MiddlewareInterface::class);
+
+ $eventBus = new MessageBus([
+ $middleware,
+ $handlingMiddleware,
+ ]);
+
+ $messageBus = new MessageBus([
+ $middleware,
+ new DispatchingMiddleware($eventBus, [
+ new Envelope($firstEvent, new DispatchAfterCurrentBusStamp()),
+ new Envelope($secondEvent, new DispatchAfterCurrentBusStamp()),
+ ]),
+ $handlingMiddleware,
+ ]);
+
+ // Expect main dispatched message to be handled first:
+ $this->expectHandledMessage($handlingMiddleware, 0, $message);
+ // Then, expect events in new transaction to be handled next, in dispatched order:
+ $this->expectThrowingHandling($handlingMiddleware, 1, $firstEvent, new \RuntimeException('Some exception while handling first event'));
+ // Next event is still handled despite the previous exception:
+ $this->expectHandledMessage($handlingMiddleware, 2, $secondEvent);
+
+ $this->expectException(DelayedMessageHandlingException::class);
+ $this->expectExceptionMessage('RuntimeException: Some exception while handling first event');
+
+ $messageBus->dispatch($message);
+ }
+
+ /**
+ * @param MiddlewareInterface|MockObject $handlingMiddleware
+ */
+ private function expectHandledMessage(MiddlewareInterface $handlingMiddleware, int $at, $message): void
+ {
+ $handlingMiddleware->expects($this->at($at))->method('handle')->with($this->callback(function (Envelope $envelope) use ($message) {
+ return $envelope->getMessage() === $message;
+ }))->willReturnCallback(function ($envelope, StackInterface $stack) {
+ return $stack->next()->handle($envelope, $stack);
+ });
+ }
+
+ /**
+ * @param MiddlewareInterface|MockObject $handlingMiddleware
+ */
+ private function expectThrowingHandling(MiddlewareInterface $handlingMiddleware, int $at, $message, \Throwable $throwable): void
+ {
+ $handlingMiddleware->expects($this->at($at))->method('handle')->with($this->callback(function (Envelope $envelope) use ($message) {
+ return $envelope->getMessage() === $message;
+ }))->willReturnCallback(function () use ($throwable) {
+ throw $throwable;
+ });
+ }
+}
+
+class DummyEvent
+{
+ private $message;
+
+ public function __construct(string $message)
+ {
+ $this->message = $message;
+ }
+
+ public function getMessage(): string
+ {
+ return $this->message;
+ }
+}
+
+class DispatchingMiddleware implements MiddlewareInterface
+{
+ private $bus;
+ private $messages;
+
+ public function __construct(MessageBusInterface $bus, array $messages)
+ {
+ $this->bus = $bus;
+ $this->messages = $messages;
+ }
+
+ public function handle(Envelope $envelope, StackInterface $stack): Envelope
+ {
+ foreach ($this->messages as $event) {
+ $this->bus->dispatch($event);
+ }
+
+ return $stack->next()->handle($envelope, $stack);
+ }
+}