Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 2949e9c

Browse filesBrowse files
committed
feature #46229 [Messenger] Make Redis messages countable (Jean-Beru)
This PR was squashed before being merged into the 6.2 branch. Discussion ---------- [Messenger] Make Redis messages countable | Q | A | ------------- | --- | Branch? | 6.1 | Bug fix? | no | New feature? | no | Deprecations? | no | Tickets | | License | MIT | Doc PR | Each Messenger Transport can return the number of messages present in the queue. For example, when we use the `messenger:failed:retry` command, Messenger displays the number of messages to retry. Actually, this count is not displayed when using `RedisTransport`. This PR adds this functionality. Commits ------- e1a896f [Messenger] Make Redis messages countable
2 parents 86f78aa + e1a896f commit 2949e9c
Copy full SHA for 2949e9c

File tree

4 files changed

+86
-2
lines changed
Filter options

4 files changed

+86
-2
lines changed

‎src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php
+21Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,27 @@ public function testGetAfterReject()
339339
$redis->del('messenger-rejectthenget');
340340
}
341341

342+
public function testItCountMessages()
343+
{
344+
$this->assertSame(0, $this->connection->getMessageCount());
345+
346+
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
347+
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
348+
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
349+
350+
$this->assertSame(3, $this->connection->getMessageCount());
351+
352+
$message = $this->connection->get();
353+
$this->connection->ack($message['id']);
354+
355+
$this->assertSame(2, $this->connection->getMessageCount());
356+
357+
$message = $this->connection->get();
358+
$this->connection->reject($message['id']);
359+
360+
$this->assertSame(1, $this->connection->getMessageCount());
361+
}
362+
342363
private function getConnectionGroup(Connection $connection): string
343364
{
344365
$property = (new \ReflectionClass(Connection::class))->getProperty('group');

‎src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php
+45Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,51 @@ public function cleanup(): void
521521
}
522522
}
523523

524+
public function getMessageCount(): int
525+
{
526+
$groups = $this->connection->xinfo('GROUPS', $this->stream) ?: [];
527+
528+
$lastDeliveredId = null;
529+
foreach ($groups as $group) {
530+
if ($group['name'] !== $this->group) {
531+
continue;
532+
}
533+
534+
// Use "lag" key provided by Redis 7.x. See https://redis.io/commands/xinfo-groups/#consumer-group-lag.
535+
if (isset($group['lag'])) {
536+
return $group['lag'];
537+
}
538+
539+
if (!isset($group['last-delivered-id'])) {
540+
return 0;
541+
}
542+
543+
$lastDeliveredId = $group['last-delivered-id'];
544+
break;
545+
}
546+
547+
if (null === $lastDeliveredId) {
548+
return 0;
549+
}
550+
551+
// Iterate through the stream. See https://redis.io/commands/xrange/#iterating-a-stream.
552+
$useExclusiveRangeInterval = version_compare(phpversion('redis'), '6.2.0', '>=');
553+
$total = 0;
554+
do {
555+
if (!$range = $this->connection->xRange($this->stream, $lastDeliveredId, '+', 100)) {
556+
return $total;
557+
}
558+
559+
$total += \count($range);
560+
561+
if ($useExclusiveRangeInterval) {
562+
$lastDeliveredId = preg_replace_callback('#\d+$#', static fn(array $matches) => (int) $matches[0] + 1, array_key_last($range));
563+
} else {
564+
$lastDeliveredId = '('.array_key_last($range);
565+
}
566+
} while (true);
567+
}
568+
524569
private function rawCommand(string $command, ...$arguments): mixed
525570
{
526571
try {

‎src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php
+10-1Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1718
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1819
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1920
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -22,7 +23,7 @@
2223
* @author Alexander Schranz <alexander@sulu.io>
2324
* @author Antoine Bluchet <soyuka@gmail.com>
2425
*/
25-
class RedisReceiver implements ReceiverInterface
26+
class RedisReceiver implements ReceiverInterface, MessageCountAwareInterface
2627
{
2728
private Connection $connection;
2829
private SerializerInterface $serializer;
@@ -84,6 +85,14 @@ public function reject(Envelope $envelope): void
8485
$this->connection->reject($this->findRedisReceivedStamp($envelope)->getId());
8586
}
8687

88+
/**
89+
* {@inheritdoc}
90+
*/
91+
public function getMessageCount(): int
92+
{
93+
return $this->connection->getMessageCount();
94+
}
95+
8796
private function findRedisReceivedStamp(Envelope $envelope): RedisReceivedStamp
8897
{
8998
/** @var RedisReceivedStamp|null $redisReceivedStamp */

‎src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisTransport.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisTransport.php
+10-1Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1516
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1617
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
1718
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
@@ -21,7 +22,7 @@
2122
* @author Alexander Schranz <alexander@sulu.io>
2223
* @author Antoine Bluchet <soyuka@gmail.com>
2324
*/
24-
class RedisTransport implements TransportInterface, SetupableTransportInterface
25+
class RedisTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
2526
{
2627
private SerializerInterface $serializer;
2728
private Connection $connection;
@@ -74,6 +75,14 @@ public function setup(): void
7475
$this->connection->setup();
7576
}
7677

78+
/**
79+
* {@inheritdoc}
80+
*/
81+
public function getMessageCount(): int
82+
{
83+
return $this->getReceiver()->getMessageCount();
84+
}
85+
7786
private function getReceiver(): RedisReceiver
7887
{
7988
return $this->receiver ??= new RedisReceiver($this->connection, $this->serializer);

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.