File tree 4 files changed +86
-2
lines changed
Filter options
src/Symfony/Component/Messenger/Bridge/Redis
4 files changed +86
-2
lines changed
Original file line number Diff line number Diff line change @@ -339,6 +339,27 @@ public function testGetAfterReject()
339
339
$ redis ->del ('messenger-rejectthenget ' );
340
340
}
341
341
342
+ public function testItCountMessages ()
343
+ {
344
+ $ this ->assertSame (0 , $ this ->connection ->getMessageCount ());
345
+
346
+ $ this ->connection ->add ('{"message": "Hi"} ' , ['type ' => DummyMessage::class]);
347
+ $ this ->connection ->add ('{"message": "Hi"} ' , ['type ' => DummyMessage::class]);
348
+ $ this ->connection ->add ('{"message": "Hi"} ' , ['type ' => DummyMessage::class]);
349
+
350
+ $ this ->assertSame (3 , $ this ->connection ->getMessageCount ());
351
+
352
+ $ message = $ this ->connection ->get ();
353
+ $ this ->connection ->ack ($ message ['id ' ]);
354
+
355
+ $ this ->assertSame (2 , $ this ->connection ->getMessageCount ());
356
+
357
+ $ message = $ this ->connection ->get ();
358
+ $ this ->connection ->reject ($ message ['id ' ]);
359
+
360
+ $ this ->assertSame (1 , $ this ->connection ->getMessageCount ());
361
+ }
362
+
342
363
private function getConnectionGroup (Connection $ connection ): string
343
364
{
344
365
$ property = (new \ReflectionClass (Connection::class))->getProperty ('group ' );
Original file line number Diff line number Diff line change @@ -521,6 +521,51 @@ public function cleanup(): void
521
521
}
522
522
}
523
523
524
+ public function getMessageCount (): int
525
+ {
526
+ $ groups = $ this ->connection ->xinfo ('GROUPS ' , $ this ->stream ) ?: [];
527
+
528
+ $ lastDeliveredId = null ;
529
+ foreach ($ groups as $ group ) {
530
+ if ($ group ['name ' ] !== $ this ->group ) {
531
+ continue ;
532
+ }
533
+
534
+ // Use "lag" key provided by Redis 7.x. See https://redis.io/commands/xinfo-groups/#consumer-group-lag.
535
+ if (isset ($ group ['lag ' ])) {
536
+ return $ group ['lag ' ];
537
+ }
538
+
539
+ if (!isset ($ group ['last-delivered-id ' ])) {
540
+ return 0 ;
541
+ }
542
+
543
+ $ lastDeliveredId = $ group ['last-delivered-id ' ];
544
+ break ;
545
+ }
546
+
547
+ if (null === $ lastDeliveredId ) {
548
+ return 0 ;
549
+ }
550
+
551
+ // Iterate through the stream. See https://redis.io/commands/xrange/#iterating-a-stream.
552
+ $ useExclusiveRangeInterval = version_compare (phpversion ('redis ' ), '6.2.0 ' , '>= ' );
553
+ $ total = 0 ;
554
+ do {
555
+ if (!$ range = $ this ->connection ->xRange ($ this ->stream , $ lastDeliveredId , '+ ' , 100 )) {
556
+ return $ total ;
557
+ }
558
+
559
+ $ total += \count ($ range );
560
+
561
+ if ($ useExclusiveRangeInterval ) {
562
+ $ lastDeliveredId = preg_replace_callback ('#\d+$# ' , static fn (array $ matches ) => (int ) $ matches [0 ] + 1 , array_key_last ($ range ));
563
+ } else {
564
+ $ lastDeliveredId = '( ' .array_key_last ($ range );
565
+ }
566
+ } while (true );
567
+ }
568
+
524
569
private function rawCommand (string $ command , ...$ arguments ): mixed
525
570
{
526
571
try {
Original file line number Diff line number Diff line change 14
14
use Symfony \Component \Messenger \Envelope ;
15
15
use Symfony \Component \Messenger \Exception \LogicException ;
16
16
use Symfony \Component \Messenger \Exception \MessageDecodingFailedException ;
17
+ use Symfony \Component \Messenger \Transport \Receiver \MessageCountAwareInterface ;
17
18
use Symfony \Component \Messenger \Transport \Receiver \ReceiverInterface ;
18
19
use Symfony \Component \Messenger \Transport \Serialization \PhpSerializer ;
19
20
use Symfony \Component \Messenger \Transport \Serialization \SerializerInterface ;
22
23
* @author Alexander Schranz <alexander@sulu.io>
23
24
* @author Antoine Bluchet <soyuka@gmail.com>
24
25
*/
25
- class RedisReceiver implements ReceiverInterface
26
+ class RedisReceiver implements ReceiverInterface, MessageCountAwareInterface
26
27
{
27
28
private Connection $ connection ;
28
29
private SerializerInterface $ serializer ;
@@ -84,6 +85,14 @@ public function reject(Envelope $envelope): void
84
85
$ this ->connection ->reject ($ this ->findRedisReceivedStamp ($ envelope )->getId ());
85
86
}
86
87
88
+ /**
89
+ * {@inheritdoc}
90
+ */
91
+ public function getMessageCount (): int
92
+ {
93
+ return $ this ->connection ->getMessageCount ();
94
+ }
95
+
87
96
private function findRedisReceivedStamp (Envelope $ envelope ): RedisReceivedStamp
88
97
{
89
98
/** @var RedisReceivedStamp|null $redisReceivedStamp */
Original file line number Diff line number Diff line change 12
12
namespace Symfony \Component \Messenger \Bridge \Redis \Transport ;
13
13
14
14
use Symfony \Component \Messenger \Envelope ;
15
+ use Symfony \Component \Messenger \Transport \Receiver \MessageCountAwareInterface ;
15
16
use Symfony \Component \Messenger \Transport \Serialization \PhpSerializer ;
16
17
use Symfony \Component \Messenger \Transport \Serialization \SerializerInterface ;
17
18
use Symfony \Component \Messenger \Transport \SetupableTransportInterface ;
21
22
* @author Alexander Schranz <alexander@sulu.io>
22
23
* @author Antoine Bluchet <soyuka@gmail.com>
23
24
*/
24
- class RedisTransport implements TransportInterface, SetupableTransportInterface
25
+ class RedisTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
25
26
{
26
27
private SerializerInterface $ serializer ;
27
28
private Connection $ connection ;
@@ -74,6 +75,14 @@ public function setup(): void
74
75
$ this ->connection ->setup ();
75
76
}
76
77
78
+ /**
79
+ * {@inheritdoc}
80
+ */
81
+ public function getMessageCount (): int
82
+ {
83
+ return $ this ->getReceiver ()->getMessageCount ();
84
+ }
85
+
77
86
private function getReceiver (): RedisReceiver
78
87
{
79
88
return $ this ->receiver ??= new RedisReceiver ($ this ->connection , $ this ->serializer );
You can’t perform that action at this time.
0 commit comments