Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit d174c1b

Browse filesBrowse files
committed
[Messenger] Add a Doctrine transport
1 parent c2e55ff commit d174c1b
Copy full SHA for d174c1b

File tree

12 files changed

+661
-0
lines changed
Filter options

12 files changed

+661
-0
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1499,6 +1499,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
14991499
if (empty($config['transports'])) {
15001500
$container->removeDefinition('messenger.transport.symfony_serializer');
15011501
$container->removeDefinition('messenger.transport.amqp.factory');
1502+
$container->removeDefinition('messenger.transport.doctrine.factory');
15021503
} else {
15031504
if ('messenger.transport.symfony_serializer' === $config['serializer']['id']) {
15041505
if (!$this->isConfigEnabled($container, $serializerConfig)) {

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+5Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
4.3.0
5+
-----
6+
7+
* Add a `Doctrine` transport using `doctrine://<connection_name>` DSN
8+
49
4.2.0
510
-----
611

+77Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
13+
14+
use Doctrine\DBAL\Statement;
15+
use PHPUnit\Framework\TestCase;
16+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17+
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
18+
19+
class ConnectionTest extends TestCase
20+
{
21+
public function testGetAMessageWillChangeItsStatus()
22+
{
23+
$stmt = $this->getMockBuilder(Statement::class)
24+
->disableOriginalConstructor()
25+
->getMock();
26+
$stmt->expects($this->once())
27+
->method('execute');
28+
$stmt->expects($this->once())
29+
->method('fetch')
30+
->willReturn(array(
31+
'id' => 1,
32+
'body' => '{"message":"Hi"}',
33+
'headers' => \json_encode(array('type' => DummyMessage::class))
34+
));
35+
36+
$driverConnection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class)
37+
->disableOriginalConstructor()
38+
->getMock();
39+
$driverConnection->expects($this->once())
40+
->method('prepare')
41+
->willReturn($stmt);
42+
$driverConnection->expects($this->once())
43+
->method('update')
44+
->with('messages', array('status' => Connection::STATUS_PROCESSING), array('id' => 1));
45+
46+
$connection = new Connection('messages', $driverConnection);
47+
$doctrineEnvelop = $connection->get();
48+
$this->assertEquals(1, $doctrineEnvelop['id']);
49+
$this->assertEquals('{"message":"Hi"}', $doctrineEnvelop['body']);
50+
$this->assertEquals(array('type' => DummyMessage::class), $doctrineEnvelop['headers']);
51+
}
52+
53+
public function testGetWithNoPendingMessageWillReturnNull()
54+
{
55+
$stmt = $this->getMockBuilder(Statement::class)
56+
->disableOriginalConstructor()
57+
->getMock();
58+
$stmt->expects($this->once())
59+
->method('execute');
60+
$stmt->expects($this->once())
61+
->method('fetch')
62+
->willReturn(false);
63+
64+
$driverConnection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class)
65+
->disableOriginalConstructor()
66+
->getMock();
67+
$driverConnection->expects($this->once())
68+
->method('prepare')
69+
->willReturn($stmt);
70+
$driverConnection->expects($this->never())
71+
->method('update');
72+
73+
$connection = new Connection('messages', $driverConnection);
74+
$doctrineEnvelop = $connection->get();
75+
$this->assertNull($doctrineEnvelop);
76+
}
77+
}
+42Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
13+
14+
use Doctrine\DBAL\DriverManager;
15+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
17+
use PHPUnit\Framework\TestCase;
18+
19+
/**
20+
* @requires pdo_mysql
21+
*/
22+
class DoctrineIntegrationTest extends TestCase
23+
{
24+
protected function setUp()
25+
{
26+
parent::setUp();
27+
28+
if (!getenv('MESSENGER_DOCTRINE_DSN')) {
29+
$this->markTestSkipped('The "MESSENGER_DOCTRINE_DSN" environment variable is required.');
30+
}
31+
}
32+
33+
public function testConnectionPublishAndGet()
34+
{
35+
$driverConnection = DriverManager::getConnection(array('url' => getenv('MESSENGER_DOCTRINE_DSN')));
36+
$connection = new Connection('messages', $driverConnection, true);
37+
$connection->publish('{"message": "Hi"}', array('type' => DummyMessage::class));
38+
$encoded = $connection->get();
39+
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
40+
$this->assertEquals(array('type' => DummyMessage::class), $encoded['headers']);
41+
}
42+
}
+85Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Envelope;
16+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17+
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
18+
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver;
19+
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
20+
use Symfony\Component\Serializer as SerializerComponent;
21+
use Symfony\Component\Serializer\Encoder\JsonEncoder;
22+
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
23+
24+
class DoctrineReceiverTest extends TestCase
25+
{
26+
public function testItSendTheDecodedMessageToTheHandlerAndAcknowledgeIt()
27+
{
28+
$serializer = new Serializer(
29+
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
30+
);
31+
32+
$connection = $this->getMockBuilder(Connection::class)
33+
->disableOriginalConstructor()
34+
->getMock();
35+
$connection->method('get')
36+
->willReturn(array(
37+
'id' => 'dummy_id',
38+
'body' => '{"message": "Hi"}',
39+
'headers' => array(
40+
'type' => DummyMessage::class,
41+
)
42+
));
43+
44+
$connection->expects($this->once())->method('ack')->with('dummy_id');
45+
46+
$receiver = new DoctrineReceiver($connection, $serializer);
47+
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
48+
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
49+
$receiver->stop();
50+
});
51+
}
52+
53+
/**
54+
* @expectedException \Symfony\Component\Messenger\Tests\Transport\Doctrine\InterruptException
55+
*/
56+
public function testItNonAcknowledgeTheMessageIfAnExceptionHappened()
57+
{
58+
$serializer = new Serializer(
59+
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
60+
);
61+
62+
$connection = $this->getMockBuilder(Connection::class)
63+
->disableOriginalConstructor()
64+
->getMock();
65+
$connection->method('get')
66+
->willReturn(array(
67+
'id' => 'dummy_id',
68+
'body' => '{"message": "Hi"}',
69+
'headers' => array(
70+
'type' => DummyMessage::class,
71+
)
72+
));
73+
74+
$connection->expects($this->once())->method('nack')->with('dummy_id');
75+
76+
$receiver = new DoctrineReceiver($connection, $serializer);
77+
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
78+
throw new InterruptException();
79+
});
80+
}
81+
}
82+
83+
class InterruptException extends \Exception
84+
{
85+
}
+39Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Envelope;
16+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17+
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
18+
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineSender;
19+
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
20+
21+
class DoctrineSenderTest extends TestCase
22+
{
23+
public function testSend()
24+
{
25+
$envelope = new Envelope(new DummyMessage('Oy'));
26+
$encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class));
27+
28+
$connection = $this->getMockBuilder(Connection::class)
29+
->disableOriginalConstructor()
30+
->getMock();
31+
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']);
32+
33+
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
34+
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
35+
36+
$sender = new DoctrineSender($connection, $serializer);
37+
$sender->send($envelope);
38+
}
39+
}
+77Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Bridge\Doctrine\RegistryInterface;
16+
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
17+
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransport;
18+
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransportFactory;
19+
20+
class DoctrineTransportFactoryTest extends TestCase
21+
{
22+
public function testSupports()
23+
{
24+
$factory = new DoctrineTransportFactory(
25+
$this->getMockBuilder(RegistryInterface::class)->getMock(),
26+
null,
27+
false
28+
);
29+
30+
$this->assertTrue($factory->supports('doctrine://default', array()));
31+
$this->assertFalse($factory->supports('amqp://localhost', array()));
32+
}
33+
34+
public function testCreateTransport()
35+
{
36+
$connection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class)
37+
->disableOriginalConstructor()
38+
->getMock();
39+
$registry = $this->getMockBuilder(RegistryInterface::class)->getMock();
40+
$registry->expects($this->once())
41+
->method('getConnection')
42+
->willReturn($connection);
43+
44+
$factory = new DoctrineTransportFactory(
45+
$registry,
46+
null,
47+
false
48+
);
49+
50+
$this->assertEquals(
51+
new DoctrineTransport(new Connection('messenger_messages', $connection, false), null),
52+
$factory->createTransport('doctrine://default', array())
53+
);
54+
}
55+
56+
public function testCreateTransportWithCustomTableName()
57+
{
58+
$connection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class)
59+
->disableOriginalConstructor()
60+
->getMock();
61+
$registry = $this->getMockBuilder(RegistryInterface::class)->getMock();
62+
$registry->expects($this->once())
63+
->method('getConnection')
64+
->willReturn($connection);
65+
66+
$factory = new DoctrineTransportFactory(
67+
$registry,
68+
null,
69+
false
70+
);
71+
72+
$this->assertEquals(
73+
new DoctrineTransport(new Connection('custom_messages', $connection, false), null),
74+
$factory->createTransport('doctrine://default', array('table_name' => 'custom_messages'))
75+
);
76+
}
77+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.