Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 3380518

Browse filesBrowse files
committed
[Messenger] Add support for multiple Redis Sentinel hosts
1 parent 5bfce3f commit 3380518
Copy full SHA for 3380518

File tree

Expand file treeCollapse file tree

5 files changed

+91
-16
lines changed
Filter options
Expand file treeCollapse file tree

5 files changed

+91
-16
lines changed

‎.github/workflows/integration-tests.yml

Copy file name to clipboardExpand all lines: .github/workflows/integration-tests.yml
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ jobs:
172172
env:
173173
REDIS_HOST: 'localhost:16379'
174174
REDIS_CLUSTER_HOSTS: 'localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
175-
REDIS_SENTINEL_HOSTS: 'localhost:26379 localhost:26379 localhost:26379'
175+
REDIS_SENTINEL_HOSTS: 'unreachable-host:26379 localhost:26379 localhost:26379'
176176
REDIS_SENTINEL_SERVICE: redis_sentinel
177177
MESSENGER_REDIS_DSN: redis://127.0.0.1:7006/messages
178178
MESSENGER_AMQP_DSN: amqp://localhost/%2f/messages

‎src/Symfony/Component/Cache/Traits/RedisTrait.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Cache/Traits/RedisTrait.php
+3-3Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,18 +225,18 @@ public static function createConnection(#[\SensitiveParameter] string $dsn, arra
225225
if (\defined('Redis::OPT_NULL_MULTIBULK_AS_NULL') && isset($params['auth'])) {
226226
$extra = [$params['auth']];
227227
}
228-
$sentinel = new $sentinelClass($host, $port, $params['timeout'], (string) $params['persistent_id'], $params['retry_interval'], $params['read_timeout'], ...$extra);
229228

230229
try {
230+
$sentinel = new $sentinelClass($host, $port, $params['timeout'], (string) $params['persistent_id'], $params['retry_interval'], $params['read_timeout'], ...$extra);
231231
if ($address = $sentinel->getMasterAddrByName($params['redis_sentinel'])) {
232232
[$host, $port] = $address;
233233
}
234-
} catch (\RedisException $e) {
234+
} catch (\RedisException|\Relay\Exception $redisException) {
235235
}
236236
} while (++$hostIndex < \count($hosts) && !$address);
237237

238238
if (isset($params['redis_sentinel']) && !$address) {
239-
throw new InvalidArgumentException(sprintf('Failed to retrieve master information from sentinel "%s".', $params['redis_sentinel']));
239+
throw new InvalidArgumentException(sprintf('Failed to retrieve master information from sentinel "%s".', $params['redis_sentinel']), previous: $redisException ?? null);
240240
}
241241

242242
try {

‎src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php
+29Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,35 @@ public function testConnectionClaimAndRedeliver()
220220
$connection->ack($message['id']);
221221
}
222222

223+
public function testSentinel()
224+
{
225+
if (!$hosts = getenv('REDIS_SENTINEL_HOSTS')) {
226+
$this->markTestSkipped('REDIS_SENTINEL_HOSTS env var is not defined.');
227+
}
228+
229+
if (!getenv('MESSENGER_REDIS_SENTINEL_MASTER')) {
230+
$this->markTestSkipped('MESSENGER_REDIS_SENTINEL_MASTER env var is not defined.');
231+
}
232+
233+
$dsn = 'redis:?host['.str_replace(' ', ']&host[', $hosts).']';
234+
235+
$connection = Connection::fromDsn($dsn,
236+
['delete_after_ack' => true,
237+
'sentinel_master' => getenv('MESSENGER_REDIS_SENTINEL_MASTER') ?: null,
238+
], $this->redis);
239+
240+
$connection->add('1', []);
241+
$this->assertNotEmpty($message = $connection->get());
242+
$this->assertSame([
243+
'message' => json_encode([
244+
'body' => '1',
245+
'headers' => [],
246+
]),
247+
], $message['data']);
248+
$connection->reject($message['id']);
249+
$connection->cleanup();
250+
}
251+
223252
public function testLazySentinel()
224253
{
225254
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'),

‎src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php
+57-12Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -84,26 +84,45 @@ public function __construct(array $options, \Redis|Relay|\RedisCluster $redis =
8484
throw new InvalidArgumentException('Redis Sentinel support requires ext-redis>=5.2, or ext-relay.');
8585
}
8686

87-
if (null !== $sentinelMaster && ($redis instanceof \RedisCluster || \is_array($host))) {
87+
if (null !== $sentinelMaster && $redis instanceof \RedisCluster) {
8888
throw new InvalidArgumentException('Cannot configure Redis Sentinel and Redis Cluster instance at the same time.');
8989
}
9090

91-
if (\is_array($host) || $redis instanceof \RedisCluster) {
91+
if ((\is_array($host) && null === $sentinelMaster) || $redis instanceof \RedisCluster) {
9292
$hosts = \is_string($host) ? [$host.':'.$port] : $host; // Always ensure we have an array
9393
$this->redis = static fn () => self::initializeRedisCluster($redis, $hosts, $auth, $options);
9494
} else {
95-
if (null !== $sentinelMaster) {
96-
$sentinelClass = \extension_loaded('redis') ? \RedisSentinel::class : Sentinel::class;
97-
$sentinelClient = new $sentinelClass($host, $port, $options['timeout'], $options['persistent_id'], $options['retry_interval'], $options['read_timeout']);
98-
99-
if (!$address = $sentinelClient->getMasterAddrByName($sentinelMaster)) {
100-
throw new InvalidArgumentException(sprintf('Failed to retrieve master information from master name "%s" and address "%s:%d".', $sentinelMaster, $host, $port));
95+
$this->redis = static function () use ($redis, $sentinelMaster, $host, $port, $options, $auth) {
96+
if (null !== $sentinelMaster) {
97+
$sentinelClass = \extension_loaded('redis') ? \RedisSentinel::class : Sentinel::class;
98+
$hostIndex = 0;
99+
$hosts = \is_array($host) ? $host : [['scheme' => 'tcp', 'host' => $host, 'port' => $port]];
100+
do {
101+
$host = $hosts[$hostIndex]['host'];
102+
$port = $hosts[$hostIndex]['port'] ?? 0;
103+
$tls = 'tls' === $hosts[$hostIndex]['scheme'];
104+
$address = false;
105+
106+
if (isset($hosts[$hostIndex]['host']) && $tls) {
107+
$host = 'tls://'.$host;
108+
}
109+
110+
try {
111+
$sentinel = new $sentinelClass($host, $port, $options['timeout'], $options['persistent_id'], $options['retry_interval'], $options['read_timeout']);
112+
if ($address = $sentinel->getMasterAddrByName($sentinelMaster)) {
113+
[$host, $port] = $address;
114+
}
115+
} catch (\RedisException|\Relay\Exception $redisException) {
116+
}
117+
} while (++$hostIndex < \count($hosts) && !$address);
118+
119+
if (!$address) {
120+
throw new InvalidArgumentException(sprintf('Failed to retrieve master information from sentinel "%s".', $sentinelMaster), previous: $redisException ?? null);
121+
}
101122
}
102123

103-
[$host, $port] = $address;
104-
}
105-
106-
$this->redis = static fn () => self::initializeRedis($redis ?? (\extension_loaded('redis') ? new \Redis() : new Relay()), $host, $port, $auth, $options);
124+
return self::initializeRedis($redis ?? (\extension_loaded('redis') ? new \Redis() : new Relay()), $host, $port, $auth, $options);
125+
};
107126
}
108127

109128
if (!$options['lazy']) {
@@ -207,6 +226,32 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option
207226
$user = '' !== ($parsedUrl['user'] ?? '') ? urldecode($parsedUrl['user']) : null;
208227
$options['auth'] ??= null !== $pass && null !== $user ? [$user, $pass] : ($pass ?? $user);
209228

229+
if (isset($parsedUrl['query'])) {
230+
parse_str($parsedUrl['query'], $query);
231+
232+
if (isset($query['host'])) {
233+
$tls = 'rediss' === $parsedUrl['scheme'];
234+
$tcpScheme = $tls ? 'tls' : 'tcp';
235+
236+
if (!\is_array($hosts = $query['host'])) {
237+
throw new InvalidArgumentException(sprintf('Invalid Redis DSN: "%s".', $dsn));
238+
}
239+
foreach ($hosts as $host => $parameters) {
240+
if (\is_string($parameters)) {
241+
parse_str($parameters, $parameters);
242+
}
243+
if (false === $i = strrpos($host, ':')) {
244+
$hosts[$host] = ['scheme' => $tcpScheme, 'host' => $host, 'port' => 6379] + $parameters;
245+
} elseif ($port = (int) substr($host, 1 + $i)) {
246+
$hosts[$host] = ['scheme' => $tcpScheme, 'host' => substr($host, 0, $i), 'port' => $port] + $parameters;
247+
} else {
248+
$hosts[$host] = ['scheme' => 'unix', 'host' => substr($host, 0, $i)] + $parameters;
249+
}
250+
}
251+
$parsedUrl['host'] = array_values($hosts);
252+
}
253+
}
254+
210255
if (isset($parsedUrl['host'])) {
211256
$options['host'] = $parsedUrl['host'] ?? $options['host'];
212257
$options['port'] = $parsedUrl['port'] ?? $options['port'];

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ CHANGELOG
66

77
* Deprecate `StopWorkerOnSignalsListener` in favor of using the `SignalableCommandInterface`
88
* Add `HandlerDescriptor::getOptions`
9+
* Add support for multiple Redis Sentinel hosts
910

1011
6.3
1112
---

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.