diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index 0d994c5f8f8f0..ff6351e7078e5 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -65,6 +65,7 @@ use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Transport\ReceiverInterface; use Symfony\Component\Messenger\Transport\SenderInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; use Symfony\Component\PropertyAccess\PropertyAccessor; use Symfony\Component\PropertyInfo\PropertyAccessExtractorInterface; use Symfony\Component\PropertyInfo\PropertyDescriptionExtractorInterface; @@ -1506,19 +1507,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enable it or install it by running "composer require symfony/serializer-pack".'); } - $senderDefinition = (new Definition(SenderInterface::class)) - ->setFactory(array(new Reference('messenger.transport_factory'), 'createSender')) - ->setArguments(array($transport['dsn'], $transport['options'])) - ->addTag('messenger.sender', array('name' => $name)) - ; - $container->setDefinition('messenger.sender.'.$name, $senderDefinition); - - $receiverDefinition = (new Definition(ReceiverInterface::class)) - ->setFactory(array(new Reference('messenger.transport_factory'), 'createReceiver')) + $transportDefinition = (new Definition(TransportInterface::class)) + ->setFactory(array(new Reference('messenger.transport_factory'), 'createTransport')) ->setArguments(array($transport['dsn'], $transport['options'])) ->addTag('messenger.receiver', array('name' => $name)) + ->addTag('messenger.sender', array('name' => $name)) ; - $container->setDefinition('messenger.receiver.'.$name, $receiverDefinition); + $container->setDefinition('messenger.transport.'.$name, $transportDefinition); } } diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index d7ce01d39fc21..eebf2b0f1b7f0 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -533,30 +533,20 @@ public function testMessenger() public function testMessengerTransports() { $container = $this->createContainerFromFile('messenger_transports'); - $this->assertTrue($container->hasDefinition('messenger.sender.default')); - $this->assertTrue($container->getDefinition('messenger.sender.default')->hasTag('messenger.sender')); - $this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.sender.default')->getTag('messenger.sender')); - $this->assertTrue($container->hasDefinition('messenger.receiver.default')); - $this->assertTrue($container->getDefinition('messenger.receiver.default')->hasTag('messenger.receiver')); - $this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.receiver.default')->getTag('messenger.receiver')); - - $this->assertTrue($container->hasDefinition('messenger.sender.customised')); - $senderFactory = $container->getDefinition('messenger.sender.customised')->getFactory(); - $senderArguments = $container->getDefinition('messenger.sender.customised')->getArguments(); - - $this->assertEquals(array(new Reference('messenger.transport_factory'), 'createSender'), $senderFactory); - $this->assertCount(2, $senderArguments); - $this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $senderArguments[0]); - $this->assertSame(array('queue' => array('name' => 'Queue')), $senderArguments[1]); - - $this->assertTrue($container->hasDefinition('messenger.receiver.customised')); - $receiverFactory = $container->getDefinition('messenger.receiver.customised')->getFactory(); - $receiverArguments = $container->getDefinition('messenger.receiver.customised')->getArguments(); - - $this->assertEquals(array(new Reference('messenger.transport_factory'), 'createReceiver'), $receiverFactory); - $this->assertCount(2, $receiverArguments); - $this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $receiverArguments[0]); - $this->assertSame(array('queue' => array('name' => 'Queue')), $receiverArguments[1]); + $this->assertTrue($container->hasDefinition('messenger.transport.default')); + $this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.receiver')); + $this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.sender')); + $this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver')); + $this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.sender')); + + $this->assertTrue($container->hasDefinition('messenger.transport.customised')); + $transportFactory = $container->getDefinition('messenger.transport.customised')->getFactory(); + $transportArguments = $container->getDefinition('messenger.transport.customised')->getArguments(); + + $this->assertEquals(array(new Reference('messenger.transport_factory'), 'createTransport'), $transportFactory); + $this->assertCount(2, $transportArguments); + $this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $transportArguments[0]); + $this->assertSame(array('queue' => array('name' => 'Queue')), $transportArguments[1]); $this->assertTrue($container->hasDefinition('messenger.transport.amqp.factory')); } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php index 0c4bb18f31ca7..523c35db0f11b 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php @@ -33,7 +33,7 @@ public function __construct(EncoderInterface $messageEncoder, Connection $connec /** * {@inheritdoc} */ - public function send($message) + public function send($message): void { $encodedMessage = $this->messageEncoder->encode($message); diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php new file mode 100644 index 0000000000000..165a795d71ab7 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php @@ -0,0 +1,79 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\AmqpExt; + +use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface; +use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; + +/** + * @author Nicolas Grekas
+ */
+class AmqpTransport implements TransportInterface
+{
+ private $encoder;
+ private $decoder;
+ private $dsn;
+ private $options;
+ private $debug;
+ private $connection;
+ private $receiver;
+ private $sender;
+
+ public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, string $dsn, array $options, bool $debug)
+ {
+ $this->encoder = $encoder;
+ $this->decoder = $decoder;
+ $this->dsn = $dsn;
+ $this->options = $options;
+ $this->debug = $debug;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function receive(callable $handler): void
+ {
+ ($this->receiver ?? $this->getReceiver())->receive($hander);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function stop(): void
+ {
+ ($this->receiver ?? $this->getReceiver())->stop();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function send($message): void
+ {
+ ($this->sender ?? $this->getSender())->send($message);
+ }
+
+ private function getReceiver()
+ {
+ return $this->receiver = new AmqpReceiver($this->decoder, $this->connection ?? $this->getConnection());
+ }
+
+ private function getSender()
+ {
+ return $this->sender = new AmqpSender($this->encoder, $this->connection ?? $this->getConnection());
+ }
+
+ private function getConnection()
+ {
+ return $this->connection = new Connection($this->dsn, $this->options, $this->debug);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php
index ddb385af52aef..2da18cb06692c 100644
--- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php
+++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php
@@ -11,11 +11,10 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;
-use Symfony\Component\Messenger\Transport\Factory\TransportFactoryInterface;
-use Symfony\Component\Messenger\Transport\ReceiverInterface;
-use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
+use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
+use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @author Samuel Roze
+ *
+ * @experimental in 4.1
+ */
+interface TransportInterface extends ReceiverInterface, SenderInterface
+{
+}