diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php index 304372089e1c5..99da54f40bf85 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php @@ -339,6 +339,27 @@ public function testGetAfterReject() $redis->del('messenger-rejectthenget'); } + public function testItCountMessages() + { + $this->assertSame(0, $this->connection->getMessageCount()); + + $this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]); + $this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]); + $this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]); + + $this->assertSame(3, $this->connection->getMessageCount()); + + $message = $this->connection->get(); + $this->connection->ack($message['id']); + + $this->assertSame(2, $this->connection->getMessageCount()); + + $message = $this->connection->get(); + $this->connection->reject($message['id']); + + $this->assertSame(1, $this->connection->getMessageCount()); + } + private function getConnectionGroup(Connection $connection): string { $property = (new \ReflectionClass(Connection::class))->getProperty('group'); diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php index 0c4bfed315c50..f656b5f5650b4 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php @@ -521,6 +521,51 @@ public function cleanup(): void } } + public function getMessageCount(): int + { + $groups = $this->connection->xinfo('GROUPS', $this->stream) ?: []; + + $lastDeliveredId = null; + foreach ($groups as $group) { + if ($group['name'] !== $this->group) { + continue; + } + + // Use "lag" key provided by Redis 7.x. See https://redis.io/commands/xinfo-groups/#consumer-group-lag. + if (isset($group['lag'])) { + return $group['lag']; + } + + if (!isset($group['last-delivered-id'])) { + return 0; + } + + $lastDeliveredId = $group['last-delivered-id']; + break; + } + + if (null === $lastDeliveredId) { + return 0; + } + + // Iterate through the stream. See https://redis.io/commands/xrange/#iterating-a-stream. + $useExclusiveRangeInterval = version_compare(phpversion('redis'), '6.2.0', '>='); + $total = 0; + do { + if (!$range = $this->connection->xRange($this->stream, $lastDeliveredId, '+', 100)) { + return $total; + } + + $total += \count($range); + + if ($useExclusiveRangeInterval) { + $lastDeliveredId = preg_replace_callback('#\d+$#', static fn(array $matches) => (int) $matches[0] + 1, array_key_last($range)); + } else { + $lastDeliveredId = '('.array_key_last($range); + } + } while (true); + } + private function rawCommand(string $command, ...$arguments): mixed { try { diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php index a4500ab70407a..8d7ffafc63806 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php @@ -14,6 +14,7 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; +use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -22,7 +23,7 @@ * @author Alexander Schranz * @author Antoine Bluchet */ -class RedisReceiver implements ReceiverInterface +class RedisReceiver implements ReceiverInterface, MessageCountAwareInterface { private Connection $connection; private SerializerInterface $serializer; @@ -84,6 +85,14 @@ public function reject(Envelope $envelope): void $this->connection->reject($this->findRedisReceivedStamp($envelope)->getId()); } + /** + * {@inheritdoc} + */ + public function getMessageCount(): int + { + return $this->connection->getMessageCount(); + } + private function findRedisReceivedStamp(Envelope $envelope): RedisReceivedStamp { /** @var RedisReceivedStamp|null $redisReceivedStamp */ diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisTransport.php b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisTransport.php index 2f235012a766d..ab64981b8d8d9 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisTransport.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisTransport.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Bridge\Redis\Transport; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\SetupableTransportInterface; @@ -21,7 +22,7 @@ * @author Alexander Schranz * @author Antoine Bluchet */ -class RedisTransport implements TransportInterface, SetupableTransportInterface +class RedisTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface { private SerializerInterface $serializer; private Connection $connection; @@ -74,6 +75,14 @@ public function setup(): void $this->connection->setup(); } + /** + * {@inheritdoc} + */ + public function getMessageCount(): int + { + return $this->getReceiver()->getMessageCount(); + } + private function getReceiver(): RedisReceiver { return $this->receiver ??= new RedisReceiver($this->connection, $this->serializer);