diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index 0d994c5f8f8f0..ff6351e7078e5 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -65,6 +65,7 @@ use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Transport\ReceiverInterface; use Symfony\Component\Messenger\Transport\SenderInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; use Symfony\Component\PropertyAccess\PropertyAccessor; use Symfony\Component\PropertyInfo\PropertyAccessExtractorInterface; use Symfony\Component\PropertyInfo\PropertyDescriptionExtractorInterface; @@ -1506,19 +1507,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enable it or install it by running "composer require symfony/serializer-pack".'); } - $senderDefinition = (new Definition(SenderInterface::class)) - ->setFactory(array(new Reference('messenger.transport_factory'), 'createSender')) - ->setArguments(array($transport['dsn'], $transport['options'])) - ->addTag('messenger.sender', array('name' => $name)) - ; - $container->setDefinition('messenger.sender.'.$name, $senderDefinition); - - $receiverDefinition = (new Definition(ReceiverInterface::class)) - ->setFactory(array(new Reference('messenger.transport_factory'), 'createReceiver')) + $transportDefinition = (new Definition(TransportInterface::class)) + ->setFactory(array(new Reference('messenger.transport_factory'), 'createTransport')) ->setArguments(array($transport['dsn'], $transport['options'])) ->addTag('messenger.receiver', array('name' => $name)) + ->addTag('messenger.sender', array('name' => $name)) ; - $container->setDefinition('messenger.receiver.'.$name, $receiverDefinition); + $container->setDefinition('messenger.transport.'.$name, $transportDefinition); } } diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index d7ce01d39fc21..eebf2b0f1b7f0 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -533,30 +533,20 @@ public function testMessenger() public function testMessengerTransports() { $container = $this->createContainerFromFile('messenger_transports'); - $this->assertTrue($container->hasDefinition('messenger.sender.default')); - $this->assertTrue($container->getDefinition('messenger.sender.default')->hasTag('messenger.sender')); - $this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.sender.default')->getTag('messenger.sender')); - $this->assertTrue($container->hasDefinition('messenger.receiver.default')); - $this->assertTrue($container->getDefinition('messenger.receiver.default')->hasTag('messenger.receiver')); - $this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.receiver.default')->getTag('messenger.receiver')); - - $this->assertTrue($container->hasDefinition('messenger.sender.customised')); - $senderFactory = $container->getDefinition('messenger.sender.customised')->getFactory(); - $senderArguments = $container->getDefinition('messenger.sender.customised')->getArguments(); - - $this->assertEquals(array(new Reference('messenger.transport_factory'), 'createSender'), $senderFactory); - $this->assertCount(2, $senderArguments); - $this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $senderArguments[0]); - $this->assertSame(array('queue' => array('name' => 'Queue')), $senderArguments[1]); - - $this->assertTrue($container->hasDefinition('messenger.receiver.customised')); - $receiverFactory = $container->getDefinition('messenger.receiver.customised')->getFactory(); - $receiverArguments = $container->getDefinition('messenger.receiver.customised')->getArguments(); - - $this->assertEquals(array(new Reference('messenger.transport_factory'), 'createReceiver'), $receiverFactory); - $this->assertCount(2, $receiverArguments); - $this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $receiverArguments[0]); - $this->assertSame(array('queue' => array('name' => 'Queue')), $receiverArguments[1]); + $this->assertTrue($container->hasDefinition('messenger.transport.default')); + $this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.receiver')); + $this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.sender')); + $this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver')); + $this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.sender')); + + $this->assertTrue($container->hasDefinition('messenger.transport.customised')); + $transportFactory = $container->getDefinition('messenger.transport.customised')->getFactory(); + $transportArguments = $container->getDefinition('messenger.transport.customised')->getArguments(); + + $this->assertEquals(array(new Reference('messenger.transport_factory'), 'createTransport'), $transportFactory); + $this->assertCount(2, $transportArguments); + $this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $transportArguments[0]); + $this->assertSame(array('queue' => array('name' => 'Queue')), $transportArguments[1]); $this->assertTrue($container->hasDefinition('messenger.transport.amqp.factory')); } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php index 0c4bb18f31ca7..523c35db0f11b 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php @@ -33,7 +33,7 @@ public function __construct(EncoderInterface $messageEncoder, Connection $connec /** * {@inheritdoc} */ - public function send($message) + public function send($message): void { $encodedMessage = $this->messageEncoder->encode($message); diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php new file mode 100644 index 0000000000000..165a795d71ab7 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php @@ -0,0 +1,79 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\AmqpExt; + +use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface; +use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; + +/** + * @author Nicolas Grekas + */ +class AmqpTransport implements TransportInterface +{ + private $encoder; + private $decoder; + private $dsn; + private $options; + private $debug; + private $connection; + private $receiver; + private $sender; + + public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, string $dsn, array $options, bool $debug) + { + $this->encoder = $encoder; + $this->decoder = $decoder; + $this->dsn = $dsn; + $this->options = $options; + $this->debug = $debug; + } + + /** + * {@inheritdoc} + */ + public function receive(callable $handler): void + { + ($this->receiver ?? $this->getReceiver())->receive($hander); + } + + /** + * {@inheritdoc} + */ + public function stop(): void + { + ($this->receiver ?? $this->getReceiver())->stop(); + } + + /** + * {@inheritdoc} + */ + public function send($message): void + { + ($this->sender ?? $this->getSender())->send($message); + } + + private function getReceiver() + { + return $this->receiver = new AmqpReceiver($this->decoder, $this->connection ?? $this->getConnection()); + } + + private function getSender() + { + return $this->sender = new AmqpSender($this->encoder, $this->connection ?? $this->getConnection()); + } + + private function getConnection() + { + return $this->connection = new Connection($this->dsn, $this->options, $this->debug); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php index ddb385af52aef..2da18cb06692c 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php @@ -11,11 +11,10 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt; -use Symfony\Component\Messenger\Transport\Factory\TransportFactoryInterface; -use Symfony\Component\Messenger\Transport\ReceiverInterface; -use Symfony\Component\Messenger\Transport\SenderInterface; use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface; use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface; +use Symfony\Component\Messenger\Transport\TransportFactoryInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; /** * @author Samuel Roze @@ -33,14 +32,9 @@ public function __construct(EncoderInterface $encoder, DecoderInterface $decoder $this->debug = $debug; } - public function createReceiver(string $dsn, array $options): ReceiverInterface + public function createTransport(string $dsn, array $options): TransportInterface { - return new AmqpReceiver($this->decoder, Connection::fromDsn($dsn, $options, $this->debug)); - } - - public function createSender(string $dsn, array $options): SenderInterface - { - return new AmqpSender($this->encoder, Connection::fromDsn($dsn, $options, $this->debug)); + return new AmqpTransport($this->encoder, $this->decoder, $dsn, $options, $thid->debug); } public function supports(string $dsn, array $options): bool diff --git a/src/Symfony/Component/Messenger/Transport/SenderInterface.php b/src/Symfony/Component/Messenger/Transport/SenderInterface.php index a142e1f00995e..93b1bd7cbaa3f 100644 --- a/src/Symfony/Component/Messenger/Transport/SenderInterface.php +++ b/src/Symfony/Component/Messenger/Transport/SenderInterface.php @@ -23,5 +23,5 @@ interface SenderInterface * * @param object $message */ - public function send($message); + public function send($message): void; } diff --git a/src/Symfony/Component/Messenger/Transport/Factory/ChainTransportFactory.php b/src/Symfony/Component/Messenger/Transport/TransportFactory.php similarity index 56% rename from src/Symfony/Component/Messenger/Transport/Factory/ChainTransportFactory.php rename to src/Symfony/Component/Messenger/Transport/TransportFactory.php index 779d365dc47e2..2c20533bfb8a9 100644 --- a/src/Symfony/Component/Messenger/Transport/Factory/ChainTransportFactory.php +++ b/src/Symfony/Component/Messenger/Transport/TransportFactory.php @@ -9,15 +9,12 @@ * file that was distributed with this source code. */ -namespace Symfony\Component\Messenger\Transport\Factory; - -use Symfony\Component\Messenger\Transport\ReceiverInterface; -use Symfony\Component\Messenger\Transport\SenderInterface; +namespace Symfony\Component\Messenger\Transport; /** * @author Samuel Roze */ -class ChainTransportFactory implements TransportFactoryInterface +class TransportFactory implements TransportFactoryInterface { private $factories; @@ -29,22 +26,11 @@ public function __construct(iterable $factories) $this->factories = $factories; } - public function createReceiver(string $dsn, array $options): ReceiverInterface - { - foreach ($this->factories as $factory) { - if ($factory->supports($dsn, $options)) { - return $factory->createReceiver($dsn, $options); - } - } - - throw new \InvalidArgumentException(sprintf('No transport supports the given DSN "%s".', $dsn)); - } - - public function createSender(string $dsn, array $options): SenderInterface + public function createTransport(string $dsn, array $options): TransportInterface { foreach ($this->factories as $factory) { if ($factory->supports($dsn, $options)) { - return $factory->createSender($dsn, $options); + return $factory->createTransport($dsn, $options); } } diff --git a/src/Symfony/Component/Messenger/Transport/Factory/TransportFactoryInterface.php b/src/Symfony/Component/Messenger/Transport/TransportFactoryInterface.php similarity index 77% rename from src/Symfony/Component/Messenger/Transport/Factory/TransportFactoryInterface.php rename to src/Symfony/Component/Messenger/Transport/TransportFactoryInterface.php index 47ded446bf06c..156eee9839cf6 100644 --- a/src/Symfony/Component/Messenger/Transport/Factory/TransportFactoryInterface.php +++ b/src/Symfony/Component/Messenger/Transport/TransportFactoryInterface.php @@ -9,10 +9,7 @@ * file that was distributed with this source code. */ -namespace Symfony\Component\Messenger\Transport\Factory; - -use Symfony\Component\Messenger\Transport\ReceiverInterface; -use Symfony\Component\Messenger\Transport\SenderInterface; +namespace Symfony\Component\Messenger\Transport; /** * Creates a Messenger transport. diff --git a/src/Symfony/Component/Messenger/Transport/TransportInterface.php b/src/Symfony/Component/Messenger/Transport/TransportInterface.php new file mode 100644 index 0000000000000..066cf5cda7e4b --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/TransportInterface.php @@ -0,0 +1,21 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport; + +/** + * @author Nicolas Grekas + * + * @experimental in 4.1 + */ +interface TransportInterface extends ReceiverInterface, SenderInterface +{ +}