Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 52d3d68

Browse filesBrowse files
Refractor using redis streams
1 parent 5851bf0 commit 52d3d68
Copy full SHA for 52d3d68

15 files changed

+336
-447
lines changed

‎.travis.yml

Copy file name to clipboardExpand all lines: .travis.yml
+6-4Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ env:
1919
- MIN_PHP=7.1.3
2020
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
2121
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
22+
- MESSENGER_REDIS_DSN=redis://localhost/messages
2223

2324
matrix:
2425
include:
@@ -53,8 +54,8 @@ before_install:
5354
5455
- |
5556
# Start Redis cluster
56-
docker pull grokzen/redis-cluster:4.0.8
57-
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:4.0.8
57+
docker pull grokzen/redis-cluster:5.0.4
58+
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4
5859
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
5960
6061
- |
@@ -114,6 +115,7 @@ before_install:
114115
local ext_name=$1
115116
local ext_so=$2
116117
local INI=$3
118+
local input=${4:-yes}
117119
local ext_dir=$(php -r "echo ini_get('extension_dir');")
118120
local ext_cache=~/php-ext/$(basename $ext_dir)/$ext_name
119121
@@ -122,7 +124,7 @@ before_install:
122124
else
123125
rm ~/.pearrc /tmp/pear 2>/dev/null || true
124126
mkdir -p $ext_cache
125-
echo yes | pecl install -f $ext_name &&
127+
echo $input | pecl install -f $ext_name &&
126128
cp $ext_dir/$ext_so $ext_cache
127129
fi
128130
}
@@ -145,7 +147,6 @@ before_install:
145147
echo session.gc_probability = 0 >> $INI
146148
echo opcache.enable_cli = 1 >> $INI
147149
echo apc.enable_cli = 1 >> $INI
148-
echo extension = redis.so >> $INI
149150
echo extension = memcached.so >> $INI
150151
done
151152
@@ -164,6 +165,7 @@ before_install:
164165
tfold ext.igbinary tpecl igbinary-2.0.8 igbinary.so $INI
165166
tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI
166167
tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI
168+
tfold ext.redis tpecl redis-4.2.0 redis.so $INI "no\nno"
167169
done
168170
169171
- |

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php
+49-15Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,36 +19,70 @@
1919
*/
2020
class ConnectionTest extends TestCase
2121
{
22-
/**
23-
* @expectedException \InvalidArgumentException
24-
* @expectedExceptionMessage The given Redis DSN "redis://" is invalid.
25-
*/
26-
public function testItCannotBeConstructedWithAWrongDsn()
22+
public function testFromInvalidDns()
2723
{
24+
$this->expectException(\InvalidArgumentException::class);
25+
$this->expectExceptionMessage('The given Redis DSN "redis://" is invalid.');
26+
2827
Connection::fromDsn('redis://');
2928
}
3029

31-
public function testItGetsParametersFromTheDsn()
30+
public function testFromDns()
3231
{
3332
$this->assertEquals(
34-
new Connection('queue', array(
33+
new Connection(['stream' => 'queue'], [
3534
'host' => 'localhost',
3635
'port' => 6379,
37-
)),
36+
]),
3837
Connection::fromDsn('redis://localhost/queue')
3938
);
4039
}
4140

42-
public function testOverrideOptionsViaQueryParameters()
41+
public function testFromDnsWithOptions()
4342
{
4443
$this->assertEquals(
45-
new Connection('queue', array(
46-
'host' => '127.0.0.1',
44+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
45+
'host' => 'localhost',
46+
'port' => 6379,
47+
], [
48+
'blocking_timeout' => 30,
49+
]),
50+
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30])
51+
);
52+
}
53+
54+
public function testFromDnsWithQueryOptions()
55+
{
56+
$this->assertEquals(
57+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
58+
'host' => 'localhost',
4759
'port' => 6379,
48-
), array(
49-
'processing_ttl' => '8000',
50-
)),
51-
Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000')
60+
], [
61+
'blocking_timeout' => 30,
62+
]),
63+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
5264
);
5365
}
66+
67+
public function testFirstGetPendingMessages()
68+
{
69+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
70+
71+
$isFirst = true;
72+
73+
$redis->expects($this->exactly(2))->method('xreadgroup')
74+
->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$isFirst) {
75+
if ($isFirst) {
76+
$isFirst = false;
77+
78+
return '0' === $arr_streams['queue'];
79+
}
80+
81+
return '>' === $arr_streams['queue'];
82+
}), 1, null)
83+
->willReturn(['queue' => []]);
84+
85+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
86+
$connection->get();
87+
}
5488
}

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php
-43Lines changed: 0 additions & 43 deletions
This file was deleted.

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
+26-107Lines changed: 26 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -12,135 +12,54 @@
1212
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
1313

1414
use PHPUnit\Framework\TestCase;
15-
use Symfony\Component\Messenger\Envelope;
1615
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
1716
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
18-
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
19-
use Symfony\Component\Messenger\Transport\RedisExt\RedisSender;
20-
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
21-
use Symfony\Component\Process\PhpProcess;
22-
use Symfony\Component\Process\Process;
23-
use Symfony\Component\Serializer as SerializerComponent;
24-
use Symfony\Component\Serializer\Encoder\JsonEncoder;
25-
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
2617

2718
/**
2819
* @requires extension redis
2920
*/
3021
class RedisExtIntegrationTest extends TestCase
3122
{
23+
private $redis;
24+
private $connection;
25+
3226
protected function setUp()
3327
{
34-
parent::setUp();
35-
3628
if (!getenv('MESSENGER_REDIS_DSN')) {
3729
$this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.');
3830
}
39-
}
40-
41-
public function testItSendsAndReceivesMessages()
42-
{
43-
$serializer = new Serializer(
44-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
45-
);
46-
47-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
48-
49-
$sender = new RedisSender($connection, $serializer);
50-
$receiver = new RedisReceiver($connection, $serializer);
5131

52-
$sender->send($first = Envelope::wrap(new DummyMessage('First')));
53-
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));
54-
55-
$receivedMessages = 0;
56-
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
57-
$this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope);
58-
59-
if (2 === ++$receivedMessages) {
60-
$receiver->stop();
61-
}
62-
});
32+
$this->redis = new \Redis();
33+
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
34+
$this->clearRedis();
35+
$this->connection->setup();
6336
}
6437

65-
public function testItReceivesSignals()
38+
public function testConnectionSendAndGet()
6639
{
67-
$serializer = new Serializer(
68-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
69-
);
70-
71-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
72-
73-
$sender = new RedisSender($connection, $serializer);
74-
$sender->send(Envelope::wrap(new DummyMessage('Hello')));
75-
76-
$amqpReadTimeout = 30;
77-
$dsn = getenv('MESSENGER_REDIS_DSN').'?read_timeout='.$amqpReadTimeout;
78-
$process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array(
79-
'COMPONENT_ROOT' => __DIR__.'/../../../',
80-
'DSN' => $dsn,
81-
));
82-
83-
$process->start();
84-
85-
$this->waitForOutput($process, $expectedOutput = "Receiving messages...\n");
86-
87-
$signalTime = microtime(true);
88-
$timedOutTime = time() + 10;
89-
90-
$process->signal(15);
91-
92-
while ($process->isRunning() && time() < $timedOutTime) {
93-
usleep(100 * 1000); // 100ms
94-
}
95-
96-
$this->assertFalse($process->isRunning());
97-
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
98-
$this->assertSame($expectedOutput.<<<'TXT'
99-
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
100-
with items: [
101-
"Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage"
102-
]
103-
Done.
104-
105-
TXT
106-
, $process->getOutput());
40+
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
41+
$encoded = $this->connection->get();
42+
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
43+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
10744
}
10845

109-
/**
110-
* @runInSeparateProcess
111-
*/
112-
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
46+
public function testGetTheFirstAvailableMessage()
11347
{
114-
$serializer = new Serializer(
115-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
116-
);
117-
118-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), array('blocking_timeout' => '1'));
119-
120-
$receiver = new RedisReceiver($connection, $serializer);
121-
122-
$receivedMessages = 0;
123-
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
124-
$this->assertNull($envelope);
125-
126-
if (2 === ++$receivedMessages) {
127-
$receiver->stop();
128-
}
129-
});
48+
$this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]);
49+
$this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]);
50+
$encoded = $this->connection->get();
51+
$this->assertEquals('{"message": "Hi1"}', $encoded['body']);
52+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
53+
$encoded = $this->connection->get();
54+
$this->assertEquals('{"message": "Hi2"}', $encoded['body']);
55+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
13056
}
13157

132-
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
58+
private function clearRedis()
13359
{
134-
$timedOutTime = time() + $timeoutInSeconds;
135-
136-
while (time() < $timedOutTime) {
137-
if (0 === strpos($process->getOutput(), $output)) {
138-
return;
139-
}
140-
141-
usleep(100 * 1000); // 100ms
142-
}
143-
144-
throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
60+
$parsedUrl = parse_url(getenv('MESSENGER_REDIS_DSN'));
61+
$pathParts = explode('/', $parsedUrl['path'] ?? '');
62+
$stream = $pathParts[1] ?? 'symfony';
63+
$this->redis->del($stream);
14564
}
14665
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.