Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit ff0b855

Browse filesBrowse files
Refractor redis transport using redis streams
1 parent 7162d2e commit ff0b855
Copy full SHA for ff0b855

21 files changed

+392
-447
lines changed

‎.travis.yml

Copy file name to clipboardExpand all lines: .travis.yml
+9-4Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ env:
1919
- MIN_PHP=7.1.3
2020
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
2121
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
22+
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7001/messages
2223

2324
matrix:
2425
include:
@@ -55,8 +56,8 @@ before_install:
5556
5657
- |
5758
# Start Redis cluster
58-
docker pull grokzen/redis-cluster:4.0.8
59-
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:4.0.8
59+
docker pull grokzen/redis-cluster:5.0.4
60+
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4
6061
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
6162
6263
- |
@@ -116,6 +117,7 @@ before_install:
116117
local ext_name=$1
117118
local ext_so=$2
118119
local INI=$3
120+
local input=${4:-yes}
119121
local ext_dir=$(php -r "echo ini_get('extension_dir');")
120122
local ext_cache=~/php-ext/$(basename $ext_dir)/$ext_name
121123
@@ -124,7 +126,7 @@ before_install:
124126
else
125127
rm ~/.pearrc /tmp/pear 2>/dev/null || true
126128
mkdir -p $ext_cache
127-
echo yes | pecl install -f $ext_name &&
129+
echo $input | pecl install -f $ext_name &&
128130
cp $ext_dir/$ext_so $ext_cache
129131
fi
130132
}
@@ -147,7 +149,6 @@ before_install:
147149
echo session.gc_probability = 0 >> $INI
148150
echo opcache.enable_cli = 1 >> $INI
149151
echo apc.enable_cli = 1 >> $INI
150-
echo extension = redis.so >> $INI
151152
echo extension = memcached.so >> $INI
152153
done
153154
@@ -166,7 +167,11 @@ before_install:
166167
tfold ext.igbinary tpecl igbinary-2.0.8 igbinary.so $INI
167168
tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI
168169
tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI
170+
tfold ext.redis tpecl redis-4.3.0 redis.so $INI "no"
169171
done
172+
- |
173+
# List all php extensions with versions
174+
- php -r 'foreach (get_loaded_extensions() as $extension) echo $extension . " " . phpversion($extension) . PHP_EOL;'
170175

171176
- |
172177
# Load fixtures

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1700,6 +1700,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
17001700
if (empty($config['transports'])) {
17011701
$container->removeDefinition('messenger.transport.symfony_serializer');
17021702
$container->removeDefinition('messenger.transport.amqp.factory');
1703+
$container->removeDefinition('messenger.transport.redis.factory');
17031704
} else {
17041705
$container->getDefinition('messenger.transport.symfony_serializer')
17051706
->replaceArgument(1, $config['serializer']['symfony_serializer']['format'])

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@
6666
<tag name="messenger.transport_factory" />
6767
</service>
6868

69+
<service id="messenger.transport.redis.factory" class="Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory">
70+
<tag name="messenger.transport_factory" />
71+
</service>
72+
6973
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
7074
<tag name="messenger.transport_factory" />
7175
</service>

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
'options' => ['queue' => ['name' => 'Queue']],
1414
'serializer' => 'messenger.transport.native_php_serializer',
1515
],
16+
'redis' => 'redis://127.0.0.1:6379/messages',
1617
],
1718
],
1819
]);

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
</framework:queue>
1818
</framework:options>
1919
</framework:transport>
20+
<framework:transport name="redis" dsn="redis://127.0.0.1:6379/messages" />
2021
</framework:messenger>
2122
</framework:config>
2223
</container>

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ framework:
1111
queue:
1212
name: Queue
1313
serializer: 'messenger.transport.native_php_serializer'
14+
redis: 'redis://127.0.0.1:6379/messages'

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
+11Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -673,6 +673,7 @@ public function testMessenger()
673673
$this->assertTrue($container->hasAlias('messenger.default_bus'));
674674
$this->assertTrue($container->getAlias('messenger.default_bus')->isPublic());
675675
$this->assertFalse($container->hasDefinition('messenger.transport.amqp.factory'));
676+
$this->assertFalse($container->hasDefinition('messenger.transport.redis.factory'));
676677
$this->assertTrue($container->hasDefinition('messenger.transport_factory'));
677678
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
678679
}
@@ -697,6 +698,16 @@ public function testMessengerTransports()
697698
$this->assertEquals(new Reference('messenger.transport.native_php_serializer'), $transportArguments[2]);
698699

699700
$this->assertTrue($container->hasDefinition('messenger.transport.amqp.factory'));
701+
702+
$this->assertTrue($container->hasDefinition('messenger.transport.redis'));
703+
$transportFactory = $container->getDefinition('messenger.transport.redis')->getFactory();
704+
$transportArguments = $container->getDefinition('messenger.transport.redis')->getArguments();
705+
706+
$this->assertEquals([new Reference('messenger.transport_factory'), 'createTransport'], $transportFactory);
707+
$this->assertCount(3, $transportArguments);
708+
$this->assertSame('redis://127.0.0.1:6379/messages', $transportArguments[0]);
709+
710+
$this->assertTrue($container->hasDefinition('messenger.transport.redis.factory'));
700711
}
701712

702713
public function testMessengerRouting()

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php
+76-15Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,43 +12,104 @@
1212
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
1313

1414
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Exception\LogicException;
1516
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
1617

1718
/**
1819
* @requires extension redis
1920
*/
2021
class ConnectionTest extends TestCase
2122
{
22-
/**
23-
* @expectedException \InvalidArgumentException
24-
* @expectedExceptionMessage The given Redis DSN "redis://" is invalid.
25-
*/
26-
public function testItCannotBeConstructedWithAWrongDsn()
23+
public function testFromInvalidDsn()
2724
{
25+
$this->expectException(\InvalidArgumentException::class);
26+
$this->expectExceptionMessage('The given Redis DSN "redis://" is invalid.');
27+
2828
Connection::fromDsn('redis://');
2929
}
3030

31-
public function testItGetsParametersFromTheDsn()
31+
public function testFromDsn()
3232
{
3333
$this->assertEquals(
34-
new Connection('queue', array(
34+
new Connection(['stream' => 'queue'], [
3535
'host' => 'localhost',
3636
'port' => 6379,
37-
)),
37+
]),
3838
Connection::fromDsn('redis://localhost/queue')
3939
);
4040
}
4141

42-
public function testOverrideOptionsViaQueryParameters()
42+
public function testFromDsnWithOptions()
4343
{
4444
$this->assertEquals(
45-
new Connection('queue', array(
46-
'host' => '127.0.0.1',
45+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
46+
'host' => 'localhost',
4747
'port' => 6379,
48-
), array(
49-
'processing_ttl' => '8000',
50-
)),
51-
Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000')
48+
], [
49+
'blocking_timeout' => 30,
50+
]),
51+
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30])
5252
);
5353
}
54+
55+
public function testFromDsnWithQueryOptions()
56+
{
57+
$this->assertEquals(
58+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
59+
'host' => 'localhost',
60+
'port' => 6379,
61+
], [
62+
'blocking_timeout' => 30,
63+
]),
64+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
65+
);
66+
}
67+
68+
public function testKeepGettingPendingMessages()
69+
{
70+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
71+
72+
$redis->expects($this->exactly(3))->method('xreadgroup')
73+
->with('symfony', 'consumer', ['queue' => 0], 1, null)
74+
->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]);
75+
76+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
77+
$this->assertNotNull($connection->get());
78+
$this->assertNotNull($connection->get());
79+
$this->assertNotNull($connection->get());
80+
}
81+
82+
public function testFirstGetPendingMessagesThenNewMessages()
83+
{
84+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
85+
86+
$count = 0;
87+
88+
$redis->expects($this->exactly(2))->method('xreadgroup')
89+
->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) {
90+
++$count;
91+
92+
if (1 === $count) {
93+
return '0' === $arr_streams['queue'];
94+
}
95+
96+
return '>' === $arr_streams['queue'];
97+
}), 1, null)
98+
->willReturn(['queue' => []]);
99+
100+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
101+
$connection->get();
102+
}
103+
104+
public function testUnexpectedRedisError()
105+
{
106+
$this->expectException(LogicException::class);
107+
$this->expectExceptionMessage('Redis error happens');
108+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
109+
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
110+
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');
111+
112+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
113+
$connection->get();
114+
}
54115
}

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php
-43Lines changed: 0 additions & 43 deletions
This file was deleted.

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.