Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 1bb14c3

Browse filesBrowse files
[Messenger] fix Redis support on 32b arch
1 parent 6d688f6 commit 1bb14c3
Copy full SHA for 1bb14c3

File tree

5 files changed

+78
-36
lines changed
Filter options

5 files changed

+78
-36
lines changed

‎.appveyor.yml

Copy file name to clipboardExpand all lines: .appveyor.yml
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ install:
2121
- cd ext
2222
- appveyor DownloadFile https://github.com/symfony/binary-utils/releases/download/v0.1/php_apcu-5.1.18-7.1-ts-vc14-x86.zip
2323
- 7z x php_apcu-5.1.18-7.1-ts-vc14-x86.zip -y >nul
24+
- appveyor DownloadFile https://github.com/symfony/binary-utils/releases/download/v0.1/php_redis-5.1.1-7.1-ts-vc14-x86.zip
25+
- 7z x php_redis-5.1.1-7.1-ts-vc14-x86.zip -y >nul
2426
- cd ..
2527
- copy /Y php.ini-development php.ini-min
2628
- echo memory_limit=-1 >> php.ini-min
@@ -36,6 +38,7 @@ install:
3638
- echo opcache.enable_cli=1 >> php.ini-max
3739
- echo extension=php_openssl.dll >> php.ini-max
3840
- echo extension=php_apcu.dll >> php.ini-max
41+
- echo extension=php_redis.dll >> php.ini-max
3942
- echo apc.enable_cli=1 >> php.ini-max
4043
- echo extension=php_intl.dll >> php.ini-max
4144
- echo extension=php_mbstring.dll >> php.ini-max
@@ -54,6 +57,7 @@ install:
5457
- SET COMPOSER_ROOT_VERSION=%SYMFONY_VERSION%.x-dev
5558
- php composer.phar update --no-progress --ansi
5659
- php phpunit install
60+
- choco install memurai-developer
5761

5862
test_script:
5963
- SET X=0

‎.github/workflows/integration-tests.yml

Copy file name to clipboardExpand all lines: .github/workflows/integration-tests.yml
-4Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,15 +99,11 @@ jobs:
9999
- name: Run tests
100100
run: ./phpunit --group integration -v
101101
env:
102-
REDIS_HOST: localhost
103102
REDIS_CLUSTER_HOSTS: 'localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
104103
REDIS_SENTINEL_HOSTS: 'localhost:26379'
105104
REDIS_SENTINEL_SERVICE: redis_sentinel
106105
MESSENGER_REDIS_DSN: redis://127.0.0.1:7006/messages
107106
MESSENGER_AMQP_DSN: amqp://localhost/%2f/messages
108-
MEMCACHED_HOST: localhost
109-
LDAP_HOST: localhost
110-
LDAP_PORT: 3389
111107

112108
#- name: Run HTTP push tests
113109
# if: matrix.php == '8.0'

‎phpunit.xml.dist

Copy file name to clipboardExpand all lines: phpunit.xml.dist
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
<env name="LDAP_HOST" value="localhost" />
1919
<env name="LDAP_PORT" value="3389" />
2020
<env name="REDIS_HOST" value="localhost" />
21+
<env name="MESSENGER_REDIS_DSN" value="redis://localhost/messages" />
2122
<env name="MEMCACHED_HOST" value="localhost" />
2223
<env name="MONGODB_HOST" value="localhost" />
2324
<env name="ZOOKEEPER_HOST" value="localhost" />

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php
+21-2Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ public function testGetAfterReject()
220220
{
221221
$redis = new \Redis();
222222
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
223+
$connection->cleanup();
223224

224225
$connection->add('1', []);
225226
$connection->add('2', []);
@@ -230,20 +231,38 @@ public function testGetAfterReject()
230231
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
231232
$this->assertNotNull($connection->get());
232233

233-
$redis->del('messenger-rejectthenget');
234+
$connection->cleanup();
234235
}
235236

236237
public function testGetNonBlocking()
237238
{
238239
$redis = new \Redis();
239240

240241
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
242+
$connection->cleanup();
241243

242244
$this->assertNull($connection->get()); // no message, should return null immediately
243245
$connection->add('1', []);
244246
$this->assertNotEmpty($message = $connection->get());
245247
$connection->reject($message['id']);
246-
$redis->del('messenger-getnonblocking');
248+
249+
$connection->cleanup();
250+
}
251+
252+
public function testGetDelayed()
253+
{
254+
$redis = new \Redis();
255+
256+
$connection = Connection::fromDsn('redis://localhost/messenger-delayed', [], $redis);
257+
$connection->cleanup();
258+
259+
$connection->add('1', [], 100);
260+
$this->assertNull($connection->get());
261+
usleep(300000);
262+
$this->assertNotEmpty($message = $connection->get());
263+
$connection->reject($message['id']);
264+
265+
$connection->cleanup();
247266
}
248267

249268
public function testJsonError()

‎src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
+52-30Lines changed: 52 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -141,33 +141,29 @@ public function get(): ?array
141141
if ($this->autoSetup) {
142142
$this->setup();
143143
}
144+
$now = microtime();
145+
$now = substr($now, 11).substr($now, 2, 3);
144146

145-
try {
146-
$queuedMessageCount = $this->connection->zcount($this->queue, 0, $this->getCurrentTimeInMilliseconds());
147-
} catch (\RedisException $e) {
148-
throw new TransportException($e->getMessage(), 0, $e);
149-
}
147+
$queuedMessageCount = $this->rawCommand('ZCOUNT', 0, $now);
150148

151-
if ($queuedMessageCount) {
152-
for ($i = 0; $i < $queuedMessageCount; ++$i) {
153-
try {
154-
$queuedMessages = $this->connection->zpopmin($this->queue, 1);
155-
} catch (\RedisException $e) {
156-
throw new TransportException($e->getMessage(), 0, $e);
157-
}
149+
while ($queuedMessageCount--) {
150+
if (![$queuedMessage, $expiry] = $this->rawCommand('ZPOPMIN', 1)) {
151+
break;
152+
}
158153

159-
foreach ($queuedMessages as $queuedMessage => $time) {
160-
$queuedMessage = json_decode($queuedMessage, true);
161-
// if a futured placed message is actually popped because of a race condition with
162-
// another running message consumer, the message is readded to the queue by add function
163-
// else its just added stream and will be available for all stream consumers
164-
$this->add(
165-
$queuedMessage['body'],
166-
$queuedMessage['headers'],
167-
$time - $this->getCurrentTimeInMilliseconds()
168-
);
154+
if (\strlen($expiry) === \strlen($now) ? $expiry > $now : \strlen($expiry) < \strlen($now)) {
155+
if (!$this->rawCommand('ZADD', 'NX', $expiry, $queuedMessage)) {
156+
throw new TransportException('Could not add a message to the redis stream.');
169157
}
158+
159+
break;
170160
}
161+
162+
$queuedMessage = json_decode($queuedMessage, true);
163+
// if a futured placed message is actually popped because of a race condition with
164+
// another running message consumer, the message is readded to the queue by add function
165+
// else its just added stream and will be available for all stream consumers
166+
$this->add($queuedMessage['body'], $queuedMessage['headers'], 0);
171167
}
172168

173169
$messageId = '>'; // will receive new messages
@@ -255,7 +251,7 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
255251
}
256252

257253
try {
258-
if ($delayInMs > 0) { // the delay could be smaller 0 in a queued message
254+
if ($delayInMs > 0) { // the delay is <= 0 for queued messages
259255
$message = json_encode([
260256
'body' => $body,
261257
'headers' => $headers,
@@ -267,8 +263,18 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
267263
throw new TransportException(json_last_error_msg());
268264
}
269265

270-
$score = $this->getCurrentTimeInMilliseconds() + $delayInMs;
271-
$added = $this->connection->zadd($this->queue, ['NX'], $score, $message);
266+
$now = explode(' ', microtime(), 2);
267+
$now[0] = str_pad($delayInMs + substr($now[0], 2, 3), 3, '0', \STR_PAD_LEFT);
268+
if (3 < \strlen($now[0])) {
269+
$now[1] += substr($now[0], 0, -3);
270+
$now[0] = substr($now[0], -3);
271+
272+
if (\is_float($now[1])) {
273+
throw new TransportException("Message delay is too big: {$delayInMs}ms.");
274+
}
275+
}
276+
277+
$added = $this->rawCommand('ZADD', 'NX', $now[1].$now[0], $message);
272278
} else {
273279
$message = json_encode([
274280
'body' => $body,
@@ -316,14 +322,30 @@ public function setup(): void
316322
$this->autoSetup = false;
317323
}
318324

319-
private function getCurrentTimeInMilliseconds(): int
320-
{
321-
return (int) (microtime(true) * 1000);
322-
}
323-
324325
public function cleanup(): void
325326
{
326327
$this->connection->del($this->stream);
327328
$this->connection->del($this->queue);
328329
}
330+
331+
/**
332+
* @return mixed
333+
*/
334+
private function rawCommand(string $command, ...$arguments)
335+
{
336+
try {
337+
$result = $this->connection->rawCommand($command, $this->queue, ...$arguments);
338+
} catch (\RedisException $e) {
339+
throw new TransportException($e->getMessage(), 0, $e);
340+
}
341+
342+
if (false === $result) {
343+
if ($error = $this->connection->getLastError() ?: null) {
344+
$this->connection->clearLastError();
345+
}
346+
throw new TransportException($error ?? sprintf('Could not run "%s" on Redis queue.', $command));
347+
}
348+
349+
return $result;
350+
}
329351
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.