Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit b2bccd7

Browse filesBrowse files
committed
[Messenger] Add a Doctrine transport
1 parent dd47fda commit b2bccd7
Copy full SHA for b2bccd7

14 files changed

+1149
-1
lines changed

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ CHANGELOG
5757
* [BC BREAK] The Amqp Transport now automatically sets up the exchanges
5858
and queues by default. Previously, this was done when in "debug" mode
5959
only. Pass the `auto_setup` connection option to control this.
60-
6160
* Added a `SetupTransportsCommand` command to setup the transports
61+
* Added a Doctrine transport. For example, the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
6262

6363
4.2.0
6464
-----
+214Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
13+
14+
use Doctrine\DBAL\DBALException;
15+
use Doctrine\DBAL\Driver\Statement;
16+
use Doctrine\DBAL\Platforms\AbstractPlatform;
17+
use Doctrine\DBAL\Query\QueryBuilder;
18+
use PHPUnit\Framework\TestCase;
19+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
20+
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
21+
22+
class ConnectionTest extends TestCase
23+
{
24+
public function testGetAMessageWillChangeItsStatus()
25+
{
26+
$queryBuilder = $this->getQueryBuilderMock();
27+
$driverConnection = $this->getDBALConnectionMock();
28+
$stmt = $this->getStatementMock([
29+
'id' => 1,
30+
'body' => '{"message":"Hi"}',
31+
'headers' => \json_encode(['type' => DummyMessage::class]),
32+
]);
33+
34+
$driverConnection
35+
->method('createQueryBuilder')
36+
->willReturn($queryBuilder);
37+
$queryBuilder
38+
->method('getSQL')
39+
->willReturn('');
40+
$driverConnection
41+
->method('prepare')
42+
->willReturn($stmt);
43+
44+
$connection = new Connection([], $driverConnection);
45+
$doctrineEnvelope = $connection->get();
46+
$this->assertEquals(1, $doctrineEnvelope['id']);
47+
$this->assertEquals('{"message":"Hi"}', $doctrineEnvelope['body']);
48+
$this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelope['headers']);
49+
}
50+
51+
public function testGetWithNoPendingMessageWillReturnNull()
52+
{
53+
$queryBuilder = $this->getQueryBuilderMock();
54+
$driverConnection = $this->getDBALConnectionMock();
55+
$stmt = $this->getStatementMock(false);
56+
57+
$driverConnection->expects($this->once())
58+
->method('createQueryBuilder')
59+
->willReturn($queryBuilder);
60+
$driverConnection->method('prepare')
61+
->willReturn($stmt);
62+
$driverConnection->expects($this->never())
63+
->method('update');
64+
65+
$connection = new Connection([], $driverConnection);
66+
$doctrineEnvelope = $connection->get();
67+
$this->assertNull($doctrineEnvelope);
68+
}
69+
70+
/**
71+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
72+
*/
73+
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
74+
{
75+
$driverConnection = $this->getDBALConnectionMock();
76+
$driverConnection->method('delete')->willThrowException(new DBALException());
77+
78+
$connection = new Connection([], $driverConnection);
79+
$connection->ack('dummy_id');
80+
}
81+
82+
/**
83+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
84+
*/
85+
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
86+
{
87+
$driverConnection = $this->getDBALConnectionMock();
88+
$driverConnection->method('delete')->willThrowException(new DBALException());
89+
90+
$connection = new Connection([], $driverConnection);
91+
$connection->reject('dummy_id');
92+
}
93+
94+
private function getDBALConnectionMock()
95+
{
96+
$driverConnection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class)
97+
->disableOriginalConstructor()
98+
->getMock();
99+
$platform = $this->getMockBuilder(AbstractPlatform::class)
100+
->getMock();
101+
$platform->method('getWriteLockSQL')->willReturn('FOR UPDATE');
102+
$driverConnection->method('getDatabasePlatform')->willReturn($platform);
103+
104+
return $driverConnection;
105+
}
106+
107+
private function getQueryBuilderMock()
108+
{
109+
$queryBuilder = $this->getMockBuilder(QueryBuilder::class)
110+
->disableOriginalConstructor()
111+
->getMock();
112+
113+
$queryBuilder->method('select')->willReturn($queryBuilder);
114+
$queryBuilder->method('update')->willReturn($queryBuilder);
115+
$queryBuilder->method('from')->willReturn($queryBuilder);
116+
$queryBuilder->method('set')->willReturn($queryBuilder);
117+
$queryBuilder->method('where')->willReturn($queryBuilder);
118+
$queryBuilder->method('andWhere')->willReturn($queryBuilder);
119+
$queryBuilder->method('orderBy')->willReturn($queryBuilder);
120+
$queryBuilder->method('setMaxResults')->willReturn($queryBuilder);
121+
$queryBuilder->method('setParameter')->willReturn($queryBuilder);
122+
123+
return $queryBuilder;
124+
}
125+
126+
private function getStatementMock($expectedResult)
127+
{
128+
$stmt = $this->getMockBuilder(Statement::class)
129+
->disableOriginalConstructor()
130+
->getMock();
131+
$stmt->expects($this->once())
132+
->method('fetch')
133+
->willReturn($expectedResult);
134+
135+
return $stmt;
136+
}
137+
138+
/**
139+
* @dataProvider buildConfigurationProvider
140+
*/
141+
public function testBuildConfiguration($dsn, $options, $expectedManager, $expectedTableName, $expectedRedeliverTimeout, $expectedQueue)
142+
{
143+
$config = Connection::buildConfiguration($dsn, $options);
144+
$this->assertEquals($expectedManager, $config['connection']);
145+
$this->assertEquals($expectedTableName, $config['table_name']);
146+
$this->assertEquals($expectedRedeliverTimeout, $config['redeliver_timeout']);
147+
$this->assertEquals($expectedQueue, $config['queue_name']);
148+
}
149+
150+
public function buildConfigurationProvider()
151+
{
152+
return [
153+
[
154+
'dsn' => 'doctrine://default',
155+
'options' => [],
156+
'expectedManager' => 'default',
157+
'expectedTableName' => 'messenger_messages',
158+
'expectedRedeliverTimeout' => 3600,
159+
'expectedQueue' => 'default',
160+
],
161+
// test options from options array
162+
[
163+
'dsn' => 'doctrine://default',
164+
'options' => [
165+
'table_name' => 'name_from_options',
166+
'redeliver_timeout' => 1800,
167+
'queue_name' => 'important',
168+
],
169+
'expectedManager' => 'default',
170+
'expectedTableName' => 'name_from_options',
171+
'expectedRedeliverTimeout' => 1800,
172+
'expectedQueue' => 'important',
173+
],
174+
// tests options from dsn
175+
[
176+
'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal',
177+
'options' => [],
178+
'expectedManager' => 'default',
179+
'expectedTableName' => 'name_from_dsn',
180+
'expectedRedeliverTimeout' => 1200,
181+
'expectedQueue' => 'normal',
182+
],
183+
// test options from options array wins over options from dsn
184+
[
185+
'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal',
186+
'options' => [
187+
'table_name' => 'name_from_options',
188+
'redeliver_timeout' => 1800,
189+
'queue_name' => 'important',
190+
],
191+
'expectedManager' => 'default',
192+
'expectedTableName' => 'name_from_options',
193+
'expectedRedeliverTimeout' => 1800,
194+
'expectedQueue' => 'important',
195+
],
196+
];
197+
}
198+
199+
/**
200+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
201+
*/
202+
public function testItThrowsAnExceptionIfAnExtraOptionsInDefined()
203+
{
204+
Connection::buildConfiguration('doctrine://default', ['new_option' => 'woops']);
205+
}
206+
207+
/**
208+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
209+
*/
210+
public function testItThrowsAnExceptionIfAnExtraOptionsInDefinedInDSN()
211+
{
212+
Connection::buildConfiguration('doctrine://default?new_option=woops');
213+
}
214+
}
+127Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
13+
14+
use Doctrine\DBAL\DriverManager;
15+
use PHPUnit\Framework\TestCase;
16+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17+
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
18+
19+
/**
20+
* @requires pdo_mysql
21+
*/
22+
class DoctrineIntegrationTest extends TestCase
23+
{
24+
private $driverConnection;
25+
private $connection;
26+
27+
protected function setUp()
28+
{
29+
parent::setUp();
30+
31+
if (!getenv('MESSENGER_DOCTRINE_DSN')) {
32+
$this->markTestSkipped('The "MESSENGER_DOCTRINE_DSN" environment variable is required.');
33+
}
34+
$dsn = getenv('MESSENGER_DOCTRINE_DSN');
35+
$this->driverConnection = DriverManager::getConnection(['url' => $dsn]);
36+
$this->connection = new Connection([], $this->driverConnection);
37+
// call send to auto-setup the table
38+
$this->connection->setup();
39+
// ensure the table is clean for tests
40+
$this->driverConnection->exec('DELETE FROM messenger_messages');
41+
}
42+
43+
public function testConnectionSendAndGet()
44+
{
45+
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
46+
$encoded = $this->connection->get();
47+
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
48+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
49+
}
50+
51+
public function testSendWithDelay()
52+
{
53+
$this->connection->send('{"message": "Hi i am delayed"}', ['type' => DummyMessage::class], 600000);
54+
55+
$available_at = $this->driverConnection->createQueryBuilder()
56+
->select('m.available_at')
57+
->from('messenger_messages', 'm')
58+
->where('m.body = :body')
59+
->setParameter(':body', '{"message": "Hi i am delayed"}')
60+
->execute()
61+
->fetchColumn();
62+
63+
$available_at = new \DateTime($available_at);
64+
65+
$now = \DateTime::createFromFormat('U.u', microtime(true));
66+
$now->modify('+60 seconds');
67+
$this->assertGreaterThan($now, $available_at);
68+
}
69+
70+
public function testItRetrieveTheFirstAvailableMessage()
71+
{
72+
// insert messages
73+
// one currently handled
74+
$this->driverConnection->insert('messenger_messages', [
75+
'body' => '{"message": "Hi handled"}',
76+
'headers' => json_encode(['type' => DummyMessage::class]),
77+
'queue_name' => 'default',
78+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
79+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
80+
'delivered_at' => Connection::formatDateTime(\DateTime::createFromFormat('U.u', microtime(true))),
81+
]);
82+
// one available later
83+
$this->driverConnection->insert('messenger_messages', [
84+
'body' => '{"message": "Hi delayed"}',
85+
'headers' => json_encode(['type' => DummyMessage::class]),
86+
'queue_name' => 'default',
87+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
88+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 13:00:00')),
89+
]);
90+
// one available
91+
$this->driverConnection->insert('messenger_messages', [
92+
'body' => '{"message": "Hi available"}',
93+
'headers' => json_encode(['type' => DummyMessage::class]),
94+
'queue_name' => 'default',
95+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
96+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
97+
]);
98+
99+
$encoded = $this->connection->get();
100+
$this->assertEquals('{"message": "Hi available"}', $encoded['body']);
101+
}
102+
103+
public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
104+
{
105+
$twoHoursAgo = new \DateTime('now');
106+
$twoHoursAgo->modify('-2 hours');
107+
$this->driverConnection->insert('messenger_messages', [
108+
'body' => '{"message": "Hi requeued"}',
109+
'headers' => json_encode(['type' => DummyMessage::class]),
110+
'queue_name' => 'default',
111+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
112+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
113+
'delivered_at' => Connection::formatDateTime($twoHoursAgo),
114+
]);
115+
$this->driverConnection->insert('messenger_messages', [
116+
'body' => '{"message": "Hi available"}',
117+
'headers' => json_encode(['type' => DummyMessage::class]),
118+
'queue_name' => 'default',
119+
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
120+
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
121+
]);
122+
123+
$next = $this->connection->get();
124+
$this->assertEquals('{"message": "Hi requeued"}', $next['body']);
125+
$this->connection->reject($next['id']);
126+
}
127+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.