Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 705b6f8

Browse filesBrowse files
author
Robin Chalas
committed
[Messenger] Make redis Connection::get() non blocking by default
1 parent e606ac1 commit 705b6f8
Copy full SHA for 705b6f8

File tree

4 files changed

+60
-3
lines changed
Filter options

4 files changed

+60
-3
lines changed

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php
+20Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public function testKeepGettingPendingMessages()
6969
{
7070
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
7171

72+
$redis->expects($this->atLeastOnce())->method('xlen')->willReturn(1);
7273
$redis->expects($this->exactly(3))->method('xreadgroup')
7374
->with('symfony', 'consumer', ['queue' => 0], 1, null)
7475
->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]);
@@ -85,6 +86,7 @@ public function testFirstGetPendingMessagesThenNewMessages()
8586

8687
$count = 0;
8788

89+
$redis->expects($this->atLeastOnce())->method('xlen')->willReturn(1);
8890
$redis->expects($this->exactly(2))->method('xreadgroup')
8991
->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) {
9092
++$count;
@@ -106,6 +108,7 @@ public function testUnexpectedRedisError()
106108
$this->expectException(TransportException::class);
107109
$this->expectExceptionMessage('Redis error happens');
108110
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
111+
$redis->expects($this->once())->method('xlen')->willReturn(1);
109112
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
110113
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');
111114

@@ -130,4 +133,21 @@ public function testGetAfterReject()
130133
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
131134
$this->assertNotNull($connection->get());
132135
}
136+
137+
public function testGetNonBlocking()
138+
{
139+
$redis = new \Redis();
140+
141+
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
142+
try {
143+
$connection->setup();
144+
} catch (TransportException $e) {
145+
}
146+
147+
$this->assertNull($connection->get());
148+
$connection->add('1', []);
149+
$this->assertNotEmpty($message = $connection->get());
150+
$connection->reject($message['id']);
151+
$redis->del('messenger-getnonblocking');
152+
}
133153
}

‎src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
+20-1Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
7171

7272
public function get(): ?array
7373
{
74+
if (null === $this->blockingTimeout && !$this->len()) {
75+
return null;
76+
}
77+
7478
$messageId = '>'; // will receive new messages
7579

7680
if ($this->couldHavePendingMessages) {
@@ -142,7 +146,7 @@ public function reject(string $id): void
142146
}
143147
}
144148

145-
public function add(string $body, array $headers)
149+
public function add(string $body, array $headers): void
146150
{
147151
$e = null;
148152
try {
@@ -157,6 +161,21 @@ public function add(string $body, array $headers)
157161
}
158162
}
159163

164+
public function len(): int
165+
{
166+
$e = null;
167+
try {
168+
$length = $this->connection->xLen($this->stream);
169+
} catch (\RedisException $e) {
170+
}
171+
172+
if ($e || false === $length || null === $length) {
173+
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not get the length of the redis stream.', 0, $e);
174+
}
175+
176+
return $length;
177+
}
178+
160179
public function setup(): void
161180
{
162181
try {

‎src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceiver.php
+10-1Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1718
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1819
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1920
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -24,7 +25,7 @@
2425
*
2526
* @experimental in 4.3
2627
*/
27-
class RedisReceiver implements ReceiverInterface
28+
class RedisReceiver implements ReceiverInterface, MessageCountAwareInterface
2829
{
2930
private $connection;
3031
private $serializer;
@@ -76,6 +77,14 @@ public function reject(Envelope $envelope): void
7677
$this->connection->reject($this->findRedisReceivedStamp($envelope)->getId());
7778
}
7879

80+
/**
81+
* {@inheritdoc}
82+
*/
83+
public function getMessageCount(): int
84+
{
85+
return $this->connection->len();
86+
}
87+
7988
private function findRedisReceivedStamp(Envelope $envelope): RedisReceivedStamp
8089
{
8190
/** @var RedisReceivedStamp|null $redisReceivedStamp */

‎src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransport.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/RedisExt/RedisTransport.php
+10-1Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Transport\RedisExt;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1516
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1617
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
1718
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
@@ -23,7 +24,7 @@
2324
*
2425
* @experimental in 4.3
2526
*/
26-
class RedisTransport implements TransportInterface, SetupableTransportInterface
27+
class RedisTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
2728
{
2829
private $serializer;
2930
private $connection;
@@ -68,6 +69,14 @@ public function send(Envelope $envelope): Envelope
6869
return ($this->sender ?? $this->getSender())->send($envelope);
6970
}
7071

72+
/**
73+
* {@inheritdoc}
74+
*/
75+
public function getMessageCount(): int
76+
{
77+
return ($this->receiver ?? $this->getReceiver())->getMessageCount();
78+
}
79+
7180
/**
7281
* {@inheritdoc}
7382
*/

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.