From 88d008c82844806cdc46843e5cfaaded635d4e12 Mon Sep 17 00:00:00 2001 From: Vincent Touzet Date: Sat, 30 Mar 2019 13:37:54 +0100 Subject: [PATCH] [Messenger] Add a Doctrine transport --- src/Symfony/Component/Messenger/CHANGELOG.md | 2 +- .../Transport/Doctrine/ConnectionTest.php | 214 +++++++++++++++ .../Doctrine/DoctrineIntegrationTest.php | 127 +++++++++ .../Doctrine/DoctrineReceiverTest.php | 77 ++++++ .../Transport/Doctrine/DoctrineSenderTest.php | 57 ++++ .../Doctrine/DoctrineTransportFactoryTest.php | 75 +++++ .../Transport/AmqpExt/AmqpReceiver.php | 6 + .../Transport/Doctrine/Connection.php | 259 ++++++++++++++++++ .../Doctrine/DoctrineReceivedStamp.php | 34 +++ .../Transport/Doctrine/DoctrineReceiver.php | 95 +++++++ .../Transport/Doctrine/DoctrineSender.php | 57 ++++ .../Transport/Doctrine/DoctrineTransport.php | 87 ++++++ .../Doctrine/DoctrineTransportFactory.php | 56 ++++ src/Symfony/Component/Messenger/composer.json | 2 + 14 files changed, 1147 insertions(+), 1 deletion(-) create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineSenderTest.php create mode 100644 src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineTransportFactoryTest.php create mode 100644 src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php create mode 100644 src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceivedStamp.php create mode 100644 src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php create mode 100644 src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineSender.php create mode 100644 src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php create mode 100644 src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransportFactory.php diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 304790630b759..cd4481d752d68 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -57,8 +57,8 @@ CHANGELOG * [BC BREAK] The Amqp Transport now automatically sets up the exchanges and queues by default. Previously, this was done when in "debug" mode only. Pass the `auto_setup` connection option to control this. - * Added a `SetupTransportsCommand` command to setup the transports + * Added a Doctrine transport. For example, the `doctrine://default` DSN (this uses the `default` Doctrine entity manager) 4.2.0 ----- diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php new file mode 100644 index 0000000000000..26878e3647bfe --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php @@ -0,0 +1,214 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\Doctrine; + +use Doctrine\DBAL\DBALException; +use Doctrine\DBAL\Driver\Statement; +use Doctrine\DBAL\Platforms\AbstractPlatform; +use Doctrine\DBAL\Query\QueryBuilder; +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\Doctrine\Connection; + +class ConnectionTest extends TestCase +{ + public function testGetAMessageWillChangeItsStatus() + { + $queryBuilder = $this->getQueryBuilderMock(); + $driverConnection = $this->getDBALConnectionMock(); + $stmt = $this->getStatementMock([ + 'id' => 1, + 'body' => '{"message":"Hi"}', + 'headers' => \json_encode(['type' => DummyMessage::class]), + ]); + + $driverConnection + ->method('createQueryBuilder') + ->willReturn($queryBuilder); + $queryBuilder + ->method('getSQL') + ->willReturn(''); + $driverConnection + ->method('prepare') + ->willReturn($stmt); + + $connection = new Connection([], $driverConnection); + $doctrineEnvelope = $connection->get(); + $this->assertEquals(1, $doctrineEnvelope['id']); + $this->assertEquals('{"message":"Hi"}', $doctrineEnvelope['body']); + $this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelope['headers']); + } + + public function testGetWithNoPendingMessageWillReturnNull() + { + $queryBuilder = $this->getQueryBuilderMock(); + $driverConnection = $this->getDBALConnectionMock(); + $stmt = $this->getStatementMock(false); + + $driverConnection->expects($this->once()) + ->method('createQueryBuilder') + ->willReturn($queryBuilder); + $driverConnection->method('prepare') + ->willReturn($stmt); + $driverConnection->expects($this->never()) + ->method('update'); + + $connection = new Connection([], $driverConnection); + $doctrineEnvelope = $connection->get(); + $this->assertNull($doctrineEnvelope); + } + + /** + * @expectedException \Symfony\Component\Messenger\Exception\TransportException + */ + public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() + { + $driverConnection = $this->getDBALConnectionMock(); + $driverConnection->method('delete')->willThrowException(new DBALException()); + + $connection = new Connection([], $driverConnection); + $connection->ack('dummy_id'); + } + + /** + * @expectedException \Symfony\Component\Messenger\Exception\TransportException + */ + public function testItThrowsATransportExceptionIfItCannotRejectMessage() + { + $driverConnection = $this->getDBALConnectionMock(); + $driverConnection->method('delete')->willThrowException(new DBALException()); + + $connection = new Connection([], $driverConnection); + $connection->reject('dummy_id'); + } + + private function getDBALConnectionMock() + { + $driverConnection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class) + ->disableOriginalConstructor() + ->getMock(); + $platform = $this->getMockBuilder(AbstractPlatform::class) + ->getMock(); + $platform->method('getWriteLockSQL')->willReturn('FOR UPDATE'); + $driverConnection->method('getDatabasePlatform')->willReturn($platform); + + return $driverConnection; + } + + private function getQueryBuilderMock() + { + $queryBuilder = $this->getMockBuilder(QueryBuilder::class) + ->disableOriginalConstructor() + ->getMock(); + + $queryBuilder->method('select')->willReturn($queryBuilder); + $queryBuilder->method('update')->willReturn($queryBuilder); + $queryBuilder->method('from')->willReturn($queryBuilder); + $queryBuilder->method('set')->willReturn($queryBuilder); + $queryBuilder->method('where')->willReturn($queryBuilder); + $queryBuilder->method('andWhere')->willReturn($queryBuilder); + $queryBuilder->method('orderBy')->willReturn($queryBuilder); + $queryBuilder->method('setMaxResults')->willReturn($queryBuilder); + $queryBuilder->method('setParameter')->willReturn($queryBuilder); + + return $queryBuilder; + } + + private function getStatementMock($expectedResult) + { + $stmt = $this->getMockBuilder(Statement::class) + ->disableOriginalConstructor() + ->getMock(); + $stmt->expects($this->once()) + ->method('fetch') + ->willReturn($expectedResult); + + return $stmt; + } + + /** + * @dataProvider buildConfigurationProvider + */ + public function testBuildConfiguration($dsn, $options, $expectedManager, $expectedTableName, $expectedRedeliverTimeout, $expectedQueue) + { + $config = Connection::buildConfiguration($dsn, $options); + $this->assertEquals($expectedManager, $config['connection']); + $this->assertEquals($expectedTableName, $config['table_name']); + $this->assertEquals($expectedRedeliverTimeout, $config['redeliver_timeout']); + $this->assertEquals($expectedQueue, $config['queue_name']); + } + + public function buildConfigurationProvider() + { + return [ + [ + 'dsn' => 'doctrine://default', + 'options' => [], + 'expectedManager' => 'default', + 'expectedTableName' => 'messenger_messages', + 'expectedRedeliverTimeout' => 3600, + 'expectedQueue' => 'default', + ], + // test options from options array + [ + 'dsn' => 'doctrine://default', + 'options' => [ + 'table_name' => 'name_from_options', + 'redeliver_timeout' => 1800, + 'queue_name' => 'important', + ], + 'expectedManager' => 'default', + 'expectedTableName' => 'name_from_options', + 'expectedRedeliverTimeout' => 1800, + 'expectedQueue' => 'important', + ], + // tests options from dsn + [ + 'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal', + 'options' => [], + 'expectedManager' => 'default', + 'expectedTableName' => 'name_from_dsn', + 'expectedRedeliverTimeout' => 1200, + 'expectedQueue' => 'normal', + ], + // test options from options array wins over options from dsn + [ + 'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal', + 'options' => [ + 'table_name' => 'name_from_options', + 'redeliver_timeout' => 1800, + 'queue_name' => 'important', + ], + 'expectedManager' => 'default', + 'expectedTableName' => 'name_from_options', + 'expectedRedeliverTimeout' => 1800, + 'expectedQueue' => 'important', + ], + ]; + } + + /** + * @expectedException \Symfony\Component\Messenger\Exception\TransportException + */ + public function testItThrowsAnExceptionIfAnExtraOptionsInDefined() + { + Connection::buildConfiguration('doctrine://default', ['new_option' => 'woops']); + } + + /** + * @expectedException \Symfony\Component\Messenger\Exception\TransportException + */ + public function testItThrowsAnExceptionIfAnExtraOptionsInDefinedInDSN() + { + Connection::buildConfiguration('doctrine://default?new_option=woops'); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php new file mode 100644 index 0000000000000..ffcde2039306d --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php @@ -0,0 +1,127 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\Doctrine; + +use Doctrine\DBAL\DriverManager; +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\Doctrine\Connection; + +/** + * @requires pdo_mysql + */ +class DoctrineIntegrationTest extends TestCase +{ + private $driverConnection; + private $connection; + + protected function setUp() + { + parent::setUp(); + + if (!getenv('MESSENGER_DOCTRINE_DSN')) { + $this->markTestSkipped('The "MESSENGER_DOCTRINE_DSN" environment variable is required.'); + } + $dsn = getenv('MESSENGER_DOCTRINE_DSN'); + $this->driverConnection = DriverManager::getConnection(['url' => $dsn]); + $this->connection = new Connection([], $this->driverConnection); + // call send to auto-setup the table + $this->connection->setup(); + // ensure the table is clean for tests + $this->driverConnection->exec('DELETE FROM messenger_messages'); + } + + public function testConnectionSendAndGet() + { + $this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); + $encoded = $this->connection->get(); + $this->assertEquals('{"message": "Hi"}', $encoded['body']); + $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); + } + + public function testSendWithDelay() + { + $this->connection->send('{"message": "Hi i am delayed"}', ['type' => DummyMessage::class], 600000); + + $available_at = $this->driverConnection->createQueryBuilder() + ->select('m.available_at') + ->from('messenger_messages', 'm') + ->where('m.body = :body') + ->setParameter(':body', '{"message": "Hi i am delayed"}') + ->execute() + ->fetchColumn(); + + $available_at = new \DateTime($available_at); + + $now = \DateTime::createFromFormat('U.u', microtime(true)); + $now->modify('+60 seconds'); + $this->assertGreaterThan($now, $available_at); + } + + public function testItRetrieveTheFirstAvailableMessage() + { + // insert messages + // one currently handled + $this->driverConnection->insert('messenger_messages', [ + 'body' => '{"message": "Hi handled"}', + 'headers' => json_encode(['type' => DummyMessage::class]), + 'queue_name' => 'default', + 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'delivered_at' => Connection::formatDateTime(\DateTime::createFromFormat('U.u', microtime(true))), + ]); + // one available later + $this->driverConnection->insert('messenger_messages', [ + 'body' => '{"message": "Hi delayed"}', + 'headers' => json_encode(['type' => DummyMessage::class]), + 'queue_name' => 'default', + 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 13:00:00')), + ]); + // one available + $this->driverConnection->insert('messenger_messages', [ + 'body' => '{"message": "Hi available"}', + 'headers' => json_encode(['type' => DummyMessage::class]), + 'queue_name' => 'default', + 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')), + ]); + + $encoded = $this->connection->get(); + $this->assertEquals('{"message": "Hi available"}', $encoded['body']); + } + + public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout() + { + $twoHoursAgo = new \DateTime('now'); + $twoHoursAgo->modify('-2 hours'); + $this->driverConnection->insert('messenger_messages', [ + 'body' => '{"message": "Hi requeued"}', + 'headers' => json_encode(['type' => DummyMessage::class]), + 'queue_name' => 'default', + 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'delivered_at' => Connection::formatDateTime($twoHoursAgo), + ]); + $this->driverConnection->insert('messenger_messages', [ + 'body' => '{"message": "Hi available"}', + 'headers' => json_encode(['type' => DummyMessage::class]), + 'queue_name' => 'default', + 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')), + ]); + + $next = $this->connection->get(); + $this->assertEquals('{"message": "Hi requeued"}', $next['body']); + $this->connection->reject($next['id']); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php new file mode 100644 index 0000000000000..0507a0ccfa91e --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php @@ -0,0 +1,77 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\Doctrine; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\Doctrine\Connection; +use Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver; +use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; +use Symfony\Component\Messenger\Transport\Serialization\Serializer; +use Symfony\Component\Serializer as SerializerComponent; +use Symfony\Component\Serializer\Encoder\JsonEncoder; +use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; + +class DoctrineReceiverTest extends TestCase +{ + public function testItReturnsTheDecodedMessageToTheHandler() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelop = $this->createDoctrineEnvelope(); + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($doctrineEnvelop); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelopes = iterator_to_array($receiver->get()); + $this->assertCount(1, $actualEnvelopes); + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); + } + + /** + * @expectedException \Symfony\Component\Messenger\Exception\MessageDecodingFailedException + */ + public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException() + { + $serializer = $this->createMock(PhpSerializer::class); + $serializer->method('decode')->willThrowException(new MessageDecodingFailedException()); + + $doctrineEnvelop = $this->createDoctrineEnvelope(); + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($doctrineEnvelop); + $connection->expects($this->once())->method('reject'); + + $receiver = new DoctrineReceiver($connection, $serializer); + iterator_to_array($receiver->get()); + } + + private function createDoctrineEnvelope() + { + return [ + 'id' => 1, + 'body' => '{"message": "Hi"}', + 'headers' => [ + 'type' => DummyMessage::class, + ], + ]; + } + + private function createSerializer(): Serializer + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + return $serializer; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineSenderTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineSenderTest.php new file mode 100644 index 0000000000000..26badf93340f5 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineSenderTest.php @@ -0,0 +1,57 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\Doctrine; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\DelayStamp; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\Doctrine\Connection; +use Symfony\Component\Messenger\Transport\Doctrine\DoctrineSender; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; + +class DoctrineSenderTest extends TestCase +{ + public function testSend() + { + $envelope = new Envelope(new DummyMessage('Oy')); + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; + + $connection = $this->getMockBuilder(Connection::class) + ->disableOriginalConstructor() + ->getMock(); + $connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers']); + + $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(); + $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + + $sender = new DoctrineSender($connection, $serializer); + $sender->send($envelope); + } + + public function testSendWithDelay() + { + $envelope = (new Envelope(new DummyMessage('Oy')))->with(new DelayStamp(500)); + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; + + $connection = $this->getMockBuilder(Connection::class) + ->disableOriginalConstructor() + ->getMock(); + $connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 500); + + $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(); + $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + + $sender = new DoctrineSender($connection, $serializer); + $sender->send($envelope); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineTransportFactoryTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineTransportFactoryTest.php new file mode 100644 index 0000000000000..104de47dcd16b --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineTransportFactoryTest.php @@ -0,0 +1,75 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\Doctrine; + +use PHPUnit\Framework\TestCase; +use Symfony\Bridge\Doctrine\RegistryInterface; +use Symfony\Component\Messenger\Transport\Doctrine\Connection; +use Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransport; +use Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransportFactory; + +class DoctrineTransportFactoryTest extends TestCase +{ + public function testSupports() + { + $factory = new DoctrineTransportFactory( + $this->getMockBuilder(RegistryInterface::class)->getMock(), + null, + false + ); + + $this->assertTrue($factory->supports('doctrine://default', [])); + $this->assertFalse($factory->supports('amqp://localhost', [])); + } + + public function testCreateTransport() + { + $connection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class) + ->disableOriginalConstructor() + ->getMock(); + $registry = $this->getMockBuilder(RegistryInterface::class)->getMock(); + $registry->expects($this->once()) + ->method('getConnection') + ->willReturn($connection); + + $factory = new DoctrineTransportFactory( + $registry, + null + ); + + $this->assertEquals( + new DoctrineTransport(new Connection(Connection::buildConfiguration('doctrine://default'), $connection), null), + $factory->createTransport('doctrine://default', []) + ); + } + + /** + * @expectedException \Symfony\Component\Messenger\Exception\TransportException + * @expectedExceptionMessage Could not find Doctrine connection from DSN "doctrine://default". + */ + public function testCreateTransportMustThrowAnExceptionIfManagerIsNotFound() + { + $registry = $this->getMockBuilder(RegistryInterface::class)->getMock(); + $registry->expects($this->once()) + ->method('getConnection') + ->will($this->returnCallback(function () { + throw new \InvalidArgumentException(); + })); + + $factory = new DoctrineTransportFactory( + $registry, + null + ); + + $factory->createTransport('doctrine://default', []); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php index ce1918e1da0c1..9f1e25a40267a 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php @@ -67,6 +67,9 @@ public function get(): iterable yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope)); } + /** + * {@inheritdoc} + */ public function ack(Envelope $envelope): void { try { @@ -76,6 +79,9 @@ public function ack(Envelope $envelope): void } } + /** + * {@inheritdoc} + */ public function reject(Envelope $envelope): void { $this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope)); diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php b/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php new file mode 100644 index 0000000000000..6bf4bd2e32955 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php @@ -0,0 +1,259 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Doctrine; + +use Doctrine\DBAL\Connection as DBALConnection; +use Doctrine\DBAL\DBALException; +use Doctrine\DBAL\Exception\TableNotFoundException; +use Doctrine\DBAL\Schema\Schema; +use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer; +use Doctrine\DBAL\Types\Type; +use Symfony\Component\Messenger\Exception\InvalidArgumentException; +use Symfony\Component\Messenger\Exception\TransportException; + +/** + * @author Vincent Touzet + * + * @final + * + * @experimental in 4.3 + */ +class Connection +{ + const DEFAULT_OPTIONS = [ + 'table_name' => 'messenger_messages', + 'queue_name' => 'default', + 'redeliver_timeout' => 3600, + 'loop_sleep' => 200000, + 'auto_setup' => true, + ]; + + /** + * Configuration of the connection. + * + * Available options: + * + * * table_name: name of the table + * * connection: name of the Doctrine's entity manager + * * queue_name: name of the queue + * * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default 3600 + * * loop_sleep: Number of micro seconds to wait for a next message to handle + * * auto_setup: Whether the table should be created automatically during send / get. Default : true + */ + private $configuration = []; + private $driverConnection; + + public function __construct(array $configuration, DBALConnection $driverConnection) + { + $this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration); + $this->driverConnection = $driverConnection; + } + + public function getConfiguration(): array + { + return $this->configuration; + } + + public static function buildConfiguration($dsn, array $options = []) + { + if (false === $parsedUrl = parse_url($dsn)) { + throw new InvalidArgumentException(sprintf('The given Doctrine DSN "%s" is invalid.', $dsn)); + } + + $components = parse_url($dsn); + $query = []; + if (isset($components['query'])) { + parse_str($components['query'], $query); + } + + $configuration = [ + 'connection' => $components['host'], + 'table_name' => $options['table_name'] ?? ($query['table_name'] ?? self::DEFAULT_OPTIONS['table_name']), + 'queue_name' => $options['queue_name'] ?? ($query['queue_name'] ?? self::DEFAULT_OPTIONS['queue_name']), + 'redeliver_timeout' => $options['redeliver_timeout'] ?? ($query['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']), + 'loop_sleep' => $options['loop_sleep'] ?? ($query['loop_sleep'] ?? self::DEFAULT_OPTIONS['loop_sleep']), + 'auto_setup' => $options['auto_setup'] ?? ($query['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']), + ]; + + // check for extra keys in options + $optionsExtraKeys = array_diff(array_keys($options), array_keys($configuration)); + if (0 < \count($optionsExtraKeys)) { + throw new TransportException(sprintf('Unknown option found : [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', self::DEFAULT_OPTIONS))); + } + + // check for extra keys in options + $queryExtraKeys = array_diff(array_keys($query), array_keys($configuration)); + if (0 < \count($queryExtraKeys)) { + throw new TransportException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', self::DEFAULT_OPTIONS))); + } + + return $configuration; + } + + /** + * @param int $delay The delay in milliseconds + * + * @throws \Doctrine\DBAL\DBALException + */ + public function send(string $body, array $headers, int $delay = 0): void + { + $now = (\DateTime::createFromFormat('U.u', microtime(true))); + $availableAt = (clone $now)->modify(sprintf('+%d seconds', $delay / 1000)); + + $queryBuilder = $this->driverConnection->createQueryBuilder() + ->insert($this->configuration['table_name']) + ->values([ + 'body' => ':body', + 'headers' => ':headers', + 'queue_name' => ':queue_name', + 'created_at' => ':created_at', + 'available_at' => ':available_at', + ]); + + $this->executeQuery($queryBuilder->getSQL(), [ + ':body' => $body, + ':headers' => \json_encode($headers), + ':queue_name' => $this->configuration['queue_name'], + ':created_at' => self::formatDateTime($now), + ':available_at' => self::formatDateTime($availableAt), + ]); + } + + public function get(): ?array + { + $this->driverConnection->beginTransaction(); + try { + $query = $this->driverConnection->createQueryBuilder() + ->select('m.*') + ->from($this->configuration['table_name'], 'm') + ->where('m.delivered_at is null OR m.delivered_at < :redeliver_limit') + ->andWhere('m.available_at <= :now') + ->andWhere('m.queue_name = :queue_name') + ->orderBy('available_at', 'ASC') + ->setMaxResults(1); + + $now = \DateTime::createFromFormat('U.u', microtime(true)); + $redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout'])); + // use SELECT ... FOR UPDATE to lock table + $doctrineEnvelope = $this->executeQuery( + $query->getSQL().' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL(), + [ + ':now' => self::formatDateTime($now), + ':queue_name' => $this->configuration['queue_name'], + ':redeliver_limit' => self::formatDateTime($redeliverLimit), + ] + )->fetch(); + + if (false === $doctrineEnvelope) { + $this->driverConnection->commit(); + + return null; + } + + $doctrineEnvelope['headers'] = \json_decode($doctrineEnvelope['headers'], true); + + $queryBuilder = $this->driverConnection->createQueryBuilder() + ->update($this->configuration['table_name']) + ->set('delivered_at', ':delivered_at') + ->where('id = :id'); + $this->executeQuery($queryBuilder->getSQL(), [ + ':id' => $doctrineEnvelope['id'], + ':delivered_at' => self::formatDateTime($now), + ]); + + $this->driverConnection->commit(); + + return $doctrineEnvelope; + } catch (\Throwable $e) { + $this->driverConnection->rollBack(); + + throw $e; + } + } + + public function ack(string $id): bool + { + try { + return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0; + } catch (DBALException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + } + + public function reject(string $id): bool + { + try { + return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0; + } catch (DBALException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + } + + public function setup(): void + { + $synchronizer = new SingleDatabaseSynchronizer($this->driverConnection); + $synchronizer->updateSchema($this->getSchema(), true); + } + + private function executeQuery(string $sql, array $parameters = []) + { + $stmt = null; + try { + $stmt = $this->driverConnection->prepare($sql); + $stmt->execute($parameters); + } catch (TableNotFoundException $e) { + // create table + if (!$this->driverConnection->isTransactionActive() && $this->configuration['auto_setup']) { + $this->setup(); + } + // statement not prepared ? SQLite throw on exception on prepare if the table does not exist + if (null === $stmt) { + $stmt = $this->driverConnection->prepare($sql); + } + $stmt->execute($parameters); + } + + return $stmt; + } + + private function getSchema(): Schema + { + $schema = new Schema(); + $table = $schema->createTable($this->configuration['table_name']); + $table->addColumn('id', Type::BIGINT) + ->setAutoincrement(true) + ->setNotnull(true); + $table->addColumn('body', Type::TEXT) + ->setNotnull(true); + $table->addColumn('headers', Type::STRING) + ->setNotnull(true); + $table->addColumn('queue_name', Type::STRING) + ->setNotnull(true); + $table->addColumn('created_at', Type::DATETIME) + ->setNotnull(true); + $table->addColumn('available_at', Type::DATETIME) + ->setNotnull(true); + $table->addColumn('delivered_at', Type::DATETIME) + ->setNotnull(false); + $table->setPrimaryKey(['id']); + $table->addIndex(['queue_name']); + $table->addIndex(['available_at']); + $table->addIndex(['delivered_at']); + + return $schema; + } + + public static function formatDateTime(\DateTimeInterface $dateTime) + { + return $dateTime->format('Y-m-d\TH:i:s.uZ'); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceivedStamp.php b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceivedStamp.php new file mode 100644 index 0000000000000..f11217a74afcb --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceivedStamp.php @@ -0,0 +1,34 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Doctrine; + +use Symfony\Component\Messenger\Stamp\StampInterface; + +/** + * @author Vincent Touzet + * + * @experimental in 4.3 + */ +class DoctrineReceivedStamp implements StampInterface +{ + private $id; + + public function __construct(string $id) + { + $this->id = $id; + } + + public function getId(): string + { + return $this->id; + } +} diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php new file mode 100644 index 0000000000000..3198e143dca84 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php @@ -0,0 +1,95 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Doctrine; + +use Doctrine\DBAL\DBALException; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\LogicException; +use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; +use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; +use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; + +/** + * @author Vincent Touzet + * + * @experimental in 4.3 + */ +class DoctrineReceiver implements ReceiverInterface +{ + private $connection; + private $serializer; + + public function __construct(Connection $connection, SerializerInterface $serializer = null) + { + $this->connection = $connection; + $this->serializer = $serializer ?? new PhpSerializer(); + } + + /** + * {@inheritdoc} + */ + public function get(): iterable + { + try { + $doctrineEnvelope = $this->connection->get(); + } catch (DBALException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + + if (null === $doctrineEnvelope) { + return []; + } + + try { + $envelope = $this->serializer->decode([ + 'body' => $doctrineEnvelope['body'], + 'headers' => $doctrineEnvelope['headers'], + ]); + } catch (MessageDecodingFailedException $exception) { + $this->connection->reject($doctrineEnvelope['id']); + + throw $exception; + } + + yield $envelope->with(new DoctrineReceivedStamp($doctrineEnvelope['id'])); + } + + /** + * {@inheritdoc} + */ + public function ack(Envelope $envelope): void + { + $this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId()); + } + + /** + * {@inheritdoc} + */ + public function reject(Envelope $envelope): void + { + $this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId()); + } + + private function findDoctrineReceivedStamp(Envelope $envelope): DoctrineReceivedStamp + { + /** @var DoctrineReceivedStamp|null $doctrineReceivedStamp */ + $doctrineReceivedStamp = $envelope->last(DoctrineReceivedStamp::class); + + if (null === $doctrineReceivedStamp) { + throw new LogicException('No DoctrineReceivedStamp found on the Envelope.'); + } + + return $doctrineReceivedStamp; + } +} diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineSender.php b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineSender.php new file mode 100644 index 0000000000000..8329d2a53f1bd --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineSender.php @@ -0,0 +1,57 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Doctrine; + +use Doctrine\DBAL\DBALException; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Stamp\DelayStamp; +use Symfony\Component\Messenger\Transport\Sender\SenderInterface; +use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; + +/** + * @author Vincent Touzet + * + * @experimental in 4.3 + */ +class DoctrineSender implements SenderInterface +{ + private $connection; + private $serializer; + + public function __construct(Connection $connection, SerializerInterface $serializer = null) + { + $this->connection = $connection; + $this->serializer = $serializer ?? new PhpSerializer(); + } + + /** + * {@inheritdoc} + */ + public function send(Envelope $envelope): Envelope + { + $encodedMessage = $this->serializer->encode($envelope); + + /** @var DelayStamp|null $delayStamp */ + $delayStamp = $envelope->last(DelayStamp::class); + $delay = null !== $delayStamp ? $delayStamp->getDelay() : 0; + + try { + $this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay); + } catch (DBALException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + + return $envelope; + } +} diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php new file mode 100644 index 0000000000000..97c2a0a629557 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php @@ -0,0 +1,87 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Doctrine; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; +use Symfony\Component\Messenger\Transport\SetupableTransportInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; + +/** + * @author Vincent Touzet + * + * @experimental in 4.3 + */ +class DoctrineTransport implements TransportInterface, SetupableTransportInterface +{ + private $connection; + private $serializer; + private $receiver; + private $sender; + + public function __construct(Connection $connection, SerializerInterface $serializer = null) + { + $this->connection = $connection; + $this->serializer = $serializer ?? new PhpSerializer(); + } + + /** + * {@inheritdoc} + */ + public function get(): iterable + { + ($this->receiver ?? $this->getReceiver())->get(); + } + + /** + * {@inheritdoc} + */ + public function ack(Envelope $envelope): void + { + ($this->receiver ?? $this->getReceiver())->ack($envelope); + } + + /** + * {@inheritdoc} + */ + public function reject(Envelope $envelope): void + { + ($this->receiver ?? $this->getReceiver())->reject($envelope); + } + + /** + * {@inheritdoc} + */ + public function send(Envelope $envelope): Envelope + { + return ($this->sender ?? $this->getSender())->send($envelope); + } + + /** + * {@inheritdoc} + */ + public function setup(): void + { + $this->connection->setup(); + } + + private function getReceiver(): DoctrineReceiver + { + return $this->receiver = new DoctrineReceiver($this->connection, $this->serializer); + } + + private function getSender(): DoctrineSender + { + return $this->sender = new DoctrineSender($this->connection, $this->serializer); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransportFactory.php b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransportFactory.php new file mode 100644 index 0000000000000..74f37933904be --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransportFactory.php @@ -0,0 +1,56 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Doctrine; + +use Symfony\Bridge\Doctrine\RegistryInterface; +use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; +use Symfony\Component\Messenger\Transport\TransportFactoryInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; + +/** + * @author Vincent Touzet + * + * @experimental in 4.3 + */ +class DoctrineTransportFactory implements TransportFactoryInterface +{ + private $registry; + private $serializer; + + public function __construct(RegistryInterface $registry, SerializerInterface $serializer = null) + { + $this->registry = $registry; + $this->serializer = $serializer ?? new PhpSerializer(); + } + + public function createTransport(string $dsn, array $options): TransportInterface + { + $configuration = Connection::buildConfiguration($dsn, $options); + + try { + $driverConnection = $this->registry->getConnection($configuration['connection']); + } catch (\InvalidArgumentException $e) { + throw new TransportException(sprintf('Could not find Doctrine connection from DSN "%s".', $dsn), 0, $e); + } + + $connection = new Connection($configuration, $driverConnection); + + return new DoctrineTransport($connection, $this->serializer); + } + + public function supports(string $dsn, array $options): bool + { + return 0 === strpos($dsn, 'doctrine://'); + } +} diff --git a/src/Symfony/Component/Messenger/composer.json b/src/Symfony/Component/Messenger/composer.json index d40963e2f6d18..17eebcca92093 100644 --- a/src/Symfony/Component/Messenger/composer.json +++ b/src/Symfony/Component/Messenger/composer.json @@ -20,8 +20,10 @@ "psr/log": "~1.0" }, "require-dev": { + "doctrine/dbal": "~2.4", "symfony/console": "~3.4|~4.0", "symfony/dependency-injection": "~3.4.19|^4.1.8", + "symfony/doctrine-bridge": "~3.4|~4.0", "symfony/event-dispatcher": "~4.3", "symfony/http-kernel": "~3.4|~4.0", "symfony/process": "~3.4|~4.0",