Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 078b5c4

Browse filesBrowse files
committed
[Messenger] Support Redis Cluster in DSN
1 parent 839528b commit 078b5c4
Copy full SHA for 078b5c4

File tree

4 files changed

+77
-19
lines changed
Filter options

4 files changed

+77
-19
lines changed

‎src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
-----
66

77
* Added support for `\RedisCluster` instance in `Connection` constructor
8+
* Added support for Redis Cluster in DSN
89

910
5.2.0
1011
-----

‎src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php
+30Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,20 @@ public function testFromDsn()
6868
);
6969
}
7070

71+
public function testFromDsnWithMultipleHosts()
72+
{
73+
$this->skipIfRedisClusterUnavailable();
74+
75+
$hosts = explode(' ', getenv('REDIS_CLUSTER_HOSTS'));
76+
77+
$dsn = array_map(function ($host) {
78+
return 'redis://'.$host;
79+
}, $hosts);
80+
$dsn = implode(',', $dsn);
81+
82+
$this->assertInstanceOf(Connection::class, Connection::fromDsn($dsn));
83+
}
84+
7185
public function testFromDsnOnUnixSocket()
7286
{
7387
$this->assertEquals(
@@ -429,4 +443,20 @@ public function testLazy()
429443
$connection->reject($message['id']);
430444
$redis->del('messenger-lazy');
431445
}
446+
447+
public function testLazyCluster()
448+
{
449+
$this->skipIfRedisClusterUnavailable();
450+
451+
$connection = new Connection(
452+
['lazy' => true],
453+
['host' => explode(' ', getenv('REDIS_CLUSTER_HOSTS'))]
454+
);
455+
456+
$connection->add('1', []);
457+
$this->assertNotEmpty($message = $connection->get());
458+
$this->assertSame('1', $message['body']);
459+
$connection->reject($message['id']);
460+
$connection->cleanup();
461+
}
432462
}

‎src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php
+41-17Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ public function __construct(array $configuration, array $connectionCredentials =
7575
}
7676

7777
$initializer = static function ($redis) use ($host, $port, $auth, $serializer, $dbIndex) {
78+
if (null === $redis) {
79+
$redis = !\is_array($host) ? new \Redis() : new \RedisCluster(null, $host);
80+
}
81+
7882
if ($redis instanceof \Redis) {
7983
$redis->connect($host, $port);
8084
}
@@ -89,17 +93,13 @@ public function __construct(array $configuration, array $connectionCredentials =
8993
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
9094
}
9195

92-
return true;
96+
return $redis;
9397
};
9498

95-
if (null === $redis) {
96-
$redis = new \Redis();
97-
}
98-
9999
if ($configuration['lazy'] ?? self::DEFAULT_OPTIONS['lazy']) {
100100
$redis = new RedisProxy($redis, $initializer);
101101
} else {
102-
$initializer($redis);
102+
$redis = $initializer($redis);
103103
}
104104

105105
$this->connection = $redis;
@@ -122,20 +122,25 @@ public function __construct(array $configuration, array $connectionCredentials =
122122
$this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
123123
}
124124

125-
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
125+
/**
126+
* @param \Redis|\RedisCluster|null $redis
127+
*/
128+
public static function fromDsn(string $dsn, array $redisOptions = [], $redis = null): self
126129
{
127-
$url = $dsn;
130+
if (false === strpos($dsn, ',')) {
131+
$parsedUrl = self::parseDsn($dsn, $redisOptions);
132+
} else {
133+
$parsedUrls = array_map(function ($dsn) use (&$redisOptions) {
134+
return self::parseDsn($dsn, $redisOptions);
135+
}, explode(',', $dsn));
128136

129-
if (preg_match('#^redis:///([^:@])+$#', $dsn)) {
130-
$url = str_replace('redis:', 'file:', $dsn);
131-
}
137+
// Merge all the URLs, the last one overrides the previous ones
138+
$parsedUrl = array_merge(...$parsedUrls);
132139

133-
if (false === $parsedUrl = parse_url($url)) {
134-
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
135-
}
136-
if (isset($parsedUrl['query'])) {
137-
parse_str($parsedUrl['query'], $dsnOptions);
138-
$redisOptions = array_merge($redisOptions, $dsnOptions);
140+
// Regroup all the hosts in an array interpretable by RedisCluster
141+
$parsedUrl['host'] = array_map(function ($parsedUrl) {
142+
return $parsedUrl['host'].':'.($parsedUrl['port'] ?? 6379);
143+
}, $parsedUrls);
139144
}
140145

141146
self::validateOptions($redisOptions);
@@ -227,6 +232,25 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
227232
return new self($configuration, $connectionCredentials, $redisOptions, $redis);
228233
}
229234

235+
private static function parseDsn(string $dsn, array &$redisOptions): array
236+
{
237+
$url = $dsn;
238+
239+
if (preg_match('#^redis:///([^:@])+$#', $dsn)) {
240+
$url = str_replace('redis:', 'file:', $dsn);
241+
}
242+
243+
if (false === $parsedUrl = parse_url($url)) {
244+
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
245+
}
246+
if (isset($parsedUrl['query'])) {
247+
parse_str($parsedUrl['query'], $dsnOptions);
248+
$redisOptions = array_merge($redisOptions, $dsnOptions);
249+
}
250+
251+
return $parsedUrl;
252+
}
253+
230254
private static function validateOptions(array $options): void
231255
{
232256
$availableOptions = array_keys(self::DEFAULT_OPTIONS);

‎src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisProxy.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisProxy.php
+5-2Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class RedisProxy
2626
private $ready = false;
2727

2828
/**
29-
* @param \Redis|\RedisCluster $redis
29+
* @param \Redis|\RedisCluster|null $redis
3030
*/
3131
public function __construct($redis, \Closure $initializer)
3232
{
@@ -36,7 +36,10 @@ public function __construct($redis, \Closure $initializer)
3636

3737
public function __call(string $method, array $args)
3838
{
39-
$this->ready ?: $this->ready = $this->initializer->__invoke($this->redis);
39+
if (!$this->ready) {
40+
$this->redis = $this->initializer->__invoke($this->redis);
41+
$this->ready = true;
42+
}
4043

4144
return $this->redis->{$method}(...$args);
4245
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.