Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 4461d36

Browse filesBrowse files
Refractor using redis streams
1 parent 7162d2e commit 4461d36
Copy full SHA for 4461d36

16 files changed

+377
-447
lines changed

‎.travis.yml

Copy file name to clipboardExpand all lines: .travis.yml
+9-4Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ env:
1919
- MIN_PHP=7.1.3
2020
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
2121
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
22+
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7001/messages
2223

2324
matrix:
2425
include:
@@ -55,8 +56,8 @@ before_install:
5556
5657
- |
5758
# Start Redis cluster
58-
docker pull grokzen/redis-cluster:4.0.8
59-
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:4.0.8
59+
docker pull grokzen/redis-cluster:5.0.4
60+
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4
6061
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
6162
6263
- |
@@ -116,6 +117,7 @@ before_install:
116117
local ext_name=$1
117118
local ext_so=$2
118119
local INI=$3
120+
local input=${4:-yes}
119121
local ext_dir=$(php -r "echo ini_get('extension_dir');")
120122
local ext_cache=~/php-ext/$(basename $ext_dir)/$ext_name
121123
@@ -124,7 +126,7 @@ before_install:
124126
else
125127
rm ~/.pearrc /tmp/pear 2>/dev/null || true
126128
mkdir -p $ext_cache
127-
echo yes | pecl install -f $ext_name &&
129+
echo $input | pecl install -f $ext_name &&
128130
cp $ext_dir/$ext_so $ext_cache
129131
fi
130132
}
@@ -147,7 +149,6 @@ before_install:
147149
echo session.gc_probability = 0 >> $INI
148150
echo opcache.enable_cli = 1 >> $INI
149151
echo apc.enable_cli = 1 >> $INI
150-
echo extension = redis.so >> $INI
151152
echo extension = memcached.so >> $INI
152153
done
153154
@@ -166,7 +167,11 @@ before_install:
166167
tfold ext.igbinary tpecl igbinary-2.0.8 igbinary.so $INI
167168
tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI
168169
tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI
170+
tfold ext.redis tpecl redis-4.3.0 redis.so $INI "no"
169171
done
172+
- |
173+
# List all php extensions with versions
174+
- php -r 'foreach (get_loaded_extensions() as $extension) echo $extension . " " . phpversion($extension) . PHP_EOL;'
170175

171176
- |
172177
# Load fixtures

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@
6666
<tag name="messenger.transport_factory" />
6767
</service>
6868

69+
<service id="messenger.transport.redis.factory" class="Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory">
70+
<tag name="messenger.transport_factory" />
71+
</service>
72+
6973
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
7074
<tag name="messenger.transport_factory" />
7175
</service>

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php
+76-15Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,43 +12,104 @@
1212
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
1313

1414
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Exception\LogicException;
1516
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
1617

1718
/**
1819
* @requires extension redis
1920
*/
2021
class ConnectionTest extends TestCase
2122
{
22-
/**
23-
* @expectedException \InvalidArgumentException
24-
* @expectedExceptionMessage The given Redis DSN "redis://" is invalid.
25-
*/
26-
public function testItCannotBeConstructedWithAWrongDsn()
23+
public function testFromInvalidDsn()
2724
{
25+
$this->expectException(\InvalidArgumentException::class);
26+
$this->expectExceptionMessage('The given Redis DSN "redis://" is invalid.');
27+
2828
Connection::fromDsn('redis://');
2929
}
3030

31-
public function testItGetsParametersFromTheDsn()
31+
public function testFromDsn()
3232
{
3333
$this->assertEquals(
34-
new Connection('queue', array(
34+
new Connection(['stream' => 'queue'], [
3535
'host' => 'localhost',
3636
'port' => 6379,
37-
)),
37+
]),
3838
Connection::fromDsn('redis://localhost/queue')
3939
);
4040
}
4141

42-
public function testOverrideOptionsViaQueryParameters()
42+
public function testFromDsnWithOptions()
4343
{
4444
$this->assertEquals(
45-
new Connection('queue', array(
46-
'host' => '127.0.0.1',
45+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
46+
'host' => 'localhost',
4747
'port' => 6379,
48-
), array(
49-
'processing_ttl' => '8000',
50-
)),
51-
Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000')
48+
], [
49+
'blocking_timeout' => 30,
50+
]),
51+
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30])
5252
);
5353
}
54+
55+
public function testFromDsnWithQueryOptions()
56+
{
57+
$this->assertEquals(
58+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
59+
'host' => 'localhost',
60+
'port' => 6379,
61+
], [
62+
'blocking_timeout' => 30,
63+
]),
64+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
65+
);
66+
}
67+
68+
public function testKeepGettingPendingMessages()
69+
{
70+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
71+
72+
$redis->expects($this->exactly(3))->method('xreadgroup')
73+
->with('symfony', 'consumer', ['queue' => 0], 1, null)
74+
->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]);
75+
76+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
77+
$this->assertNotNull($connection->get());
78+
$this->assertNotNull($connection->get());
79+
$this->assertNotNull($connection->get());
80+
}
81+
82+
public function testFirstGetPendingMessagesThenNewMessages()
83+
{
84+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
85+
86+
$count = 0;
87+
88+
$redis->expects($this->exactly(2))->method('xreadgroup')
89+
->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) {
90+
++$count;
91+
92+
if (1 === $count) {
93+
return '0' === $arr_streams['queue'];
94+
}
95+
96+
return '>' === $arr_streams['queue'];
97+
}), 1, null)
98+
->willReturn(['queue' => []]);
99+
100+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
101+
$connection->get();
102+
}
103+
104+
public function testUnexpectedRedisError()
105+
{
106+
$this->expectException(LogicException::class);
107+
$this->expectExceptionMessage('Redis error happens');
108+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
109+
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
110+
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');
111+
112+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
113+
$connection->get();
114+
}
54115
}

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php
-43Lines changed: 0 additions & 43 deletions
This file was deleted.

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
+26-107Lines changed: 26 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -12,135 +12,54 @@
1212
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
1313

1414
use PHPUnit\Framework\TestCase;
15-
use Symfony\Component\Messenger\Envelope;
1615
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
1716
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
18-
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
19-
use Symfony\Component\Messenger\Transport\RedisExt\RedisSender;
20-
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
21-
use Symfony\Component\Process\PhpProcess;
22-
use Symfony\Component\Process\Process;
23-
use Symfony\Component\Serializer as SerializerComponent;
24-
use Symfony\Component\Serializer\Encoder\JsonEncoder;
25-
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
2617

2718
/**
2819
* @requires extension redis
2920
*/
3021
class RedisExtIntegrationTest extends TestCase
3122
{
23+
private $redis;
24+
private $connection;
25+
3226
protected function setUp()
3327
{
34-
parent::setUp();
35-
3628
if (!getenv('MESSENGER_REDIS_DSN')) {
3729
$this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.');
3830
}
39-
}
40-
41-
public function testItSendsAndReceivesMessages()
42-
{
43-
$serializer = new Serializer(
44-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
45-
);
46-
47-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
48-
49-
$sender = new RedisSender($connection, $serializer);
50-
$receiver = new RedisReceiver($connection, $serializer);
5131

52-
$sender->send($first = Envelope::wrap(new DummyMessage('First')));
53-
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));
54-
55-
$receivedMessages = 0;
56-
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
57-
$this->assertEquals(0 == $receivedMessages ? $first : $second, $envelope);
58-
59-
if (2 === ++$receivedMessages) {
60-
$receiver->stop();
61-
}
62-
});
32+
$this->redis = new \Redis();
33+
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
34+
$this->clearRedis();
35+
$this->connection->setup();
6336
}
6437

65-
public function testItReceivesSignals()
38+
public function testConnectionSendAndGet()
6639
{
67-
$serializer = new Serializer(
68-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
69-
);
70-
71-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
72-
73-
$sender = new RedisSender($connection, $serializer);
74-
$sender->send(Envelope::wrap(new DummyMessage('Hello')));
75-
76-
$amqpReadTimeout = 30;
77-
$dsn = getenv('MESSENGER_REDIS_DSN').'?read_timeout='.$amqpReadTimeout;
78-
$process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array(
79-
'COMPONENT_ROOT' => __DIR__.'/../../../',
80-
'DSN' => $dsn,
81-
));
82-
83-
$process->start();
84-
85-
$this->waitForOutput($process, $expectedOutput = "Receiving messages...\n");
86-
87-
$signalTime = microtime(true);
88-
$timedOutTime = time() + 10;
89-
90-
$process->signal(15);
91-
92-
while ($process->isRunning() && time() < $timedOutTime) {
93-
usleep(100 * 1000); // 100ms
94-
}
95-
96-
$this->assertFalse($process->isRunning());
97-
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
98-
$this->assertSame($expectedOutput.<<<'TXT'
99-
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
100-
with items: [
101-
"Symfony\\Component\\Messenger\\Asynchronous\\Transport\\ReceivedMessage"
102-
]
103-
Done.
104-
105-
TXT
106-
, $process->getOutput());
40+
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
41+
$encoded = $this->connection->get();
42+
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
43+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
10744
}
10845

109-
/**
110-
* @runInSeparateProcess
111-
*/
112-
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
46+
public function testGetTheFirstAvailableMessage()
11347
{
114-
$serializer = new Serializer(
115-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
116-
);
117-
118-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), array('blocking_timeout' => '1'));
119-
120-
$receiver = new RedisReceiver($connection, $serializer);
121-
122-
$receivedMessages = 0;
123-
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
124-
$this->assertNull($envelope);
125-
126-
if (2 === ++$receivedMessages) {
127-
$receiver->stop();
128-
}
129-
});
48+
$this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]);
49+
$this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]);
50+
$encoded = $this->connection->get();
51+
$this->assertEquals('{"message": "Hi1"}', $encoded['body']);
52+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
53+
$encoded = $this->connection->get();
54+
$this->assertEquals('{"message": "Hi2"}', $encoded['body']);
55+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
13056
}
13157

132-
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
58+
private function clearRedis()
13359
{
134-
$timedOutTime = time() + $timeoutInSeconds;
135-
136-
while (time() < $timedOutTime) {
137-
if (0 === strpos($process->getOutput(), $output)) {
138-
return;
139-
}
140-
141-
usleep(100 * 1000); // 100ms
142-
}
143-
144-
throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
60+
$parsedUrl = parse_url(getenv('MESSENGER_REDIS_DSN'));
61+
$pathParts = explode('/', $parsedUrl['path'] ?? '');
62+
$stream = $pathParts[1] ?? 'symfony';
63+
$this->redis->del($stream);
14564
}
14665
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.