Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 87d031c

Browse filesBrowse files
feature #40155 [Messenger] Support Redis Cluster (nesk)
This PR was squashed before being merged into the 5.3-dev branch. Discussion ---------- [Messenger] Support Redis Cluster | Q | A | ------------- | --- | Branch? | 5.x | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | Fix #38264 | License | MIT | Doc PR | symfony/symfony-docs#14956 This PR brings support for Redis Cluster in the Messenger component: - The first commit _Support RedisCluster instance_ allows to pass a `RedisCluster` object when instanciating the `Connection` class, which brings support for Redis Cluster without any friction. - The second commit _Support multiple hosts DSN for Redis Cluster_ is more opiniated and brings a DSN format to configure a Redis Cluster from `config/packages/messenger.yaml`. Instanciating `Connection` with a `RedisCluster` object: ```php $redis = new \RedisCluster(null, ['host-01:6379', 'host-02:6379', 'host-03:6379', 'host-04:6379']); $connection = new Connection([], [], [], $redis); ``` Configuring a Redis Cluster from YAML: ```yaml // config/packages/messenger.yaml framework: messenger: metadata: default: 'redis://host-01:6379,redis://host-02:6379,redis://host-03:6379' lazy: 'redis://host-01:6379?lazy=1,redis://host-02:6379,redis://host-03:6379' # Configuration will be `lazy = true` and `auto_setup = true` multipleConfig: 'redis://host-01:6379?lazy=1&auto_setup=false,redis://host-02:6379,redis://host-03:6379?auto_setup=true' ``` This format allows to define multiple hosts for a Redis Cluster and still contains valid URLs. Custom configuration is still supported, it can be specified on only one of the URLs in the DSN (see `lazy` above). If the user provides multiple configurations on different URLs, they are simply merged with the following code and if an option is defined multiple times then the latest takes precedence (see `multipleConfig` above). I understand the way the DSN is handled could not suit you. Please, if you close this PR only for the DSN part, just tell me and I will make a new PR with only the first commit. Commits ------- 04530fb [Messenger] Support Redis Cluster
2 parents 44bb691 + 04530fb commit 87d031c
Copy full SHA for 87d031c

File tree

5 files changed

+178
-35
lines changed
Filter options

5 files changed

+178
-35
lines changed

‎src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ CHANGELOG
66

77
* Add `rediss://` DSN scheme support for TLS protocol
88
* Deprecate TLS option, use `rediss://127.0.0.1` instead of `redis://127.0.0.1?tls=1`
9+
* Add support for `\RedisCluster` instance in `Connection` constructor
10+
* Add support for Redis Cluster in DSN
911

1012
5.2.0
1113
-----

‎src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php
+47Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,15 @@ public static function setUpBeforeClass(): void
4040
}
4141
}
4242

43+
private function skipIfRedisClusterUnavailable()
44+
{
45+
try {
46+
new \RedisCluster(null, explode(' ', getenv('REDIS_CLUSTER_HOSTS')));
47+
} catch (\Exception $e) {
48+
self::markTestSkipped($e->getMessage());
49+
}
50+
}
51+
4352
public function testFromInvalidDsn()
4453
{
4554
$this->expectException(\InvalidArgumentException::class);
@@ -59,6 +68,20 @@ public function testFromDsn()
5968
);
6069
}
6170

71+
public function testFromDsnWithMultipleHosts()
72+
{
73+
$this->skipIfRedisClusterUnavailable();
74+
75+
$hosts = explode(' ', getenv('REDIS_CLUSTER_HOSTS'));
76+
77+
$dsn = array_map(function ($host) {
78+
return 'redis://'.$host;
79+
}, $hosts);
80+
$dsn = implode(',', $dsn);
81+
82+
$this->assertInstanceOf(Connection::class, Connection::fromDsn($dsn));
83+
}
84+
6285
public function testFromDsnOnUnixSocket()
6386
{
6487
$this->assertEquals(
@@ -160,6 +183,14 @@ public function testDeprecationIfInvalidOptionIsPassedWithDsn()
160183
Connection::fromDsn('redis://localhost/queue?foo=bar');
161184
}
162185

186+
public function testRedisClusterInstanceIsSupported()
187+
{
188+
$this->skipIfRedisClusterUnavailable();
189+
190+
$redis = new \RedisCluster(null, explode(' ', getenv('REDIS_CLUSTER_HOSTS')));
191+
$this->assertInstanceOf(Connection::class, new Connection([], [], [], $redis));
192+
}
193+
163194
public function testKeepGettingPendingMessages()
164195
{
165196
$redis = $this->createMock(\Redis::class);
@@ -429,4 +460,20 @@ public function testLazy()
429460
$connection->reject($message['id']);
430461
$redis->del('messenger-lazy');
431462
}
463+
464+
public function testLazyCluster()
465+
{
466+
$this->skipIfRedisClusterUnavailable();
467+
468+
$connection = new Connection(
469+
['lazy' => true],
470+
['host' => explode(' ', getenv('REDIS_CLUSTER_HOSTS'))]
471+
);
472+
473+
$connection->add('1', []);
474+
$this->assertNotEmpty($message = $connection->get());
475+
$this->assertSame('1', $message['body']);
476+
$connection->reject($message['id']);
477+
$connection->cleanup();
478+
}
432479
}

‎src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php
+83-34Lines changed: 83 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,10 @@ class Connection
5656
private $deleteAfterReject;
5757
private $couldHavePendingMessages = true;
5858

59-
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
59+
/**
60+
* @param \Redis|\RedisCluster|null $redis
61+
*/
62+
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], $redis = null)
6063
{
6164
if (version_compare(phpversion('redis'), '4.3.0', '<')) {
6265
throw new LogicException('The redis transport requires php-redis 4.3.0 or higher.');
@@ -71,29 +74,19 @@ public function __construct(array $configuration, array $connectionCredentials =
7174
$auth = null;
7275
}
7376

74-
$initializer = static function ($redis) use ($host, $port, $auth, $serializer, $dbIndex) {
75-
$redis->connect($host, $port);
76-
$redis->setOption(\Redis::OPT_SERIALIZER, $serializer);
77-
78-
if (null !== $auth && !$redis->auth($auth)) {
79-
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
80-
}
81-
82-
if ($dbIndex && !$redis->select($dbIndex)) {
83-
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
84-
}
85-
86-
return true;
87-
};
88-
89-
if (null === $redis) {
90-
$redis = new \Redis();
91-
}
92-
93-
if ($configuration['lazy'] ?? self::DEFAULT_OPTIONS['lazy']) {
94-
$redis = new RedisProxy($redis, $initializer);
77+
$lazy = $configuration['lazy'] ?? self::DEFAULT_OPTIONS['lazy'];
78+
if (\is_array($host) || $redis instanceof \RedisCluster) {
79+
$hosts = \is_string($host) ? [$host.':'.$port] : $host; // Always ensure we have an array
80+
$initializer = static function ($redis) use ($hosts, $auth, $serializer) {
81+
return self::initializeRedisCluster($redis, $hosts, $auth, $serializer);
82+
};
83+
$redis = $lazy ? new RedisClusterProxy($redis, $initializer) : $initializer($redis);
9584
} else {
96-
$initializer($redis);
85+
$redis = $redis ?? new \Redis();
86+
$initializer = static function ($redis) use ($host, $port, $auth, $serializer, $dbIndex) {
87+
return self::initializeRedis($redis, $host, $port, $auth, $serializer, $dbIndex);
88+
};
89+
$redis = $lazy ? new RedisProxy($redis, $initializer) : $initializer($redis);
9790
}
9891

9992
$this->connection = $redis;
@@ -116,21 +109,57 @@ public function __construct(array $configuration, array $connectionCredentials =
116109
$this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
117110
}
118111

119-
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
112+
private static function initializeRedis(\Redis $redis, string $host, int $port, ?string $auth, int $serializer, int $dbIndex): \Redis
120113
{
121-
$url = $dsn;
122-
$scheme = 0 === strpos($dsn, 'rediss:') ? 'rediss' : 'redis';
114+
$redis->connect($host, $port);
115+
$redis->setOption(\Redis::OPT_SERIALIZER, $serializer);
123116

124-
if (preg_match('#^'.$scheme.':///([^:@])+$#', $dsn)) {
125-
$url = str_replace($scheme.':', 'file:', $dsn);
117+
if (null !== $auth && !$redis->auth($auth)) {
118+
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
126119
}
127120

128-
if (false === $parsedUrl = parse_url($url)) {
129-
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
121+
if ($dbIndex && !$redis->select($dbIndex)) {
122+
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
130123
}
131-
if (isset($parsedUrl['query'])) {
132-
parse_str($parsedUrl['query'], $dsnOptions);
133-
$redisOptions = array_merge($redisOptions, $dsnOptions);
124+
125+
return $redis;
126+
}
127+
128+
private static function initializeRedisCluster(?\RedisCluster $redis, array $hosts, ?string $auth, int $serializer): \RedisCluster
129+
{
130+
if (null === $redis) {
131+
$redis = new \RedisCluster(null, $hosts, 0.0, 0.0, false, $auth);
132+
}
133+
134+
$redis->setOption(\Redis::OPT_SERIALIZER, $serializer);
135+
136+
return $redis;
137+
}
138+
139+
/**
140+
* @param \Redis|\RedisCluster|null $redis
141+
*/
142+
public static function fromDsn(string $dsn, array $redisOptions = [], $redis = null): self
143+
{
144+
if (false === strpos($dsn, ',')) {
145+
$parsedUrl = self::parseDsn($dsn, $redisOptions);
146+
} else {
147+
$dsns = explode(',', $dsn);
148+
$parsedUrls = array_map(function ($dsn) use (&$redisOptions) {
149+
return self::parseDsn($dsn, $redisOptions);
150+
}, $dsns);
151+
152+
// Merge all the URLs, the last one overrides the previous ones
153+
$parsedUrl = array_merge(...$parsedUrls);
154+
155+
// Regroup all the hosts in an array interpretable by RedisCluster
156+
$parsedUrl['host'] = array_map(function ($parsedUrl, $dsn) {
157+
if (!isset($parsedUrl['host'])) {
158+
throw new InvalidArgumentException(sprintf('Missing host in DSN part "%s", it must be defined when using Redis Cluster.', $dsn));
159+
}
160+
161+
return $parsedUrl['host'].':'.($parsedUrl['port'] ?? 6379);
162+
}, $parsedUrls, $dsns);
134163
}
135164

136165
self::validateOptions($redisOptions);
@@ -165,7 +194,7 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
165194
unset($redisOptions['dbindex']);
166195
}
167196

168-
$tls = 'rediss' === $scheme;
197+
$tls = 'rediss' === $parsedUrl['scheme'];
169198
if (\array_key_exists('tls', $redisOptions)) {
170199
trigger_deprecation('symfony/redis-messenger', '5.3', 'Providing "tls" parameter is deprecated, use "rediss://" DSN scheme instead');
171200
$tls = filter_var($redisOptions['tls'], \FILTER_VALIDATE_BOOLEAN);
@@ -223,6 +252,26 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
223252
return new self($configuration, $connectionCredentials, $redisOptions, $redis);
224253
}
225254

255+
private static function parseDsn(string $dsn, array &$redisOptions): array
256+
{
257+
$url = $dsn;
258+
$scheme = 0 === strpos($dsn, 'rediss:') ? 'rediss' : 'redis';
259+
260+
if (preg_match('#^'.$scheme.':///([^:@])+$#', $dsn)) {
261+
$url = str_replace($scheme.':', 'file:', $dsn);
262+
}
263+
264+
if (false === $parsedUrl = parse_url($url)) {
265+
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
266+
}
267+
if (isset($parsedUrl['query'])) {
268+
parse_str($parsedUrl['query'], $dsnOptions);
269+
$redisOptions = array_merge($redisOptions, $dsnOptions);
270+
}
271+
272+
return $parsedUrl;
273+
}
274+
226275
private static function validateOptions(array $options): void
227276
{
228277
$availableOptions = array_keys(self::DEFAULT_OPTIONS);
+42Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
13+
14+
/**
15+
* Allow to delay connection to Redis Cluster.
16+
*
17+
* @author Johann Pardanaud <johann@pardanaud.com>
18+
*
19+
* @internal
20+
*/
21+
class RedisClusterProxy
22+
{
23+
private $redis;
24+
private $initializer;
25+
private $ready = false;
26+
27+
public function __construct(?\RedisCluster $redis, \Closure $initializer)
28+
{
29+
$this->redis = $redis;
30+
$this->initializer = $initializer;
31+
}
32+
33+
public function __call(string $method, array $args)
34+
{
35+
if (!$this->ready) {
36+
$this->redis = $this->initializer->__invoke($this->redis);
37+
$this->ready = true;
38+
}
39+
40+
return $this->redis->{$method}(...$args);
41+
}
42+
}

‎src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisProxy.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisProxy.php
+4-1Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ public function __construct(\Redis $redis, \Closure $initializer)
3333

3434
public function __call(string $method, array $args)
3535
{
36-
$this->ready ?: $this->ready = $this->initializer->__invoke($this->redis);
36+
if (!$this->ready) {
37+
$this->redis = $this->initializer->__invoke($this->redis);
38+
$this->ready = true;
39+
}
3740

3841
return $this->redis->{$method}(...$args);
3942
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.