Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[Messenger] Add a redis stream transport #30917

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions 13 .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ env:
- MIN_PHP=7.1.3
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7001/messages

matrix:
include:
Expand Down Expand Up @@ -55,8 +56,8 @@ before_install:

- |
# Start Redis cluster
docker pull grokzen/redis-cluster:4.0.8
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:4.0.8
docker pull grokzen/redis-cluster:5.0.4
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'

- |
Expand Down Expand Up @@ -116,6 +117,7 @@ before_install:
local ext_name=$1
local ext_so=$2
local INI=$3
local input=${4:-yes}
local ext_dir=$(php -r "echo ini_get('extension_dir');")
local ext_cache=~/php-ext/$(basename $ext_dir)/$ext_name

Expand All @@ -124,7 +126,7 @@ before_install:
else
rm ~/.pearrc /tmp/pear 2>/dev/null || true
mkdir -p $ext_cache
echo yes | pecl install -f $ext_name &&
echo $input | pecl install -f $ext_name &&
cp $ext_dir/$ext_so $ext_cache
fi
}
Expand All @@ -147,7 +149,6 @@ before_install:
echo session.gc_probability = 0 >> $INI
echo opcache.enable_cli = 1 >> $INI
echo apc.enable_cli = 1 >> $INI
echo extension = redis.so >> $INI
echo extension = memcached.so >> $INI
done

Expand All @@ -166,7 +167,11 @@ before_install:
tfold ext.igbinary tpecl igbinary-2.0.8 igbinary.so $INI
tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI
tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI
tfold ext.redis tpecl redis-4.3.0 redis.so $INI "no"
done
- |
# List all php extensions with versions
- php -r 'foreach (get_loaded_extensions() as $extension) echo $extension . " " . phpversion($extension) . PHP_EOL;'

- |
# Load fixtures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
if (empty($config['transports'])) {
$container->removeDefinition('messenger.transport.symfony_serializer');
$container->removeDefinition('messenger.transport.amqp.factory');
$container->removeDefinition('messenger.transport.redis.factory');
} else {
$container->getDefinition('messenger.transport.symfony_serializer')
->replaceArgument(1, $config['serializer']['symfony_serializer']['format'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@
<tag name="messenger.transport_factory" />
</service>

<service id="messenger.transport.redis.factory" class="Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory">
<tag name="messenger.transport_factory" />
</service>

<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
<tag name="messenger.transport_factory" />
</service>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
'options' => ['queue' => ['name' => 'Queue']],
'serializer' => 'messenger.transport.native_php_serializer',
],
'redis' => 'redis://127.0.0.1:6379/messages',
],
],
]);
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
</framework:queue>
</framework:options>
</framework:transport>
<framework:transport name="redis" dsn="redis://127.0.0.1:6379/messages" />
</framework:messenger>
</framework:config>
</container>
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ framework:
queue:
name: Queue
serializer: 'messenger.transport.native_php_serializer'
redis: 'redis://127.0.0.1:6379/messages'
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ public function testMessenger()
$this->assertTrue($container->hasAlias('messenger.default_bus'));
$this->assertTrue($container->getAlias('messenger.default_bus')->isPublic());
$this->assertFalse($container->hasDefinition('messenger.transport.amqp.factory'));
$this->assertFalse($container->hasDefinition('messenger.transport.redis.factory'));
$this->assertTrue($container->hasDefinition('messenger.transport_factory'));
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
}
Expand All @@ -697,6 +698,16 @@ public function testMessengerTransports()
$this->assertEquals(new Reference('messenger.transport.native_php_serializer'), $transportArguments[2]);

$this->assertTrue($container->hasDefinition('messenger.transport.amqp.factory'));

$this->assertTrue($container->hasDefinition('messenger.transport.redis'));
$transportFactory = $container->getDefinition('messenger.transport.redis')->getFactory();
$transportArguments = $container->getDefinition('messenger.transport.redis')->getArguments();

$this->assertEquals([new Reference('messenger.transport_factory'), 'createTransport'], $transportFactory);
$this->assertCount(3, $transportArguments);
$this->assertSame('redis://127.0.0.1:6379/messages', $transportArguments[0]);

$this->assertTrue($container->hasDefinition('messenger.transport.redis.factory'));
}

public function testMessengerRouting()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;

/**
* @requires extension redis
*/
class ConnectionTest extends TestCase
alexander-schranz marked this conversation as resolved.
Show resolved Hide resolved
{
public function testFromInvalidDsn()
{
$this->expectException(\InvalidArgumentException::class);
$this->expectExceptionMessage('The given Redis DSN "redis://" is invalid.');

Connection::fromDsn('redis://');
}

public function testFromDsn()
{
$this->assertEquals(
new Connection(['stream' => 'queue'], [
'host' => 'localhost',
'port' => 6379,
]),
Connection::fromDsn('redis://localhost/queue')
);
}

public function testFromDsnWithOptions()
{
$this->assertEquals(
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
'host' => 'localhost',
'port' => 6379,
], [
'blocking_timeout' => 30,
]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30])
);
}

public function testFromDsnWithQueryOptions()
{
$this->assertEquals(
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
'host' => 'localhost',
'port' => 6379,
], [
'blocking_timeout' => 30,
]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
);
}

public function testKeepGettingPendingMessages()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();

$redis->expects($this->exactly(3))->method('xreadgroup')
->with('symfony', 'consumer', ['queue' => 0], 1, null)
->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]);

$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$this->assertNotNull($connection->get());
$this->assertNotNull($connection->get());
$this->assertNotNull($connection->get());
}

public function testFirstGetPendingMessagesThenNewMessages()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();

$count = 0;

$redis->expects($this->exactly(2))->method('xreadgroup')
->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) {
++$count;

if (1 === $count) {
return '0' === $arr_streams['queue'];
}

return '>' === $arr_streams['queue'];
}), 1, null)
->willReturn(['queue' => []]);

$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$connection->get();
}

public function testUnexpectedRedisError()
{
$this->expectException(LogicException::class);
$this->expectExceptionMessage('Redis error happens');
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');

$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$connection->get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;

/**
* @requires extension redis
*/
class RedisExtIntegrationTest extends TestCase
{
private $redis;
private $connection;

protected function setUp()
{
if (!getenv('MESSENGER_REDIS_DSN')) {
$this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.');
}

$this->redis = new \Redis();
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
$this->clearRedis();
$this->connection->setup();
}

public function testConnectionSendAndGet()
{
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}

public function testGetTheFirstAvailableMessage()
{
$this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]);
$this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi1"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi2"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}

private function clearRedis()
{
$parsedUrl = parse_url(getenv('MESSENGER_REDIS_DSN'));
$pathParts = explode('/', $parsedUrl['path'] ?? '');
$stream = $pathParts[1] ?? 'symfony';
$this->redis->del($stream);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;

class RedisReceiverTest extends TestCase
{
public function testItReturnsTheDecodedMessageToTheHandler()
{
$serializer = $this->createSerializer();

$redisEnvelop = $this->createRedisEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($redisEnvelop);

$receiver = new RedisReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->get());
$this->assertCount(1, $actualEnvelopes);
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
}

public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
{
$this->expectException(MessageDecodingFailedException::class);

$serializer = $this->createMock(PhpSerializer::class);
$serializer->method('decode')->willThrowException(new MessageDecodingFailedException());

$redisEnvelop = $this->createRedisEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($redisEnvelop);
$connection->expects($this->once())->method('reject');

$receiver = new RedisReceiver($connection, $serializer);
iterator_to_array($receiver->get());
}

private function createRedisEnvelope()
{
return [
'id' => 1,
'body' => '{"message": "Hi"}',
'headers' => [
'type' => DummyMessage::class,
],
];
}

private function createSerializer(): Serializer
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);

return $serializer;
}
}
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.