Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 828f018

Browse filesBrowse files
soyukaalexander-schranz
authored andcommitted
Implement redis transport
1 parent 8977f74 commit 828f018
Copy full SHA for 828f018

13 files changed

+904
-0
lines changed
+54Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
16+
17+
/**
18+
* @requires extension redis
19+
*/
20+
class ConnectionTest extends TestCase
21+
{
22+
/**
23+
* @expectedException \InvalidArgumentException
24+
* @expectedExceptionMessage The given Redis DSN "redis://" is invalid.
25+
*/
26+
public function testItCannotBeConstructedWithAWrongDsn()
27+
{
28+
Connection::fromDsn('redis://');
29+
}
30+
31+
public function testItGetsParametersFromTheDsn()
32+
{
33+
$this->assertEquals(
34+
new Connection('queue', array(
35+
'host' => 'localhost',
36+
'port' => 6379,
37+
)),
38+
Connection::fromDsn('redis://localhost/queue')
39+
);
40+
}
41+
42+
public function testOverrideOptionsViaQueryParameters()
43+
{
44+
$this->assertEquals(
45+
new Connection('queue', array(
46+
'host' => '127.0.0.1',
47+
'port' => 6379,
48+
), array(
49+
'processing_ttl' => '8000',
50+
)),
51+
Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000')
52+
);
53+
}
54+
}
+43Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?php
2+
3+
$componentRoot = $_SERVER['COMPONENT_ROOT'];
4+
5+
if (!is_file($autoload = $componentRoot.'/vendor/autoload.php')) {
6+
$autoload = $componentRoot.'/../../../../vendor/autoload.php';
7+
}
8+
9+
if (!file_exists($autoload)) {
10+
exit('You should run "composer install --dev" in the component before running this script.');
11+
}
12+
13+
require_once $autoload;
14+
15+
use Symfony\Component\Messenger\MessageBusInterface;
16+
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
17+
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
18+
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
19+
use Symfony\Component\Messenger\Worker;
20+
use Symfony\Component\Serializer as SerializerComponent;
21+
use Symfony\Component\Serializer\Encoder\JsonEncoder;
22+
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
23+
24+
$serializer = new Serializer(
25+
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
26+
);
27+
28+
$connection = Connection::fromDsn(getenv('DSN'));
29+
$receiver = new RedisReceiver($connection, $serializer);
30+
31+
$worker = new Worker($receiver, new class() implements MessageBusInterface {
32+
public function dispatch($envelope)
33+
{
34+
echo 'Get envelope with message: '.get_class($envelope->getMessage())."\n";
35+
echo sprintf("with items: %s\n", json_encode(array_keys($envelope->all()), JSON_PRETTY_PRINT));
36+
37+
sleep(30);
38+
echo "Done.\n";
39+
}
40+
});
41+
42+
echo "Receiving messages...\n";
43+
$worker->run();
+146Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Envelope;
16+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17+
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
18+
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
19+
use Symfony\Component\Messenger\Transport\RedisExt\RedisSender;
20+
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
21+
use Symfony\Component\Process\PhpProcess;
22+
use Symfony\Component\Process\Process;
23+
use Symfony\Component\Serializer as SerializerComponent;
24+
use Symfony\Component\Serializer\Encoder\JsonEncoder;
25+
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
26+
27+
/**
28+
* @requires extension redis
29+
*/
30+
class RedisExtIntegrationTest extends TestCase
31+
{
32+
protected function setUp()
33+
{
34+
parent::setUp();
35+
36+
if (!getenv('MESSENGER_REDIS_DSN')) {
37+
$this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.');
38+
}
39+
}
40+
41+
public function testItSendsAndReceivesMessages()
42+
{
43+
$serializer = new Serializer(
44+
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
45+
);
46+
47+
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
48+
49+
$sender = new RedisSender($connection, $serializer);
50+
$receiver = new RedisReceiver($connection, $serializer);
51+
52+
$sender->send($first = Envelope::wrap(new DummyMessage('First')));
53+
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));
54+
55+
$receivedMessages = 0;
56+
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
57+
$this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope);
58+
59+
if (2 === ++$receivedMessages) {
60+
$receiver->stop();
61+
}
62+
});
63+
}
64+
65+
public function testItReceivesSignals()
66+
{
67+
$serializer = new Serializer(
68+
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
69+
);
70+
71+
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
72+
73+
$sender = new RedisSender($connection, $serializer);
74+
$sender->send(Envelope::wrap(new DummyMessage('Hello')));
75+
76+
$amqpReadTimeout = 30;
77+
$dsn = getenv('MESSENGER_REDIS_DSN').'?read_timeout='.$amqpReadTimeout;
78+
$process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array(
79+
'COMPONENT_ROOT' => __DIR__.'/../../../',
80+
'DSN' => $dsn,
81+
));
82+
83+
$process->start();
84+
85+
$this->waitForOutput($process, $expectedOutput = "Receiving messages...\n");
86+
87+
$signalTime = microtime(true);
88+
$timedOutTime = time() + 10;
89+
90+
$process->signal(15);
91+
92+
while ($process->isRunning() && time() < $timedOutTime) {
93+
usleep(100 * 1000); // 100ms
94+
}
95+
96+
$this->assertFalse($process->isRunning());
97+
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
98+
$this->assertSame($expectedOutput.<<<'TXT'
99+
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
100+
with items: [
101+
"Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage"
102+
]
103+
Done.
104+
105+
TXT
106+
, $process->getOutput());
107+
}
108+
109+
/**
110+
* @runInSeparateProcess
111+
*/
112+
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
113+
{
114+
$serializer = new Serializer(
115+
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
116+
);
117+
118+
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), array('blocking_timeout' => '1'));
119+
120+
$receiver = new RedisReceiver($connection, $serializer);
121+
122+
$receivedMessages = 0;
123+
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
124+
$this->assertNull($envelope);
125+
126+
if (2 === ++$receivedMessages) {
127+
$receiver->stop();
128+
}
129+
});
130+
}
131+
132+
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
133+
{
134+
$timedOutTime = time() + $timeoutInSeconds;
135+
136+
while (time() < $timedOutTime) {
137+
if (0 === strpos($process->getOutput(), $output)) {
138+
return;
139+
}
140+
141+
usleep(100 * 1000); // 100ms
142+
}
143+
144+
throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
145+
}
146+
}
+118Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Envelope;
16+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17+
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
18+
use Symfony\Component\Messenger\Transport\RedisExt\Exception\RejectMessageExceptionInterface;
19+
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
20+
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
21+
use Symfony\Component\Serializer as SerializerComponent;
22+
use Symfony\Component\Serializer\Encoder\JsonEncoder;
23+
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
24+
25+
/**
26+
* @requires extension redis
27+
*/
28+
class RedisReceiverTest extends TestCase
29+
{
30+
public function testItSendTheDecodedMessageToTheHandlerAndAcknowledgeIt()
31+
{
32+
$serializer = new Serializer(
33+
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
34+
);
35+
36+
$envelope = Envelope::wrap(new DummyMessage('Hi'));
37+
$encoded = $serializer->encode($envelope);
38+
39+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
40+
$connection->method('waitAndGet')->willReturn($encoded);
41+
42+
$connection->expects($this->once())->method('ack')->with($encoded);
43+
44+
$receiver = new RedisReceiver($connection, $serializer);
45+
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
46+
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
47+
$receiver->stop();
48+
});
49+
}
50+
51+
public function testItSendNoMessageToTheHandler()
52+
{
53+
$serializer = new Serializer(
54+
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
55+
);
56+
57+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
58+
$connection->method('waitAndGet')->willReturn(null);
59+
60+
$receiver = new RedisReceiver($connection, $serializer);
61+
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
62+
$this->assertNull($envelope);
63+
$receiver->stop();
64+
});
65+
}
66+
67+
/**
68+
* @expectedException \Symfony\Component\Messenger\Tests\Transport\RedisExt\InterruptException
69+
*/
70+
public function testItNonAcknowledgeTheMessageIfAnExceptionHappened()
71+
{
72+
$serializer = new Serializer(
73+
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
74+
);
75+
76+
$envelope = Envelope::wrap(new DummyMessage('Hi'));
77+
$encoded = $serializer->encode($envelope);
78+
79+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
80+
$connection->method('waitAndGet')->willReturn($encoded);
81+
$connection->expects($this->once())->method('requeue')->with($encoded);
82+
83+
$receiver = new RedisReceiver($connection, $serializer);
84+
$receiver->receive(function () {
85+
throw new InterruptException('Well...');
86+
});
87+
}
88+
89+
/**
90+
* @expectedException \Symfony\Component\Messenger\Tests\Transport\RedisExt\WillNeverWorkException
91+
*/
92+
public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionInterface()
93+
{
94+
$serializer = new Serializer(
95+
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
96+
);
97+
98+
$envelope = Envelope::wrap(new DummyMessage('Hi'));
99+
$encoded = $serializer->encode($envelope);
100+
101+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
102+
$connection->method('waitAndGet')->willReturn($encoded);
103+
$connection->expects($this->once())->method('reject')->with($encoded);
104+
105+
$receiver = new RedisReceiver($connection, $serializer);
106+
$receiver->receive(function () {
107+
throw new WillNeverWorkException('Well...');
108+
});
109+
}
110+
}
111+
112+
class InterruptException extends \Exception
113+
{
114+
}
115+
116+
class WillNeverWorkException extends \Exception implements RejectMessageExceptionInterface
117+
{
118+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.