Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 781ae29

Browse filesBrowse files
Refractor using redis streams
1 parent 5851bf0 commit 781ae29
Copy full SHA for 781ae29

15 files changed

+292
-446
lines changed

‎.travis.yml

Copy file name to clipboardExpand all lines: .travis.yml
+4-3Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ env:
1919
- MIN_PHP=7.1.3
2020
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
2121
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
22+
- MESSENGER_REDIS_DSN=redis://localhost/messages
2223

2324
matrix:
2425
include:
@@ -53,8 +54,8 @@ before_install:
5354
5455
- |
5556
# Start Redis cluster
56-
docker pull grokzen/redis-cluster:4.0.8
57-
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:4.0.8
57+
docker pull grokzen/redis-cluster:5.0.4
58+
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4
5859
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
5960
6061
- |
@@ -145,7 +146,6 @@ before_install:
145146
echo session.gc_probability = 0 >> $INI
146147
echo opcache.enable_cli = 1 >> $INI
147148
echo apc.enable_cli = 1 >> $INI
148-
echo extension = redis.so >> $INI
149149
echo extension = memcached.so >> $INI
150150
done
151151
@@ -161,6 +161,7 @@ before_install:
161161
162162
tfold ext.apcu tpecl apcu-5.1.16 apcu.so $INI
163163
tfold ext.mongodb tpecl mongodb-1.6.0alpha1 mongodb.so $INI
164+
tfold ext.redis tpecl redis-4.2.0 redis.so $INI
164165
tfold ext.igbinary tpecl igbinary-2.0.8 igbinary.so $INI
165166
tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI
166167
tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php
+27-15Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,36 +19,48 @@
1919
*/
2020
class ConnectionTest extends TestCase
2121
{
22-
/**
23-
* @expectedException \InvalidArgumentException
24-
* @expectedExceptionMessage The given Redis DSN "redis://" is invalid.
25-
*/
26-
public function testItCannotBeConstructedWithAWrongDsn()
22+
public function testFromInvalidDns()
2723
{
24+
$this->expectException(\InvalidArgumentException::class);
25+
$this->expectExceptionMessage('The given Redis DSN "redis://" is invalid.');
26+
2827
Connection::fromDsn('redis://');
2928
}
3029

31-
public function testItGetsParametersFromTheDsn()
30+
public function testFromDns()
3231
{
3332
$this->assertEquals(
34-
new Connection('queue', array(
33+
new Connection(['stream' => 'queue'], [
3534
'host' => 'localhost',
3635
'port' => 6379,
37-
)),
36+
]),
3837
Connection::fromDsn('redis://localhost/queue')
3938
);
4039
}
4140

42-
public function testOverrideOptionsViaQueryParameters()
41+
public function testFromDnsWithOptions()
4342
{
4443
$this->assertEquals(
45-
new Connection('queue', array(
46-
'host' => '127.0.0.1',
44+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
45+
'host' => 'localhost',
46+
'port' => 6379,
47+
], [
48+
'blocking_timeout' => 30,
49+
]),
50+
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30])
51+
);
52+
}
53+
54+
public function testFromDnsWithQueryOptions()
55+
{
56+
$this->assertEquals(
57+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
58+
'host' => 'localhost',
4759
'port' => 6379,
48-
), array(
49-
'processing_ttl' => '8000',
50-
)),
51-
Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000')
60+
], [
61+
'blocking_timeout' => 30,
62+
]),
63+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
5264
);
5365
}
5466
}

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php
-43Lines changed: 0 additions & 43 deletions
This file was deleted.

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
+21-107Lines changed: 21 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -12,135 +12,49 @@
1212
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
1313

1414
use PHPUnit\Framework\TestCase;
15-
use Symfony\Component\Messenger\Envelope;
1615
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
1716
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
18-
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
19-
use Symfony\Component\Messenger\Transport\RedisExt\RedisSender;
20-
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
21-
use Symfony\Component\Process\PhpProcess;
22-
use Symfony\Component\Process\Process;
23-
use Symfony\Component\Serializer as SerializerComponent;
24-
use Symfony\Component\Serializer\Encoder\JsonEncoder;
25-
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
2617

2718
/**
2819
* @requires extension redis
2920
*/
3021
class RedisExtIntegrationTest extends TestCase
3122
{
23+
private $connection;
24+
3225
protected function setUp()
3326
{
34-
parent::setUp();
35-
3627
if (!getenv('MESSENGER_REDIS_DSN')) {
3728
$this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.');
3829
}
39-
}
40-
41-
public function testItSendsAndReceivesMessages()
42-
{
43-
$serializer = new Serializer(
44-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
45-
);
46-
47-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
48-
49-
$sender = new RedisSender($connection, $serializer);
50-
$receiver = new RedisReceiver($connection, $serializer);
5130

52-
$sender->send($first = Envelope::wrap(new DummyMessage('First')));
53-
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));
54-
55-
$receivedMessages = 0;
56-
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
57-
$this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope);
58-
59-
if (2 === ++$receivedMessages) {
60-
$receiver->stop();
61-
}
62-
});
31+
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
32+
$this->clearRedis();
33+
$this->connection->setup();
6334
}
6435

65-
public function testItReceivesSignals()
36+
public function testConnectionSendAndGet()
6637
{
67-
$serializer = new Serializer(
68-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
69-
);
70-
71-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
72-
73-
$sender = new RedisSender($connection, $serializer);
74-
$sender->send(Envelope::wrap(new DummyMessage('Hello')));
75-
76-
$amqpReadTimeout = 30;
77-
$dsn = getenv('MESSENGER_REDIS_DSN').'?read_timeout='.$amqpReadTimeout;
78-
$process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array(
79-
'COMPONENT_ROOT' => __DIR__.'/../../../',
80-
'DSN' => $dsn,
81-
));
82-
83-
$process->start();
84-
85-
$this->waitForOutput($process, $expectedOutput = "Receiving messages...\n");
86-
87-
$signalTime = microtime(true);
88-
$timedOutTime = time() + 10;
89-
90-
$process->signal(15);
91-
92-
while ($process->isRunning() && time() < $timedOutTime) {
93-
usleep(100 * 1000); // 100ms
94-
}
95-
96-
$this->assertFalse($process->isRunning());
97-
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
98-
$this->assertSame($expectedOutput.<<<'TXT'
99-
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
100-
with items: [
101-
"Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage"
102-
]
103-
Done.
104-
105-
TXT
106-
, $process->getOutput());
38+
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
39+
$encoded = $this->connection->get();
40+
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
41+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
10742
}
10843

109-
/**
110-
* @runInSeparateProcess
111-
*/
112-
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
44+
public function testGetTheFirstAvailableMessage()
11345
{
114-
$serializer = new Serializer(
115-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
116-
);
117-
118-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), array('blocking_timeout' => '1'));
119-
120-
$receiver = new RedisReceiver($connection, $serializer);
121-
122-
$receivedMessages = 0;
123-
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
124-
$this->assertNull($envelope);
125-
126-
if (2 === ++$receivedMessages) {
127-
$receiver->stop();
128-
}
129-
});
46+
$this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]);
47+
$this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]);
48+
$encoded = $this->connection->get();
49+
$this->assertEquals('{"message": "Hi1"}', $encoded['body']);
50+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
51+
$encoded = $this->connection->get();
52+
$this->assertEquals('{"message": "Hi2"}', $encoded['body']);
53+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
13054
}
13155

132-
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
56+
private function clearRedis()
13357
{
134-
$timedOutTime = time() + $timeoutInSeconds;
135-
136-
while (time() < $timedOutTime) {
137-
if (0 === strpos($process->getOutput(), $output)) {
138-
return;
139-
}
140-
141-
usleep(100 * 1000); // 100ms
142-
}
143-
144-
throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
58+
// TODO the redis stream should be cleared before running the test
14559
}
14660
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.