diff --git a/.travis.yml b/.travis.yml index 9b87cfd5342f8..3cd322adfbde8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,6 +19,7 @@ env: - MIN_PHP=7.1.3 - SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php - MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages + - MESSENGER_REDIS_DSN=redis://127.0.0.1:7001/messages matrix: include: @@ -55,8 +56,8 @@ before_install: - | # Start Redis cluster - docker pull grokzen/redis-cluster:4.0.8 - docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:4.0.8 + docker pull grokzen/redis-cluster:5.0.4 + docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4 export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005' - | @@ -116,6 +117,7 @@ before_install: local ext_name=$1 local ext_so=$2 local INI=$3 + local input=${4:-yes} local ext_dir=$(php -r "echo ini_get('extension_dir');") local ext_cache=~/php-ext/$(basename $ext_dir)/$ext_name @@ -124,7 +126,7 @@ before_install: else rm ~/.pearrc /tmp/pear 2>/dev/null || true mkdir -p $ext_cache - echo yes | pecl install -f $ext_name && + echo $input | pecl install -f $ext_name && cp $ext_dir/$ext_so $ext_cache fi } @@ -147,7 +149,6 @@ before_install: echo session.gc_probability = 0 >> $INI echo opcache.enable_cli = 1 >> $INI echo apc.enable_cli = 1 >> $INI - echo extension = redis.so >> $INI echo extension = memcached.so >> $INI done @@ -166,7 +167,11 @@ before_install: tfold ext.igbinary tpecl igbinary-2.0.8 igbinary.so $INI tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI + tfold ext.redis tpecl redis-4.3.0 redis.so $INI "no" done + - | + # List all php extensions with versions + - php -r 'foreach (get_loaded_extensions() as $extension) echo $extension . " " . phpversion($extension) . PHP_EOL;' - | # Load fixtures diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index b1c42387ff4fb..cfcbe1ee17db1 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1700,6 +1700,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder if (empty($config['transports'])) { $container->removeDefinition('messenger.transport.symfony_serializer'); $container->removeDefinition('messenger.transport.amqp.factory'); + $container->removeDefinition('messenger.transport.redis.factory'); } else { $container->getDefinition('messenger.transport.symfony_serializer') ->replaceArgument(1, $config['serializer']['symfony_serializer']['format']) diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml index 4f677d40918a8..b0bcf2fd5ccbb 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -66,6 +66,10 @@ + + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php index b655644e0dff5..68ff3607465b2 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php @@ -13,6 +13,7 @@ 'options' => ['queue' => ['name' => 'Queue']], 'serializer' => 'messenger.transport.native_php_serializer', ], + 'redis' => 'redis://127.0.0.1:6379/messages', ], ], ]); diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml index 411c0c29e5b50..bb698cbc17105 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml @@ -17,6 +17,7 @@ + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml index 409e410986840..2fc1f482653e4 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml @@ -11,3 +11,4 @@ framework: queue: name: Queue serializer: 'messenger.transport.native_php_serializer' + redis: 'redis://127.0.0.1:6379/messages' diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index c69275e438d15..64044e7450177 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -673,6 +673,7 @@ public function testMessenger() $this->assertTrue($container->hasAlias('messenger.default_bus')); $this->assertTrue($container->getAlias('messenger.default_bus')->isPublic()); $this->assertFalse($container->hasDefinition('messenger.transport.amqp.factory')); + $this->assertFalse($container->hasDefinition('messenger.transport.redis.factory')); $this->assertTrue($container->hasDefinition('messenger.transport_factory')); $this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass()); } @@ -697,6 +698,16 @@ public function testMessengerTransports() $this->assertEquals(new Reference('messenger.transport.native_php_serializer'), $transportArguments[2]); $this->assertTrue($container->hasDefinition('messenger.transport.amqp.factory')); + + $this->assertTrue($container->hasDefinition('messenger.transport.redis')); + $transportFactory = $container->getDefinition('messenger.transport.redis')->getFactory(); + $transportArguments = $container->getDefinition('messenger.transport.redis')->getArguments(); + + $this->assertEquals([new Reference('messenger.transport_factory'), 'createTransport'], $transportFactory); + $this->assertCount(3, $transportArguments); + $this->assertSame('redis://127.0.0.1:6379/messages', $transportArguments[0]); + + $this->assertTrue($container->hasDefinition('messenger.transport.redis.factory')); } public function testMessengerRouting() diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php new file mode 100644 index 0000000000000..96f2942050e97 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php @@ -0,0 +1,115 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\RedisExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Exception\LogicException; +use Symfony\Component\Messenger\Transport\RedisExt\Connection; + +/** + * @requires extension redis + */ +class ConnectionTest extends TestCase +{ + public function testFromInvalidDsn() + { + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('The given Redis DSN "redis://" is invalid.'); + + Connection::fromDsn('redis://'); + } + + public function testFromDsn() + { + $this->assertEquals( + new Connection(['stream' => 'queue'], [ + 'host' => 'localhost', + 'port' => 6379, + ]), + Connection::fromDsn('redis://localhost/queue') + ); + } + + public function testFromDsnWithOptions() + { + $this->assertEquals( + new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [ + 'host' => 'localhost', + 'port' => 6379, + ], [ + 'blocking_timeout' => 30, + ]), + Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30]) + ); + } + + public function testFromDsnWithQueryOptions() + { + $this->assertEquals( + new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [ + 'host' => 'localhost', + 'port' => 6379, + ], [ + 'blocking_timeout' => 30, + ]), + Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30') + ); + } + + public function testKeepGettingPendingMessages() + { + $redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock(); + + $redis->expects($this->exactly(3))->method('xreadgroup') + ->with('symfony', 'consumer', ['queue' => 0], 1, null) + ->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]); + + $connection = Connection::fromDsn('redis://localhost/queue', [], $redis); + $this->assertNotNull($connection->get()); + $this->assertNotNull($connection->get()); + $this->assertNotNull($connection->get()); + } + + public function testFirstGetPendingMessagesThenNewMessages() + { + $redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock(); + + $count = 0; + + $redis->expects($this->exactly(2))->method('xreadgroup') + ->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) { + ++$count; + + if (1 === $count) { + return '0' === $arr_streams['queue']; + } + + return '>' === $arr_streams['queue']; + }), 1, null) + ->willReturn(['queue' => []]); + + $connection = Connection::fromDsn('redis://localhost/queue', [], $redis); + $connection->get(); + } + + public function testUnexpectedRedisError() + { + $this->expectException(LogicException::class); + $this->expectExceptionMessage('Redis error happens'); + $redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock(); + $redis->expects($this->once())->method('xreadgroup')->willReturn(false); + $redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens'); + + $connection = Connection::fromDsn('redis://localhost/queue', [], $redis); + $connection->get(); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php new file mode 100644 index 0000000000000..5342250e843f5 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php @@ -0,0 +1,65 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\RedisExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\RedisExt\Connection; + +/** + * @requires extension redis + */ +class RedisExtIntegrationTest extends TestCase +{ + private $redis; + private $connection; + + protected function setUp() + { + if (!getenv('MESSENGER_REDIS_DSN')) { + $this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.'); + } + + $this->redis = new \Redis(); + $this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis); + $this->clearRedis(); + $this->connection->setup(); + } + + public function testConnectionSendAndGet() + { + $this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]); + $encoded = $this->connection->get(); + $this->assertEquals('{"message": "Hi"}', $encoded['body']); + $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); + } + + public function testGetTheFirstAvailableMessage() + { + $this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]); + $this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]); + $encoded = $this->connection->get(); + $this->assertEquals('{"message": "Hi1"}', $encoded['body']); + $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); + $encoded = $this->connection->get(); + $this->assertEquals('{"message": "Hi2"}', $encoded['body']); + $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); + } + + private function clearRedis() + { + $parsedUrl = parse_url(getenv('MESSENGER_REDIS_DSN')); + $pathParts = explode('/', $parsedUrl['path'] ?? ''); + $stream = $pathParts[1] ?? 'symfony'; + $this->redis->del($stream); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisReceiverTest.php new file mode 100644 index 0000000000000..c3bb532239756 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisReceiverTest.php @@ -0,0 +1,76 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\RedisExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\RedisExt\Connection; +use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver; +use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; +use Symfony\Component\Messenger\Transport\Serialization\Serializer; +use Symfony\Component\Serializer as SerializerComponent; +use Symfony\Component\Serializer\Encoder\JsonEncoder; +use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; + +class RedisReceiverTest extends TestCase +{ + public function testItReturnsTheDecodedMessageToTheHandler() + { + $serializer = $this->createSerializer(); + + $redisEnvelop = $this->createRedisEnvelope(); + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($redisEnvelop); + + $receiver = new RedisReceiver($connection, $serializer); + $actualEnvelopes = iterator_to_array($receiver->get()); + $this->assertCount(1, $actualEnvelopes); + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); + } + + public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException() + { + $this->expectException(MessageDecodingFailedException::class); + + $serializer = $this->createMock(PhpSerializer::class); + $serializer->method('decode')->willThrowException(new MessageDecodingFailedException()); + + $redisEnvelop = $this->createRedisEnvelope(); + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($redisEnvelop); + $connection->expects($this->once())->method('reject'); + + $receiver = new RedisReceiver($connection, $serializer); + iterator_to_array($receiver->get()); + } + + private function createRedisEnvelope() + { + return [ + 'id' => 1, + 'body' => '{"message": "Hi"}', + 'headers' => [ + 'type' => DummyMessage::class, + ], + ]; + } + + private function createSerializer(): Serializer + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + return $serializer; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisSenderTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisSenderTest.php new file mode 100644 index 0000000000000..5cbda34e10b97 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisSenderTest.php @@ -0,0 +1,39 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\RedisExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\RedisExt\Connection; +use Symfony\Component\Messenger\Transport\RedisExt\RedisSender; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; + +class RedisSenderTest extends TestCase +{ + public function testSend() + { + $envelope = new Envelope(new DummyMessage('Oy')); + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; + + $connection = $this->getMockBuilder(Connection::class) + ->disableOriginalConstructor() + ->getMock(); + $connection->expects($this->once())->method('add')->with($encoded['body'], $encoded['headers']); + + $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(); + $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + + $sender = new RedisSender($connection, $serializer); + $sender->send($envelope); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportFactoryTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportFactoryTest.php new file mode 100644 index 0000000000000..58b71536cf9d6 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportFactoryTest.php @@ -0,0 +1,42 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\RedisExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Transport\RedisExt\Connection; +use Symfony\Component\Messenger\Transport\RedisExt\RedisTransport; +use Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; + +/** + * @requires extension redis + */ +class RedisTransportFactoryTest extends TestCase +{ + public function testSupportsOnlyRedisTransports() + { + $factory = new RedisTransportFactory(); + + $this->assertTrue($factory->supports('redis://localhost', [])); + $this->assertFalse($factory->supports('sqs://localhost', [])); + $this->assertFalse($factory->supports('invalid-dsn', [])); + } + + public function testCreateTransport() + { + $factory = new RedisTransportFactory(); + $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(); + $expectedTransport = new RedisTransport(Connection::fromDsn('redis://localhost', ['foo' => 'bar']), $serializer); + + $this->assertEquals($expectedTransport, $factory->createTransport('redis://localhost', ['foo' => 'bar'], $serializer)); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportTest.php new file mode 100644 index 0000000000000..0c83e6be88c46 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportTest.php @@ -0,0 +1,60 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\RedisExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\RedisExt\Connection; +use Symfony\Component\Messenger\Transport\RedisExt\RedisTransport; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; + +class RedisTransportTest extends TestCase +{ + public function testItIsATransport() + { + $transport = $this->getTransport(); + + $this->assertInstanceOf(TransportInterface::class, $transport); + } + + public function testReceivesMessages() + { + $transport = $this->getTransport( + $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(), + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock() + ); + + $decodedMessage = new DummyMessage('Decoded.'); + + $redisEnvelope = [ + 'id' => '5', + 'body' => 'body', + 'headers' => ['my' => 'header'], + ]; + + $serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage)); + $connection->method('get')->willReturn($redisEnvelope); + + $envelopes = iterator_to_array($transport->get()); + $this->assertSame($decodedMessage, $envelopes[0]->getMessage()); + } + + private function getTransport(SerializerInterface $serializer = null, Connection $connection = null) + { + $serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock(); + $connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + + return new RedisTransport($connection, $serializer); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php new file mode 100644 index 0000000000000..056159818ec01 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php @@ -0,0 +1,135 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\RedisExt; + +use Symfony\Component\Messenger\Exception\InvalidArgumentException; +use Symfony\Component\Messenger\Exception\LogicException; + +/** + * A Redis connection. + * + * @author Alexander Schranz + * @author Antoine Bluchet + * + * @internal + * @final + * + * @experimental in 4.3 + */ +class Connection +{ + private $connection; + private $stream; + private $group; + private $consumer; + private $blockingTimeout; + private $couldHavePendingMessages = true; + + public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null) + { + $this->connection = $redis ?: new \Redis(); + $this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379); + $this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP); + $this->stream = $configuration['stream'] ?? '' ?: 'messages'; + $this->group = $configuration['group'] ?? '' ?: 'symfony'; + $this->consumer = $configuration['consumer'] ?? '' ?: 'consumer'; + $this->blockingTimeout = $redisOptions['blocking_timeout'] ?? null; + } + + public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self + { + if (false === $parsedUrl = parse_url($dsn)) { + throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn)); + } + + $pathParts = explode('/', $parsedUrl['path'] ?? ''); + + $stream = $pathParts[1] ?? ''; + $group = $pathParts[2] ?? ''; + $consumer = $pathParts[3] ?? ''; + + $connectionCredentials = [ + 'host' => $parsedUrl['host'] ?? '127.0.0.1', + 'port' => $parsedUrl['port'] ?? 6379, + ]; + + if (isset($parsedUrl['query'])) { + parse_str($parsedUrl['query'], $redisOptions); + } + + return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer], $connectionCredentials, $redisOptions, $redis); + } + + public function get(): ?array + { + $messageId = '>'; // will receive new messages + + if ($this->couldHavePendingMessages) { + $messageId = '0'; // will receive consumers pending messages + } + + $messages = $this->connection->xreadgroup( + $this->group, + $this->consumer, + [$this->stream => $messageId], + 1, + $this->blockingTimeout + ); + + if (false === $messages) { + throw new LogicException( + $this->connection->getLastError() ?: 'Unexpected redis stream error happened.' + ); + } + + if ($this->couldHavePendingMessages && empty($messages[$this->stream])) { + $this->couldHavePendingMessages = false; + + // No pending messages so get a new one + return $this->get(); + } + + foreach ($messages[$this->stream] as $key => $message) { + $redisEnvelope = \json_decode($message['message'], true); + + return [ + 'id' => $key, + 'body' => $redisEnvelope['body'], + 'headers' => $redisEnvelope['headers'], + ]; + } + + return null; + } + + public function ack(string $id): void + { + $this->connection->xack($this->stream, $this->group, [$id]); + } + + public function reject(string $id): void + { + $this->connection->xdel($this->stream, [$id]); + } + + public function add(string $body, array $headers) + { + $this->connection->xadd($this->stream, '*', ['message' => json_encode( + ['body' => $body, 'headers' => $headers] + )]); + } + + public function setup(): void + { + $this->connection->xgroup('CREATE', $this->stream, $this->group, 0, true); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceivedStamp.php b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceivedStamp.php new file mode 100644 index 0000000000000..c0b6ad37bded9 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceivedStamp.php @@ -0,0 +1,34 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\RedisExt; + +use Symfony\Component\Messenger\Stamp\StampInterface; + +/** + * @author Alexander Schranz + * + * @experimental in 4.3 + */ +class RedisReceivedStamp implements StampInterface +{ + private $id; + + public function __construct(string $id) + { + $this->id = $id; + } + + public function getId(): string + { + return $this->id; + } +} diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceiver.php b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceiver.php new file mode 100644 index 0000000000000..8ff60354b9415 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceiver.php @@ -0,0 +1,90 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\RedisExt; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\LogicException; +use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; +use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; +use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; + +/** + * @author Alexander Schranz + * @author Antoine Bluchet + * + * @experimental in 4.3 + */ +class RedisReceiver implements ReceiverInterface +{ + private $connection; + private $serializer; + + public function __construct(Connection $connection, SerializerInterface $serializer = null) + { + $this->connection = $connection; + $this->serializer = $serializer ?? new PhpSerializer(); + } + + /** + * {@inheritdoc} + */ + public function get(): iterable + { + $redisEnvelope = $this->connection->get(); + + if (null === $redisEnvelope) { + return []; + } + + try { + $envelope = $this->serializer->decode([ + 'body' => $redisEnvelope['body'], + 'headers' => $redisEnvelope['headers'], + ]); + } catch (MessageDecodingFailedException $exception) { + $this->connection->reject($redisEnvelope['id']); + + throw $exception; + } + + yield $envelope->with(new RedisReceivedStamp($redisEnvelope['id'])); + } + + /** + * {@inheritdoc} + */ + public function ack(Envelope $envelope): void + { + $this->connection->ack($this->findRedisReceivedStamp($envelope)->getId()); + } + + /** + * {@inheritdoc} + */ + public function reject(Envelope $envelope): void + { + $this->connection->reject($this->findRedisReceivedStamp($envelope)->getId()); + } + + private function findRedisReceivedStamp(Envelope $envelope): RedisReceivedStamp + { + /** @var RedisReceivedStamp|null $redisReceivedStamp */ + $redisReceivedStamp = $envelope->last(RedisReceivedStamp::class); + + if (null === $redisReceivedStamp) { + throw new LogicException('No RedisReceivedStamp found on the Envelope.'); + } + + return $redisReceivedStamp; + } +} diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/RedisSender.php b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisSender.php new file mode 100644 index 0000000000000..a6fba8404a3ac --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisSender.php @@ -0,0 +1,46 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\RedisExt; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Transport\Sender\SenderInterface; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; + +/** + * @author Alexander Schranz + * @author Antoine Bluchet + * + * @experimental in 4.3 + */ +class RedisSender implements SenderInterface +{ + private $connection; + private $serializer; + + public function __construct(Connection $connection, SerializerInterface $serializer) + { + $this->connection = $connection; + $this->serializer = $serializer; + } + + /** + * {@inheritdoc} + */ + public function send(Envelope $envelope): Envelope + { + $encodedMessage = $this->serializer->encode($envelope); + + $this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? []); + + return $envelope; + } +} diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransport.php b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransport.php new file mode 100644 index 0000000000000..3af4e94233675 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransport.php @@ -0,0 +1,88 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\RedisExt; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; +use Symfony\Component\Messenger\Transport\SetupableTransportInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; + +/** + * @author Alexander Schranz + * @author Antoine Bluchet + * + * @experimental in 4.3 + */ +class RedisTransport implements TransportInterface, SetupableTransportInterface +{ + private $serializer; + private $connection; + private $receiver; + private $sender; + + public function __construct(Connection $connection, SerializerInterface $serializer = null) + { + $this->connection = $connection; + $this->serializer = $serializer ?? new PhpSerializer(); + } + + /** + * {@inheritdoc} + */ + public function get(): iterable + { + return ($this->receiver ?? $this->getReceiver())->get(); + } + + /** + * {@inheritdoc} + */ + public function ack(Envelope $envelope): void + { + ($this->receiver ?? $this->getReceiver())->ack($envelope); + } + + /** + * {@inheritdoc} + */ + public function reject(Envelope $envelope): void + { + ($this->receiver ?? $this->getReceiver())->reject($envelope); + } + + /** + * {@inheritdoc} + */ + public function send(Envelope $envelope): Envelope + { + return ($this->sender ?? $this->getSender())->send($envelope); + } + + /** + * {@inheritdoc} + */ + public function setup(): void + { + $this->connection->setup(); + } + + private function getReceiver() + { + return $this->receiver = new RedisReceiver($this->connection, $this->serializer); + } + + private function getSender() + { + return $this->sender = new RedisSender($this->connection, $this->serializer); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransportFactory.php b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransportFactory.php new file mode 100644 index 0000000000000..acb2d1f59160c --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransportFactory.php @@ -0,0 +1,35 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\RedisExt; + +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; +use Symfony\Component\Messenger\Transport\TransportFactoryInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; + +/** + * @author Alexander Schranz + * @author Antoine Bluchet + * + * @experimental in 4.3 + */ +class RedisTransportFactory implements TransportFactoryInterface +{ + public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface + { + return new RedisTransport(Connection::fromDsn($dsn, $options), $serializer); + } + + public function supports(string $dsn, array $options): bool + { + return 0 === strpos($dsn, 'redis://'); + } +}