diff --git a/.travis.yml b/.travis.yml
index 9b87cfd5342f8..3cd322adfbde8 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -19,6 +19,7 @@ env:
- MIN_PHP=7.1.3
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
+ - MESSENGER_REDIS_DSN=redis://127.0.0.1:7001/messages
matrix:
include:
@@ -55,8 +56,8 @@ before_install:
- |
# Start Redis cluster
- docker pull grokzen/redis-cluster:4.0.8
- docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:4.0.8
+ docker pull grokzen/redis-cluster:5.0.4
+ docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
- |
@@ -116,6 +117,7 @@ before_install:
local ext_name=$1
local ext_so=$2
local INI=$3
+ local input=${4:-yes}
local ext_dir=$(php -r "echo ini_get('extension_dir');")
local ext_cache=~/php-ext/$(basename $ext_dir)/$ext_name
@@ -124,7 +126,7 @@ before_install:
else
rm ~/.pearrc /tmp/pear 2>/dev/null || true
mkdir -p $ext_cache
- echo yes | pecl install -f $ext_name &&
+ echo $input | pecl install -f $ext_name &&
cp $ext_dir/$ext_so $ext_cache
fi
}
@@ -147,7 +149,6 @@ before_install:
echo session.gc_probability = 0 >> $INI
echo opcache.enable_cli = 1 >> $INI
echo apc.enable_cli = 1 >> $INI
- echo extension = redis.so >> $INI
echo extension = memcached.so >> $INI
done
@@ -166,7 +167,11 @@ before_install:
tfold ext.igbinary tpecl igbinary-2.0.8 igbinary.so $INI
tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI
tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI
+ tfold ext.redis tpecl redis-4.3.0 redis.so $INI "no"
done
+ - |
+ # List all php extensions with versions
+ - php -r 'foreach (get_loaded_extensions() as $extension) echo $extension . " " . phpversion($extension) . PHP_EOL;'
- |
# Load fixtures
diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
index b1c42387ff4fb..cfcbe1ee17db1 100644
--- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
@@ -1700,6 +1700,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
if (empty($config['transports'])) {
$container->removeDefinition('messenger.transport.symfony_serializer');
$container->removeDefinition('messenger.transport.amqp.factory');
+ $container->removeDefinition('messenger.transport.redis.factory');
} else {
$container->getDefinition('messenger.transport.symfony_serializer')
->replaceArgument(1, $config['serializer']['symfony_serializer']['format'])
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
index 4f677d40918a8..b0bcf2fd5ccbb 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
@@ -66,6 +66,10 @@
+
+
+
+
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php
index b655644e0dff5..68ff3607465b2 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php
+++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php
@@ -13,6 +13,7 @@
'options' => ['queue' => ['name' => 'Queue']],
'serializer' => 'messenger.transport.native_php_serializer',
],
+ 'redis' => 'redis://127.0.0.1:6379/messages',
],
],
]);
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml
index 411c0c29e5b50..bb698cbc17105 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml
@@ -17,6 +17,7 @@
+
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml
index 409e410986840..2fc1f482653e4 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml
+++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml
@@ -11,3 +11,4 @@ framework:
queue:
name: Queue
serializer: 'messenger.transport.native_php_serializer'
+ redis: 'redis://127.0.0.1:6379/messages'
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
index c69275e438d15..64044e7450177 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
+++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
@@ -673,6 +673,7 @@ public function testMessenger()
$this->assertTrue($container->hasAlias('messenger.default_bus'));
$this->assertTrue($container->getAlias('messenger.default_bus')->isPublic());
$this->assertFalse($container->hasDefinition('messenger.transport.amqp.factory'));
+ $this->assertFalse($container->hasDefinition('messenger.transport.redis.factory'));
$this->assertTrue($container->hasDefinition('messenger.transport_factory'));
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
}
@@ -697,6 +698,16 @@ public function testMessengerTransports()
$this->assertEquals(new Reference('messenger.transport.native_php_serializer'), $transportArguments[2]);
$this->assertTrue($container->hasDefinition('messenger.transport.amqp.factory'));
+
+ $this->assertTrue($container->hasDefinition('messenger.transport.redis'));
+ $transportFactory = $container->getDefinition('messenger.transport.redis')->getFactory();
+ $transportArguments = $container->getDefinition('messenger.transport.redis')->getArguments();
+
+ $this->assertEquals([new Reference('messenger.transport_factory'), 'createTransport'], $transportFactory);
+ $this->assertCount(3, $transportArguments);
+ $this->assertSame('redis://127.0.0.1:6379/messages', $transportArguments[0]);
+
+ $this->assertTrue($container->hasDefinition('messenger.transport.redis.factory'));
}
public function testMessengerRouting()
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php
new file mode 100644
index 0000000000000..96f2942050e97
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php
@@ -0,0 +1,115 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Exception\LogicException;
+use Symfony\Component\Messenger\Transport\RedisExt\Connection;
+
+/**
+ * @requires extension redis
+ */
+class ConnectionTest extends TestCase
+{
+ public function testFromInvalidDsn()
+ {
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('The given Redis DSN "redis://" is invalid.');
+
+ Connection::fromDsn('redis://');
+ }
+
+ public function testFromDsn()
+ {
+ $this->assertEquals(
+ new Connection(['stream' => 'queue'], [
+ 'host' => 'localhost',
+ 'port' => 6379,
+ ]),
+ Connection::fromDsn('redis://localhost/queue')
+ );
+ }
+
+ public function testFromDsnWithOptions()
+ {
+ $this->assertEquals(
+ new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
+ 'host' => 'localhost',
+ 'port' => 6379,
+ ], [
+ 'blocking_timeout' => 30,
+ ]),
+ Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30])
+ );
+ }
+
+ public function testFromDsnWithQueryOptions()
+ {
+ $this->assertEquals(
+ new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
+ 'host' => 'localhost',
+ 'port' => 6379,
+ ], [
+ 'blocking_timeout' => 30,
+ ]),
+ Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
+ );
+ }
+
+ public function testKeepGettingPendingMessages()
+ {
+ $redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
+
+ $redis->expects($this->exactly(3))->method('xreadgroup')
+ ->with('symfony', 'consumer', ['queue' => 0], 1, null)
+ ->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]);
+
+ $connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
+ $this->assertNotNull($connection->get());
+ $this->assertNotNull($connection->get());
+ $this->assertNotNull($connection->get());
+ }
+
+ public function testFirstGetPendingMessagesThenNewMessages()
+ {
+ $redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
+
+ $count = 0;
+
+ $redis->expects($this->exactly(2))->method('xreadgroup')
+ ->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) {
+ ++$count;
+
+ if (1 === $count) {
+ return '0' === $arr_streams['queue'];
+ }
+
+ return '>' === $arr_streams['queue'];
+ }), 1, null)
+ ->willReturn(['queue' => []]);
+
+ $connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
+ $connection->get();
+ }
+
+ public function testUnexpectedRedisError()
+ {
+ $this->expectException(LogicException::class);
+ $this->expectExceptionMessage('Redis error happens');
+ $redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
+ $redis->expects($this->once())->method('xreadgroup')->willReturn(false);
+ $redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');
+
+ $connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
+ $connection->get();
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
new file mode 100644
index 0000000000000..5342250e843f5
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
@@ -0,0 +1,65 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
+use Symfony\Component\Messenger\Transport\RedisExt\Connection;
+
+/**
+ * @requires extension redis
+ */
+class RedisExtIntegrationTest extends TestCase
+{
+ private $redis;
+ private $connection;
+
+ protected function setUp()
+ {
+ if (!getenv('MESSENGER_REDIS_DSN')) {
+ $this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.');
+ }
+
+ $this->redis = new \Redis();
+ $this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
+ $this->clearRedis();
+ $this->connection->setup();
+ }
+
+ public function testConnectionSendAndGet()
+ {
+ $this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
+ $encoded = $this->connection->get();
+ $this->assertEquals('{"message": "Hi"}', $encoded['body']);
+ $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
+ }
+
+ public function testGetTheFirstAvailableMessage()
+ {
+ $this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]);
+ $this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]);
+ $encoded = $this->connection->get();
+ $this->assertEquals('{"message": "Hi1"}', $encoded['body']);
+ $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
+ $encoded = $this->connection->get();
+ $this->assertEquals('{"message": "Hi2"}', $encoded['body']);
+ $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
+ }
+
+ private function clearRedis()
+ {
+ $parsedUrl = parse_url(getenv('MESSENGER_REDIS_DSN'));
+ $pathParts = explode('/', $parsedUrl['path'] ?? '');
+ $stream = $pathParts[1] ?? 'symfony';
+ $this->redis->del($stream);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisReceiverTest.php
new file mode 100644
index 0000000000000..c3bb532239756
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisReceiverTest.php
@@ -0,0 +1,76 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
+use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
+use Symfony\Component\Messenger\Transport\RedisExt\Connection;
+use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
+use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
+use Symfony\Component\Messenger\Transport\Serialization\Serializer;
+use Symfony\Component\Serializer as SerializerComponent;
+use Symfony\Component\Serializer\Encoder\JsonEncoder;
+use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
+
+class RedisReceiverTest extends TestCase
+{
+ public function testItReturnsTheDecodedMessageToTheHandler()
+ {
+ $serializer = $this->createSerializer();
+
+ $redisEnvelop = $this->createRedisEnvelope();
+ $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
+ $connection->method('get')->willReturn($redisEnvelop);
+
+ $receiver = new RedisReceiver($connection, $serializer);
+ $actualEnvelopes = iterator_to_array($receiver->get());
+ $this->assertCount(1, $actualEnvelopes);
+ $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
+ }
+
+ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
+ {
+ $this->expectException(MessageDecodingFailedException::class);
+
+ $serializer = $this->createMock(PhpSerializer::class);
+ $serializer->method('decode')->willThrowException(new MessageDecodingFailedException());
+
+ $redisEnvelop = $this->createRedisEnvelope();
+ $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
+ $connection->method('get')->willReturn($redisEnvelop);
+ $connection->expects($this->once())->method('reject');
+
+ $receiver = new RedisReceiver($connection, $serializer);
+ iterator_to_array($receiver->get());
+ }
+
+ private function createRedisEnvelope()
+ {
+ return [
+ 'id' => 1,
+ 'body' => '{"message": "Hi"}',
+ 'headers' => [
+ 'type' => DummyMessage::class,
+ ],
+ ];
+ }
+
+ private function createSerializer(): Serializer
+ {
+ $serializer = new Serializer(
+ new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
+ );
+
+ return $serializer;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisSenderTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisSenderTest.php
new file mode 100644
index 0000000000000..5cbda34e10b97
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisSenderTest.php
@@ -0,0 +1,39 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
+use Symfony\Component\Messenger\Transport\RedisExt\Connection;
+use Symfony\Component\Messenger\Transport\RedisExt\RedisSender;
+use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
+
+class RedisSenderTest extends TestCase
+{
+ public function testSend()
+ {
+ $envelope = new Envelope(new DummyMessage('Oy'));
+ $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
+
+ $connection = $this->getMockBuilder(Connection::class)
+ ->disableOriginalConstructor()
+ ->getMock();
+ $connection->expects($this->once())->method('add')->with($encoded['body'], $encoded['headers']);
+
+ $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
+ $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
+
+ $sender = new RedisSender($connection, $serializer);
+ $sender->send($envelope);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportFactoryTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportFactoryTest.php
new file mode 100644
index 0000000000000..58b71536cf9d6
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportFactoryTest.php
@@ -0,0 +1,42 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Transport\RedisExt\Connection;
+use Symfony\Component\Messenger\Transport\RedisExt\RedisTransport;
+use Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory;
+use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
+
+/**
+ * @requires extension redis
+ */
+class RedisTransportFactoryTest extends TestCase
+{
+ public function testSupportsOnlyRedisTransports()
+ {
+ $factory = new RedisTransportFactory();
+
+ $this->assertTrue($factory->supports('redis://localhost', []));
+ $this->assertFalse($factory->supports('sqs://localhost', []));
+ $this->assertFalse($factory->supports('invalid-dsn', []));
+ }
+
+ public function testCreateTransport()
+ {
+ $factory = new RedisTransportFactory();
+ $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
+ $expectedTransport = new RedisTransport(Connection::fromDsn('redis://localhost', ['foo' => 'bar']), $serializer);
+
+ $this->assertEquals($expectedTransport, $factory->createTransport('redis://localhost', ['foo' => 'bar'], $serializer));
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportTest.php
new file mode 100644
index 0000000000000..0c83e6be88c46
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportTest.php
@@ -0,0 +1,60 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
+use Symfony\Component\Messenger\Transport\RedisExt\Connection;
+use Symfony\Component\Messenger\Transport\RedisExt\RedisTransport;
+use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
+use Symfony\Component\Messenger\Transport\TransportInterface;
+
+class RedisTransportTest extends TestCase
+{
+ public function testItIsATransport()
+ {
+ $transport = $this->getTransport();
+
+ $this->assertInstanceOf(TransportInterface::class, $transport);
+ }
+
+ public function testReceivesMessages()
+ {
+ $transport = $this->getTransport(
+ $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(),
+ $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock()
+ );
+
+ $decodedMessage = new DummyMessage('Decoded.');
+
+ $redisEnvelope = [
+ 'id' => '5',
+ 'body' => 'body',
+ 'headers' => ['my' => 'header'],
+ ];
+
+ $serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
+ $connection->method('get')->willReturn($redisEnvelope);
+
+ $envelopes = iterator_to_array($transport->get());
+ $this->assertSame($decodedMessage, $envelopes[0]->getMessage());
+ }
+
+ private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)
+ {
+ $serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock();
+ $connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
+
+ return new RedisTransport($connection, $serializer);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
new file mode 100644
index 0000000000000..056159818ec01
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
@@ -0,0 +1,135 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Transport\RedisExt;
+
+use Symfony\Component\Messenger\Exception\InvalidArgumentException;
+use Symfony\Component\Messenger\Exception\LogicException;
+
+/**
+ * A Redis connection.
+ *
+ * @author Alexander Schranz
+ * @author Antoine Bluchet
+ *
+ * @internal
+ * @final
+ *
+ * @experimental in 4.3
+ */
+class Connection
+{
+ private $connection;
+ private $stream;
+ private $group;
+ private $consumer;
+ private $blockingTimeout;
+ private $couldHavePendingMessages = true;
+
+ public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
+ {
+ $this->connection = $redis ?: new \Redis();
+ $this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379);
+ $this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP);
+ $this->stream = $configuration['stream'] ?? '' ?: 'messages';
+ $this->group = $configuration['group'] ?? '' ?: 'symfony';
+ $this->consumer = $configuration['consumer'] ?? '' ?: 'consumer';
+ $this->blockingTimeout = $redisOptions['blocking_timeout'] ?? null;
+ }
+
+ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
+ {
+ if (false === $parsedUrl = parse_url($dsn)) {
+ throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
+ }
+
+ $pathParts = explode('/', $parsedUrl['path'] ?? '');
+
+ $stream = $pathParts[1] ?? '';
+ $group = $pathParts[2] ?? '';
+ $consumer = $pathParts[3] ?? '';
+
+ $connectionCredentials = [
+ 'host' => $parsedUrl['host'] ?? '127.0.0.1',
+ 'port' => $parsedUrl['port'] ?? 6379,
+ ];
+
+ if (isset($parsedUrl['query'])) {
+ parse_str($parsedUrl['query'], $redisOptions);
+ }
+
+ return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer], $connectionCredentials, $redisOptions, $redis);
+ }
+
+ public function get(): ?array
+ {
+ $messageId = '>'; // will receive new messages
+
+ if ($this->couldHavePendingMessages) {
+ $messageId = '0'; // will receive consumers pending messages
+ }
+
+ $messages = $this->connection->xreadgroup(
+ $this->group,
+ $this->consumer,
+ [$this->stream => $messageId],
+ 1,
+ $this->blockingTimeout
+ );
+
+ if (false === $messages) {
+ throw new LogicException(
+ $this->connection->getLastError() ?: 'Unexpected redis stream error happened.'
+ );
+ }
+
+ if ($this->couldHavePendingMessages && empty($messages[$this->stream])) {
+ $this->couldHavePendingMessages = false;
+
+ // No pending messages so get a new one
+ return $this->get();
+ }
+
+ foreach ($messages[$this->stream] as $key => $message) {
+ $redisEnvelope = \json_decode($message['message'], true);
+
+ return [
+ 'id' => $key,
+ 'body' => $redisEnvelope['body'],
+ 'headers' => $redisEnvelope['headers'],
+ ];
+ }
+
+ return null;
+ }
+
+ public function ack(string $id): void
+ {
+ $this->connection->xack($this->stream, $this->group, [$id]);
+ }
+
+ public function reject(string $id): void
+ {
+ $this->connection->xdel($this->stream, [$id]);
+ }
+
+ public function add(string $body, array $headers)
+ {
+ $this->connection->xadd($this->stream, '*', ['message' => json_encode(
+ ['body' => $body, 'headers' => $headers]
+ )]);
+ }
+
+ public function setup(): void
+ {
+ $this->connection->xgroup('CREATE', $this->stream, $this->group, 0, true);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceivedStamp.php b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceivedStamp.php
new file mode 100644
index 0000000000000..c0b6ad37bded9
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceivedStamp.php
@@ -0,0 +1,34 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Transport\RedisExt;
+
+use Symfony\Component\Messenger\Stamp\StampInterface;
+
+/**
+ * @author Alexander Schranz
+ *
+ * @experimental in 4.3
+ */
+class RedisReceivedStamp implements StampInterface
+{
+ private $id;
+
+ public function __construct(string $id)
+ {
+ $this->id = $id;
+ }
+
+ public function getId(): string
+ {
+ return $this->id;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceiver.php b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceiver.php
new file mode 100644
index 0000000000000..8ff60354b9415
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceiver.php
@@ -0,0 +1,90 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Transport\RedisExt;
+
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Exception\LogicException;
+use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
+use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
+use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
+use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
+
+/**
+ * @author Alexander Schranz
+ * @author Antoine Bluchet
+ *
+ * @experimental in 4.3
+ */
+class RedisReceiver implements ReceiverInterface
+{
+ private $connection;
+ private $serializer;
+
+ public function __construct(Connection $connection, SerializerInterface $serializer = null)
+ {
+ $this->connection = $connection;
+ $this->serializer = $serializer ?? new PhpSerializer();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function get(): iterable
+ {
+ $redisEnvelope = $this->connection->get();
+
+ if (null === $redisEnvelope) {
+ return [];
+ }
+
+ try {
+ $envelope = $this->serializer->decode([
+ 'body' => $redisEnvelope['body'],
+ 'headers' => $redisEnvelope['headers'],
+ ]);
+ } catch (MessageDecodingFailedException $exception) {
+ $this->connection->reject($redisEnvelope['id']);
+
+ throw $exception;
+ }
+
+ yield $envelope->with(new RedisReceivedStamp($redisEnvelope['id']));
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function ack(Envelope $envelope): void
+ {
+ $this->connection->ack($this->findRedisReceivedStamp($envelope)->getId());
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function reject(Envelope $envelope): void
+ {
+ $this->connection->reject($this->findRedisReceivedStamp($envelope)->getId());
+ }
+
+ private function findRedisReceivedStamp(Envelope $envelope): RedisReceivedStamp
+ {
+ /** @var RedisReceivedStamp|null $redisReceivedStamp */
+ $redisReceivedStamp = $envelope->last(RedisReceivedStamp::class);
+
+ if (null === $redisReceivedStamp) {
+ throw new LogicException('No RedisReceivedStamp found on the Envelope.');
+ }
+
+ return $redisReceivedStamp;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/RedisSender.php b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisSender.php
new file mode 100644
index 0000000000000..a6fba8404a3ac
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisSender.php
@@ -0,0 +1,46 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Transport\RedisExt;
+
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
+use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
+
+/**
+ * @author Alexander Schranz
+ * @author Antoine Bluchet
+ *
+ * @experimental in 4.3
+ */
+class RedisSender implements SenderInterface
+{
+ private $connection;
+ private $serializer;
+
+ public function __construct(Connection $connection, SerializerInterface $serializer)
+ {
+ $this->connection = $connection;
+ $this->serializer = $serializer;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function send(Envelope $envelope): Envelope
+ {
+ $encodedMessage = $this->serializer->encode($envelope);
+
+ $this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? []);
+
+ return $envelope;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransport.php b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransport.php
new file mode 100644
index 0000000000000..3af4e94233675
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransport.php
@@ -0,0 +1,88 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Transport\RedisExt;
+
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
+use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
+use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
+use Symfony\Component\Messenger\Transport\TransportInterface;
+
+/**
+ * @author Alexander Schranz
+ * @author Antoine Bluchet
+ *
+ * @experimental in 4.3
+ */
+class RedisTransport implements TransportInterface, SetupableTransportInterface
+{
+ private $serializer;
+ private $connection;
+ private $receiver;
+ private $sender;
+
+ public function __construct(Connection $connection, SerializerInterface $serializer = null)
+ {
+ $this->connection = $connection;
+ $this->serializer = $serializer ?? new PhpSerializer();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function get(): iterable
+ {
+ return ($this->receiver ?? $this->getReceiver())->get();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function ack(Envelope $envelope): void
+ {
+ ($this->receiver ?? $this->getReceiver())->ack($envelope);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function reject(Envelope $envelope): void
+ {
+ ($this->receiver ?? $this->getReceiver())->reject($envelope);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function send(Envelope $envelope): Envelope
+ {
+ return ($this->sender ?? $this->getSender())->send($envelope);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setup(): void
+ {
+ $this->connection->setup();
+ }
+
+ private function getReceiver()
+ {
+ return $this->receiver = new RedisReceiver($this->connection, $this->serializer);
+ }
+
+ private function getSender()
+ {
+ return $this->sender = new RedisSender($this->connection, $this->serializer);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransportFactory.php b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransportFactory.php
new file mode 100644
index 0000000000000..acb2d1f59160c
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransportFactory.php
@@ -0,0 +1,35 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Transport\RedisExt;
+
+use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
+use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
+use Symfony\Component\Messenger\Transport\TransportInterface;
+
+/**
+ * @author Alexander Schranz
+ * @author Antoine Bluchet
+ *
+ * @experimental in 4.3
+ */
+class RedisTransportFactory implements TransportFactoryInterface
+{
+ public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
+ {
+ return new RedisTransport(Connection::fromDsn($dsn, $options), $serializer);
+ }
+
+ public function supports(string $dsn, array $options): bool
+ {
+ return 0 === strpos($dsn, 'redis://');
+ }
+}