Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit d9e2732

Browse filesBrowse files
committed
feature #29007 [Messenger] Add a Doctrine transport (vincenttouzet)
This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Add a Doctrine transport | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | | License | MIT | Doc PR | symfony/symfony-docs#10616 | DoctrineBundle PR | doctrine/DoctrineBundle#868 As discussed with @sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component. Actually `AMQP` is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database). # How it works The code is splitted betwwen this PR and the one on the DoctrineBundle : doctrine/DoctrineBundle#868 ## Configuration To configure a Doctrine transport the dsn MUST have the format `doctrine://<entity_manager_name>` where `<entity_manager_name>` is the name of the entity manager (usually `default`) ```yml # config/packages/messenger.yaml framework: messenger: transports: my_transport: "doctrine://default?queue=important" ``` ## Table schema Dispatched messages are stored into a database table with the following schema: | Column | Type | Options | Description | |--------------|----------|--------------------------|-------------------------------------------------------------------| | id | bigint | AUTO_INCREMENT, NOT NULL | Primary key | | body | text | NOT NULL | Body of the message | | headers | text | NOT NULL | Headers of the message | | queue | varchar(32) | NOT NULL | Headers of the message | | created_at | datetime | NOT NULL | When the message was inserted onto the table. (automatically set) | | available_at | datetime | NOT NULL | When the message is available to be handled | | delivered_at | datetime | NULL | When the message was delivered to a worker | ## Message dispatching When dispatching a message a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish` ## Message consuming The message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle. ### Getting the next message * Start a transaction * Lock the table to get the first message to handle (The lock is done with the `SELECT ... FOR UPDATE` query) * Update the message in database to update the delivered_at columns * Commit the transaction ### Handling the message The retrieved message is then passed to the handler. If the message is correctly handled the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::ack` which delete the message from the table. If an error occured the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::nack` method which update the message to set the delivered_at column to `null`. ## Message requeueing It may happen that a message is stuck in `delivered` state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the `DoctrineReceiver` call the `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method update all the message with a `delivered_at` not null since more than the "redeliver timeout" (default to 3600 seconds) # TODO - [x] Add tests - [x] Create DOC PR - [x] PR on doctrine-bundle for transport factory - [x] Add a `available_at` column - [x] Add a `queue` column - [x] Implement the retry functionnality : See #30557 - [x] Rebase after #29476 Commits ------- 88d008c [Messenger] Add a Doctrine transport
2 parents 88042a3 + 88d008c commit d9e2732
Copy full SHA for d9e2732

14 files changed

+1147
-1
lines changed

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ CHANGELOG
5959
* [BC BREAK] The Amqp Transport now automatically sets up the exchanges
6060
and queues by default. Previously, this was done when in "debug" mode
6161
only. Pass the `auto_setup` connection option to control this.
62-
6362
* Added a `SetupTransportsCommand` command to setup the transports
63+
* Added a Doctrine transport. For example, the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
6464

6565
4.2.0
6666
-----
+214Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
13+
14+
use Doctrine\DBAL\DBALException;
15+
use Doctrine\DBAL\Driver\Statement;
16+
use Doctrine\DBAL\Platforms\AbstractPlatform;
17+
use Doctrine\DBAL\Query\QueryBuilder;
18+
use PHPUnit\Framework\TestCase;
19+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
20+
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
21+
22+
class ConnectionTest extends TestCase
23+
{
24+
public function testGetAMessageWillChangeItsStatus()
25+
{
26+
$queryBuilder = $this->getQueryBuilderMock();
27+
$driverConnection = $this->getDBALConnectionMock();
28+
$stmt = $this->getStatementMock([
29+
'id' => 1,
30+
'body' => '{"message":"Hi"}',
31+
'headers' => \json_encode(['type' => DummyMessage::class]),
32+
]);
33+
34+
$driverConnection
35+
->method('createQueryBuilder')
36+
->willReturn($queryBuilder);
37+
$queryBuilder
38+
->method('getSQL')
39+
->willReturn('');
40+
$driverConnection
41+
->method('prepare')
42+
->willReturn($stmt);
43+
44+
$connection = new Connection([], $driverConnection);
45+
$doctrineEnvelope = $connection->get();
46+
$this->assertEquals(1, $doctrineEnvelope['id']);
47+
$this->assertEquals('{"message":"Hi"}', $doctrineEnvelope['body']);
48+
$this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelope['headers']);
49+
}
50+
51+
public function testGetWithNoPendingMessageWillReturnNull()
52+
{
53+
$queryBuilder = $this->getQueryBuilderMock();
54+
$driverConnection = $this->getDBALConnectionMock();
55+
$stmt = $this->getStatementMock(false);
56+
57+
$driverConnection->expects($this->once())
58+
->method('createQueryBuilder')
59+
->willReturn($queryBuilder);
60+
$driverConnection->method('prepare')
61+
->willReturn($stmt);
62+
$driverConnection->expects($this->never())
63+
->method('update');
64+
65+
$connection = new Connection([], $driverConnection);
66+
$doctrineEnvelope = $connection->get();
67+
$this->assertNull($doctrineEnvelope);
68+
}
69+
70+
/**
71+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
72+
*/
73+
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
74+
{
75+
$driverConnection = $this->getDBALConnectionMock();
76+
$driverConnection->method('delete')->willThrowException(new DBALException());
77+
78+
$connection = new Connection([], $driverConnection);
79+
$connection->ack('dummy_id');
80+
}
81+
82+
/**
83+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
84+
*/
85+
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
86+
{
87+
$driverConnection = $this->getDBALConnectionMock();
88+
$driverConnection->method('delete')->willThrowException(new DBALException());
89+
90+
$connection = new Connection([], $driverConnection);
91+
$connection->reject('dummy_id');
92+
}
93+
94+
private function getDBALConnectionMock()
95+
{
96+
$driverConnection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class)
97+
->disableOriginalConstructor()
98+
->getMock();
99+
$platform = $this->getMockBuilder(AbstractPlatform::class)
100+
->getMock();
101+
$platform->method('getWriteLockSQL')->willReturn('FOR UPDATE');
102+
$driverConnection->method('getDatabasePlatform')->willReturn($platform);
103+
104+
return $driverConnection;
105+
}
106+
107+
private function getQueryBuilderMock()
108+
{
109+
$queryBuilder = $this->getMockBuilder(QueryBuilder::class)
110+
->disableOriginalConstructor()
111+
->getMock();
112+
113+
$queryBuilder->method('select')->willReturn($queryBuilder);
114+
$queryBuilder->method('update')->willReturn($queryBuilder);
115+
$queryBuilder->method('from')->willReturn($queryBuilder);
116+
$queryBuilder->method('set')->willReturn($queryBuilder);
117+
$queryBuilder->method('where')->willReturn($queryBuilder);
118+
$queryBuilder->method('andWhere')->willReturn($queryBuilder);
119+
$queryBuilder->method('orderBy')->willReturn($queryBuilder);
120+
$queryBuilder->method('setMaxResults')->willReturn($queryBuilder);
121+
$queryBuilder->method('setParameter')->willReturn($queryBuilder);
122+
123+
return $queryBuilder;
124+
}
125+
126+
private function getStatementMock($expectedResult)
127+
{
128+
$stmt = $this->getMockBuilder(Statement::class)
129+
->disableOriginalConstructor()
130+
->getMock();
131+
$stmt->expects($this->once())
132+
->method('fetch')
133+
->willReturn($expectedResult);
134+
135+
return $stmt;
136+
}
137+
138+
/**
139+
* @dataProvider buildConfigurationProvider
140+
*/
141+
public function testBuildConfiguration($dsn, $options, $expectedManager, $expectedTableName, $expectedRedeliverTimeout, $expectedQueue)
142+
{
143+
$config = Connection::buildConfiguration($dsn, $options);
144+
$this->assertEquals($expectedManager, $config['connection']);
145+
$this->assertEquals($expectedTableName, $config['table_name']);
146+
$this->assertEquals($expectedRedeliverTimeout, $config['redeliver_timeout']);
147+
$this->assertEquals($expectedQueue, $config['queue_name']);
148+
}
149+
150+
public function buildConfigurationProvider()
151+
{
152+
return [
153+
[
154+
'dsn' => 'doctrine://default',
155+
'options' => [],
156+
'expectedManager' => 'default',
157+
'expectedTableName' => 'messenger_messages',
158+
'expectedRedeliverTimeout' => 3600,
159+
'expectedQueue' => 'default',
160+
],
161+
// test options from options array
162+
[
163+
'dsn' => 'doctrine://default',
164+
'options' => [
165+
'table_name' => 'name_from_options',
166+
'redeliver_timeout' => 1800,
167+
'queue_name' => 'important',
168+
],
169+
'expectedManager' => 'default',
170+
'expectedTableName' => 'name_from_options',
171+
'expectedRedeliverTimeout' => 1800,
172+
'expectedQueue' => 'important',
173+
],
174+
// tests options from dsn
175+
[
176+
'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal',
177+
'options' => [],
178+
'expectedManager' => 'default',
179+
'expectedTableName' => 'name_from_dsn',
180+
'expectedRedeliverTimeout' => 1200,
181+
'expectedQueue' => 'normal',
182+
],
183+
// test options from options array wins over options from dsn
184+
[
185+
'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal',
186+
'options' => [
187+
'table_name' => 'name_from_options',
188+
'redeliver_timeout' => 1800,
189+
'queue_name' => 'important',
190+
],
191+
'expectedManager' => 'default',
192+
'expectedTableName' => 'name_from_options',
193+
'expectedRedeliverTimeout' => 1800,
194+
'expectedQueue' => 'important',
195+
],
196+
];
197+
}
198+
199+
/**
200+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
201+
*/
202+
public function testItThrowsAnExceptionIfAnExtraOptionsInDefined()
203+
{
204+
Connection::buildConfiguration('doctrine://default', ['new_option' => 'woops']);
205+
}
206+
207+
/**
208+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
209+
*/
210+
public function testItThrowsAnExceptionIfAnExtraOptionsInDefinedInDSN()
211+
{
212+
Connection::buildConfiguration('doctrine://default?new_option=woops');
213+
}
214+
}
+127Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
13+
14+
use Doctrine\DBAL\DriverManager;
15+
use PHPUnit\Framework\TestCase;
16+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17+
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
18+
19+
/**
20+
* @requires pdo_mysql
21+
*/
22+
class DoctrineIntegrationTest extends TestCase
23+
{
24+
private $driverConnection;
25+
private $connection;
26+
27+
protected function setUp()
28+
{
29+
parent::setUp();
30+
31+
if (!getenv('MESSENGER_DOCTRINE_DSN')) {
32+
$this->markTestSkipped('The "MESSENGER_DOCTRINE_DSN" environment variable is required.');
33+
}
34+
$dsn = getenv('MESSENGER_DOCTRINE_DSN');
35+
$this->driverConnection = DriverManager::getConnection(['url' => $dsn]);
36+
$this->connection = new Connection([], $this->driverConnection);
37+
// call send to auto-setup the table
38+
$this->connection->setup();
39+
// ensure the table is clean for tests
40+
$this->driverConnection->exec('DELETE FROM messenger_messages');
41+
}
42+
43+
public function testConnectionSendAndGet()
44+
{
45+
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
46+
$encoded = $this->connection->get();
47+
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
48+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
49+
}
50+
51+
public function testSendWithDelay()
52+
{
53+
$this->connection->send('{"message": "Hi i am delayed"}', ['type' => DummyMessage::class], 600000);
54+
55+
$available_at = $this->driverConnection->createQueryBuilder()
56+
->select('m.available_at')
57+
->from('messenger_messages', 'm')
58+
->where('m.body = :body')
59+
->setParameter(':body', '{"message": "Hi i am delayed"}')
60+
->execute()
61+
->fetchColumn();
62+
63+
$available_at = new \DateTime($available_at);
64+
65+
$now = \DateTime::createFromFormat('U.u', microtime(true));
66+
$now->modify('+60 seconds');
67+
$this->assertGreaterThan($now, $available_at);
68+
}
69+
70+
public function testItRetrieveTheFirstAvailableMessage()
71+
{
72+
// insert messages
73+
// one currently handled
74+
$this->driverConnection->insert('messenger_messages', [
75+
'body' => '{"message": "Hi handled"}',
76+
'headers' => json_encode(['type' => DummyMessage::class]),
77+
'queue_name' => 'default',
78+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
79+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
80+
'delivered_at' => Connection::formatDateTime(\DateTime::createFromFormat('U.u', microtime(true))),
81+
]);
82+
// one available later
83+
$this->driverConnection->insert('messenger_messages', [
84+
'body' => '{"message": "Hi delayed"}',
85+
'headers' => json_encode(['type' => DummyMessage::class]),
86+
'queue_name' => 'default',
87+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
88+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 13:00:00')),
89+
]);
90+
// one available
91+
$this->driverConnection->insert('messenger_messages', [
92+
'body' => '{"message": "Hi available"}',
93+
'headers' => json_encode(['type' => DummyMessage::class]),
94+
'queue_name' => 'default',
95+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
96+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
97+
]);
98+
99+
$encoded = $this->connection->get();
100+
$this->assertEquals('{"message": "Hi available"}', $encoded['body']);
101+
}
102+
103+
public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
104+
{
105+
$twoHoursAgo = new \DateTime('now');
106+
$twoHoursAgo->modify('-2 hours');
107+
$this->driverConnection->insert('messenger_messages', [
108+
'body' => '{"message": "Hi requeued"}',
109+
'headers' => json_encode(['type' => DummyMessage::class]),
110+
'queue_name' => 'default',
111+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
112+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
113+
'delivered_at' => Connection::formatDateTime($twoHoursAgo),
114+
]);
115+
$this->driverConnection->insert('messenger_messages', [
116+
'body' => '{"message": "Hi available"}',
117+
'headers' => json_encode(['type' => DummyMessage::class]),
118+
'queue_name' => 'default',
119+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
120+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
121+
]);
122+
123+
$next = $this->connection->get();
124+
$this->assertEquals('{"message": "Hi requeued"}', $next['body']);
125+
$this->connection->reject($next['id']);
126+
}
127+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.