diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
index b4e6cd69ef64a..55976d1c02269 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
@@ -67,6 +67,10 @@
+
+
+
+
diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md
index 9638de4e49092..41a7e3438e0d9 100644
--- a/src/Symfony/Component/Messenger/CHANGELOG.md
+++ b/src/Symfony/Component/Messenger/CHANGELOG.md
@@ -4,6 +4,8 @@ CHANGELOG
4.3.0
-----
+ * Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
+ explicitly handle messages asynchronously.
* Added optional parameter `prefetch_count` in connection configuration,
to setup channel prefetch count
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
diff --git a/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php
index f1b561d03a18d..d2ebaf8cfa011 100644
--- a/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php
+++ b/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php
@@ -15,6 +15,7 @@
use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
+use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
@@ -81,7 +82,13 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
}
- // on a redelivery, never call local handlers
+ // if the message was marked (usually by SyncTransport) that it handlers
+ // MUST be called, mark them to be handled.
+ if (null !== $envelope->last(ForceCallHandlersStamp::class)) {
+ $handle = true;
+ }
+
+ // on a redelivery, only send back to queue: never call local handlers
if (null !== $redeliveryStamp) {
$handle = false;
}
diff --git a/src/Symfony/Component/Messenger/Stamp/ForceCallHandlersStamp.php b/src/Symfony/Component/Messenger/Stamp/ForceCallHandlersStamp.php
new file mode 100644
index 0000000000000..5c9eb3a6d96dd
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Stamp/ForceCallHandlersStamp.php
@@ -0,0 +1,27 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Stamp;
+
+/**
+ * Stamp marks that the handlers *should* be called immediately.
+ *
+ * This is used by the SyncTransport to indicate to the
+ * SendMessageMiddleware that handlers *should* be called
+ * immediately, even though a transport was set.
+ *
+ * @experimental in 4.3
+ *
+ * @author Ryan Weaver
+ */
+class ForceCallHandlersStamp implements StampInterface
+{
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php
index 301f01b86086b..92e48bc78204d 100644
--- a/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php
@@ -14,6 +14,7 @@
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
+use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
@@ -86,6 +87,8 @@ public function testItSendsTheMessageToMultipleSenders()
public function testItSendsToOnlyOneSenderOnRedelivery()
{
$envelope = new Envelope(new DummyMessage('Hey'), new RedeliveryStamp(5, 'bar'));
+ // even with a ForceCallHandlersStamp, the next middleware won't be called
+ $envelope = $envelope->with(new ForceCallHandlersStamp());
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
@@ -237,7 +240,7 @@ public function testItDoesNotDispatchWithNoSenders()
$middleware->handle($envelope, $this->getStackMock());
}
- public function testItDoesNotDispatchOnRetry()
+ public function testItDoesNotDispatchOnRedeliver()
{
$envelope = new Envelope(new DummyMessage('original envelope'));
$envelope = $envelope->with(new RedeliveryStamp(3, 'foo_sender'));
@@ -251,4 +254,18 @@ public function testItDoesNotDispatchOnRetry()
$middleware->handle($envelope, $this->getStackMock(false));
}
+
+ public function testItHandlesWithForceCallHandlersStamp()
+ {
+ $envelope = new Envelope(new DummyMessage('original envelope'));
+ $envelope = $envelope->with(new ForceCallHandlersStamp());
+
+ $sender = $this->getMockBuilder(SenderInterface::class)->getMock();
+ $sender->expects($this->once())->method('send')->willReturn($envelope);
+
+ $middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
+
+ // next handler *should* be called
+ $middleware->handle($envelope, $this->getStackMock(true));
+ }
}
diff --git a/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php
new file mode 100644
index 0000000000000..6d45e7f6df34c
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php
@@ -0,0 +1,52 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Transport\Sync;
+
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Exception\InvalidArgumentException;
+use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
+use Symfony\Component\Messenger\Transport\TransportInterface;
+
+/**
+ * A "fake" transport that marks messages to be handled immediately.
+ *
+ * @experimental in 4.3
+ *
+ * @author Ryan Weaver
+ */
+class SyncTransport implements TransportInterface
+{
+ public function receive(callable $handler): void
+ {
+ throw new InvalidArgumentException('You cannot receive messages from the SyncTransport.');
+ }
+
+ public function stop(): void
+ {
+ throw new InvalidArgumentException('You cannot call stop() on the SyncTransport.');
+ }
+
+ public function ack(Envelope $envelope): void
+ {
+ throw new InvalidArgumentException('You cannot call ack() on the SyncTransport.');
+ }
+
+ public function reject(Envelope $envelope): void
+ {
+ throw new InvalidArgumentException('You cannot call reject() on the SyncTransport.');
+ }
+
+ public function send(Envelope $envelope): Envelope
+ {
+ return $envelope->with(new ForceCallHandlersStamp());
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php
new file mode 100644
index 0000000000000..0eba740813a56
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php
@@ -0,0 +1,33 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Transport\Sync;
+
+use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
+use Symfony\Component\Messenger\Transport\TransportInterface;
+
+/**
+ * @experimental in 4.3
+ *
+ * @author Ryan Weaver
+ */
+class SyncTransportFactory implements TransportFactoryInterface
+{
+ public function createTransport(string $dsn, array $options): TransportInterface
+ {
+ return new SyncTransport();
+ }
+
+ public function supports(string $dsn, array $options): bool
+ {
+ return 0 === strpos($dsn, 'sync://');
+ }
+}