diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml index b4e6cd69ef64a..55976d1c02269 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -67,6 +67,10 @@ + + + + diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 9638de4e49092..41a7e3438e0d9 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -4,6 +4,8 @@ CHANGELOG 4.3.0 ----- + * Added a new `SyncTransport` along with `ForceCallHandlersStamp` to + explicitly handle messages asynchronously. * Added optional parameter `prefetch_count` in connection configuration, to setup channel prefetch count * New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware` diff --git a/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php index f1b561d03a18d..d2ebaf8cfa011 100644 --- a/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php +++ b/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php @@ -15,6 +15,7 @@ use Psr\Log\NullLogger; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent; +use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentStamp; @@ -81,7 +82,13 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope $envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null))); } - // on a redelivery, never call local handlers + // if the message was marked (usually by SyncTransport) that it handlers + // MUST be called, mark them to be handled. + if (null !== $envelope->last(ForceCallHandlersStamp::class)) { + $handle = true; + } + + // on a redelivery, only send back to queue: never call local handlers if (null !== $redeliveryStamp) { $handle = false; } diff --git a/src/Symfony/Component/Messenger/Stamp/ForceCallHandlersStamp.php b/src/Symfony/Component/Messenger/Stamp/ForceCallHandlersStamp.php new file mode 100644 index 0000000000000..5c9eb3a6d96dd --- /dev/null +++ b/src/Symfony/Component/Messenger/Stamp/ForceCallHandlersStamp.php @@ -0,0 +1,27 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Stamp; + +/** + * Stamp marks that the handlers *should* be called immediately. + * + * This is used by the SyncTransport to indicate to the + * SendMessageMiddleware that handlers *should* be called + * immediately, even though a transport was set. + * + * @experimental in 4.3 + * + * @author Ryan Weaver + */ +class ForceCallHandlersStamp implements StampInterface +{ +} diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php index 301f01b86086b..92e48bc78204d 100644 --- a/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php +++ b/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php @@ -14,6 +14,7 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent; use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; +use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentStamp; @@ -86,6 +87,8 @@ public function testItSendsTheMessageToMultipleSenders() public function testItSendsToOnlyOneSenderOnRedelivery() { $envelope = new Envelope(new DummyMessage('Hey'), new RedeliveryStamp(5, 'bar')); + // even with a ForceCallHandlersStamp, the next middleware won't be called + $envelope = $envelope->with(new ForceCallHandlersStamp()); $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock(); @@ -237,7 +240,7 @@ public function testItDoesNotDispatchWithNoSenders() $middleware->handle($envelope, $this->getStackMock()); } - public function testItDoesNotDispatchOnRetry() + public function testItDoesNotDispatchOnRedeliver() { $envelope = new Envelope(new DummyMessage('original envelope')); $envelope = $envelope->with(new RedeliveryStamp(3, 'foo_sender')); @@ -251,4 +254,18 @@ public function testItDoesNotDispatchOnRetry() $middleware->handle($envelope, $this->getStackMock(false)); } + + public function testItHandlesWithForceCallHandlersStamp() + { + $envelope = new Envelope(new DummyMessage('original envelope')); + $envelope = $envelope->with(new ForceCallHandlersStamp()); + + $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); + $sender->expects($this->once())->method('send')->willReturn($envelope); + + $middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]])); + + // next handler *should* be called + $middleware->handle($envelope, $this->getStackMock(true)); + } } diff --git a/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php new file mode 100644 index 0000000000000..6d45e7f6df34c --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php @@ -0,0 +1,52 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Sync; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\InvalidArgumentException; +use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp; +use Symfony\Component\Messenger\Transport\TransportInterface; + +/** + * A "fake" transport that marks messages to be handled immediately. + * + * @experimental in 4.3 + * + * @author Ryan Weaver + */ +class SyncTransport implements TransportInterface +{ + public function receive(callable $handler): void + { + throw new InvalidArgumentException('You cannot receive messages from the SyncTransport.'); + } + + public function stop(): void + { + throw new InvalidArgumentException('You cannot call stop() on the SyncTransport.'); + } + + public function ack(Envelope $envelope): void + { + throw new InvalidArgumentException('You cannot call ack() on the SyncTransport.'); + } + + public function reject(Envelope $envelope): void + { + throw new InvalidArgumentException('You cannot call reject() on the SyncTransport.'); + } + + public function send(Envelope $envelope): Envelope + { + return $envelope->with(new ForceCallHandlersStamp()); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php new file mode 100644 index 0000000000000..0eba740813a56 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php @@ -0,0 +1,33 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Sync; + +use Symfony\Component\Messenger\Transport\TransportFactoryInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; + +/** + * @experimental in 4.3 + * + * @author Ryan Weaver + */ +class SyncTransportFactory implements TransportFactoryInterface +{ + public function createTransport(string $dsn, array $options): TransportInterface + { + return new SyncTransport(); + } + + public function supports(string $dsn, array $options): bool + { + return 0 === strpos($dsn, 'sync://'); + } +}