Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 652c020

Browse filesBrowse files
[Messenger] fix Redis support on 32b arch
1 parent 6d688f6 commit 652c020
Copy full SHA for 652c020

File tree

3 files changed

+59
-14
lines changed
Filter options

3 files changed

+59
-14
lines changed

‎.appveyor.yml

Copy file name to clipboardExpand all lines: .appveyor.yml
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ install:
2121
- cd ext
2222
- appveyor DownloadFile https://github.com/symfony/binary-utils/releases/download/v0.1/php_apcu-5.1.18-7.1-ts-vc14-x86.zip
2323
- 7z x php_apcu-5.1.18-7.1-ts-vc14-x86.zip -y >nul
24+
- appveyor DownloadFile https://github.com/symfony/binary-utils/releases/download/v0.1/php_redis-5.1.1-7.1-ts-vc14-x86.zip
25+
- 7z x php_redis-5.1.1-7.1-ts-vc14-x86.zip -y >nul
2426
- cd ..
2527
- copy /Y php.ini-development php.ini-min
2628
- echo memory_limit=-1 >> php.ini-min
@@ -36,6 +38,7 @@ install:
3638
- echo opcache.enable_cli=1 >> php.ini-max
3739
- echo extension=php_openssl.dll >> php.ini-max
3840
- echo extension=php_apcu.dll >> php.ini-max
41+
- echo extension=php_redis.dll >> php.ini-max
3942
- echo apc.enable_cli=1 >> php.ini-max
4043
- echo extension=php_intl.dll >> php.ini-max
4144
- echo extension=php_mbstring.dll >> php.ini-max
@@ -54,6 +57,7 @@ install:
5457
- SET COMPOSER_ROOT_VERSION=%SYMFONY_VERSION%.x-dev
5558
- php composer.phar update --no-progress --ansi
5659
- php phpunit install
60+
- choco install memurai-developer
5761

5862
test_script:
5963
- SET X=0

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php
+21-2Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ public function testGetAfterReject()
220220
{
221221
$redis = new \Redis();
222222
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
223+
$connection->cleanup();
223224

224225
$connection->add('1', []);
225226
$connection->add('2', []);
@@ -230,20 +231,38 @@ public function testGetAfterReject()
230231
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
231232
$this->assertNotNull($connection->get());
232233

233-
$redis->del('messenger-rejectthenget');
234+
$connection->cleanup();
234235
}
235236

236237
public function testGetNonBlocking()
237238
{
238239
$redis = new \Redis();
239240

240241
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
242+
$connection->cleanup();
241243

242244
$this->assertNull($connection->get()); // no message, should return null immediately
243245
$connection->add('1', []);
244246
$this->assertNotEmpty($message = $connection->get());
245247
$connection->reject($message['id']);
246-
$redis->del('messenger-getnonblocking');
248+
249+
$connection->cleanup();
250+
}
251+
252+
public function testGetDelayed()
253+
{
254+
$redis = new \Redis();
255+
256+
$connection = Connection::fromDsn('redis://localhost/messenger-delayed', [], $redis);
257+
$connection->cleanup();
258+
259+
$connection->add('1', [], 100);
260+
$this->assertNull($connection->get());
261+
usleep(300000);
262+
$this->assertNotEmpty($message = $connection->get());
263+
$connection->reject($message['id']);
264+
265+
$connection->cleanup();
247266
}
248267

249268
public function testJsonError()

‎src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
+34-12Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -141,30 +141,47 @@ public function get(): ?array
141141
if ($this->autoSetup) {
142142
$this->setup();
143143
}
144+
$now = microtime();
145+
$now = substr($now, 11).substr($now, 2, 3);
144146

145147
try {
146-
$queuedMessageCount = $this->connection->zcount($this->queue, 0, $this->getCurrentTimeInMilliseconds());
148+
$queuedMessageCount = $this->connection->rawCommand('ZCOUNT', $this->queue, 0, $now);
147149
} catch (\RedisException $e) {
148150
throw new TransportException($e->getMessage(), 0, $e);
149151
}
150152

151153
if ($queuedMessageCount) {
152154
for ($i = 0; $i < $queuedMessageCount; ++$i) {
153155
try {
154-
$queuedMessages = $this->connection->zpopmin($this->queue, 1);
156+
$queuedMessages = $this->connection->rawCommand('ZPOPMIN', $this->queue, 1) ?: [];
155157
} catch (\RedisException $e) {
156158
throw new TransportException($e->getMessage(), 0, $e);
157159
}
158160

159-
foreach ($queuedMessages as $queuedMessage => $time) {
161+
$i = \count($queuedMessages);
162+
while (2 <= $i) {
163+
$expiry = $queuedMessages[--$i];
164+
$queuedMessage = $queuedMessages[--$i];
165+
166+
if (\strlen($expiry) === \strlen($now) ? $expiry > $now : \strlen($expiry) < \strlen($now)) {
167+
if (!$this->connection->rawCommand('ZADD', $this->queue, 'NX', $expiry, $queuedMessage)) {
168+
if ($error = $this->connection->getLastError() ?: null) {
169+
$this->connection->clearLastError();
170+
}
171+
throw new TransportException($error ?? 'Could not add a message to the redis stream.');
172+
}
173+
174+
continue;
175+
}
176+
160177
$queuedMessage = json_decode($queuedMessage, true);
161178
// if a futured placed message is actually popped because of a race condition with
162179
// another running message consumer, the message is readded to the queue by add function
163180
// else its just added stream and will be available for all stream consumers
164181
$this->add(
165182
$queuedMessage['body'],
166183
$queuedMessage['headers'],
167-
$time - $this->getCurrentTimeInMilliseconds()
184+
0
168185
);
169186
}
170187
}
@@ -255,7 +272,7 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
255272
}
256273

257274
try {
258-
if ($delayInMs > 0) { // the delay could be smaller 0 in a queued message
275+
if ($delayInMs > 0) { // the delay is <= 0 for queued messages
259276
$message = json_encode([
260277
'body' => $body,
261278
'headers' => $headers,
@@ -267,8 +284,18 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
267284
throw new TransportException(json_last_error_msg());
268285
}
269286

270-
$score = $this->getCurrentTimeInMilliseconds() + $delayInMs;
271-
$added = $this->connection->zadd($this->queue, ['NX'], $score, $message);
287+
$now = explode(' ', microtime(), 2);
288+
$now[0] = str_pad($delayInMs + substr($now[0], 2, 3), 3, '0', \STR_PAD_LEFT);
289+
if (3 < \strlen($now[0])) {
290+
$now[1] += substr($now[0], 0, -3);
291+
$now[0] = substr($now[0], -3);
292+
293+
if (\is_float($now[1])) {
294+
throw new TransportException("Message delay is too big: {$delayInMs}ms.");
295+
}
296+
}
297+
298+
$added = $this->connection->rawCommand('ZADD', $this->queue, 'NX', $now[1].$now[0], $message);
272299
} else {
273300
$message = json_encode([
274301
'body' => $body,
@@ -316,11 +343,6 @@ public function setup(): void
316343
$this->autoSetup = false;
317344
}
318345

319-
private function getCurrentTimeInMilliseconds(): int
320-
{
321-
return (int) (microtime(true) * 1000);
322-
}
323-
324346
public function cleanup(): void
325347
{
326348
$this->connection->del($this->stream);

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.