Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 9cb6fdf

Browse filesBrowse files
committed
Implemted receiving of old pending messages
1 parent 33e2735 commit 9cb6fdf
Copy full SHA for 9cb6fdf

File tree

3 files changed

+207
-12
lines changed
Filter options

3 files changed

+207
-12
lines changed

‎src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php
+56-12Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public function testKeepGettingPendingMessages()
123123

124124
$redis->expects($this->exactly(3))->method('xreadgroup')
125125
->with('symfony', 'consumer', ['queue' => 0], 1, null)
126-
->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]);
126+
->willReturn(['queue' => [['message' => '{"body":"Test","headers":[]}']]]);
127127

128128
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
129129
$this->assertNotNull($connection->get());
@@ -164,23 +164,67 @@ public function testDbIndex()
164164
$this->assertSame(2, $redis->getDbNum());
165165
}
166166

167-
public function testFirstGetPendingMessagesThenNewMessages()
167+
public function testGetPendingMessageFirst()
168168
{
169169
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
170170

171-
$count = 0;
171+
$redis->expects($this->exactly(1))->method('xreadgroup')
172+
->with('symfony', 'consumer', ['queue' => '0'], 1, null)
173+
->willReturn(['queue' => [['message' => '{"body":"1","headers":[]}']]]);
172174

173-
$redis->expects($this->exactly(2))->method('xreadgroup')
174-
->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) {
175-
++$count;
175+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
176+
$connection->get();
177+
}
178+
179+
public function testClaimAbandonedMessageWithRaceCondition()
180+
{
181+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
182+
183+
$redis->expects($this->exactly(3))->method('xreadgroup')
184+
->withConsecutive(
185+
['symfony', 'consumer', ['queue' => '0'], 1, null], // first call for pending messages
186+
['symfony', 'consumer', ['queue' => '0'], 1, null], // sencond call because of claimed message (redisid-123)
187+
['symfony', 'consumer', ['queue' => '>'], 1, null] // third call because of no result (other consumer claimed message redisid-123)
188+
)
189+
->willReturnOnConsecutiveCalls([], [], []);
190+
191+
$redis->expects($this->once())->method('xpending')->willReturn([[
192+
0 => 'redisid-123', // message-id
193+
1 => 'consumer-2', // consumer-name
194+
2 => 3600001, // idle
195+
]]);
196+
197+
$redis->expects($this->exactly(1))->method('xclaim')
198+
->with('queue', 'symfony', 'consumer', 3600000, ['redisid-123'], ['JUSTID'])
199+
->willReturn([]);
200+
201+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
202+
$connection->get();
203+
}
176204

177-
if (1 === $count) {
178-
return '0' === $arr_streams['queue'];
179-
}
205+
public function testClaimAbandonedMessage()
206+
{
207+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
180208

181-
return '>' === $arr_streams['queue'];
182-
}), 1, null)
183-
->willReturn(['queue' => []]);
209+
$redis->expects($this->exactly(2))->method('xreadgroup')
210+
->withConsecutive(
211+
['symfony', 'consumer', ['queue' => '0'], 1, null], // first call for pending messages
212+
['symfony', 'consumer', ['queue' => '0'], 1, null] // sencond call because of claimed message (redisid-123)
213+
)
214+
->willReturnOnConsecutiveCalls(
215+
[], // first call returns no result
216+
['queue' => [['message' => '{"body":"1","headers":[]}']]] // second call returns clamed message (redisid-123)
217+
);
218+
219+
$redis->expects($this->once())->method('xpending')->willReturn([[
220+
0 => 'redisid-123', // message-id
221+
1 => 'consumer-2', // consumer-name
222+
2 => 3600001, // idle
223+
]]);
224+
225+
$redis->expects($this->exactly(1))->method('xclaim')
226+
->with('queue', 'symfony', 'consumer', 3600000, ['redisid-123'], ['JUSTID'])
227+
->willReturn([]);
184228

185229
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
186230
$connection->get();

‎src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php
+83Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,87 @@ public function testConnectionSendDelayedMessagesWithSameContent()
100100
$this->assertEquals($body, $encoded['body']);
101101
$this->assertEquals($headers, $encoded['headers']);
102102
}
103+
104+
public function testConnectionBelowRedeliverTimeout()
105+
{
106+
// lower redeliver timeout and claim interval
107+
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
108+
109+
$connection->cleanup();
110+
$connection->setup();
111+
112+
$body = '{"message": "Hi"}';
113+
$headers = ['type' => DummyMessage::class];
114+
115+
// Add two messages
116+
$connection->add($body, $headers);
117+
118+
// Read first message with other consumer
119+
$this->redis->xreadgroup(
120+
$this->getConnectionGroup($connection),
121+
'other-consumer2',
122+
[$this->getConnectionStream($connection) => '>'],
123+
1
124+
);
125+
126+
// Queue will not have any messages yet
127+
$this->assertNull($connection->get());
128+
}
129+
130+
public function testConnectionClaimAndRedeliver()
131+
{
132+
// lower redeliver timeout and claim interval
133+
$connection = Connection::fromDsn(
134+
getenv('MESSENGER_REDIS_DSN'),
135+
['redeliver_timeout' => 0, 'claim_interval' => 500],
136+
$this->redis
137+
);
138+
139+
$connection->cleanup();
140+
$connection->setup();
141+
142+
$body1 = '{"message": "Hi"}';
143+
$body2 = '{"message": "Bye"}';
144+
$headers = ['type' => DummyMessage::class];
145+
146+
// Add two messages
147+
$connection->add($body1, $headers);
148+
$connection->add($body2, $headers);
149+
150+
// Read first message with other consumer
151+
$this->redis->xreadgroup(
152+
$this->getConnectionGroup($connection),
153+
'other-consumer2',
154+
[$this->getConnectionStream($connection) => '>'],
155+
1
156+
);
157+
158+
// Queue will return the pending message first because redeliver_timeout = 0
159+
$encoded = $connection->get();
160+
$this->assertEquals($body1, $encoded['body']);
161+
$this->assertEquals($headers, $encoded['headers']);
162+
$connection->ack($encoded['id']);
163+
164+
// Queue will return the second message
165+
$encoded = $connection->get();
166+
$this->assertEquals($body2, $encoded['body']);
167+
$this->assertEquals($headers, $encoded['headers']);
168+
$connection->ack($encoded['id']);
169+
}
170+
171+
private function getConnectionGroup(Connection $connection): string
172+
{
173+
$property = (new \ReflectionClass(Connection::class))->getProperty('group');
174+
$property->setAccessible(true);
175+
176+
return $property->getValue($connection);
177+
}
178+
179+
private function getConnectionStream(Connection $connection): string
180+
{
181+
$property = (new \ReflectionClass(Connection::class))->getProperty('stream');
182+
$property->setAccessible(true);
183+
184+
return $property->getValue($connection);
185+
}
103186
}

‎src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php
+68Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ class Connection
3535
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
3636
'dbindex' => 0,
3737
'tls' => false,
38+
'redeliver_timeout' => 3600, // Timeout before redeliver messages still in pending state (seconds)
39+
'claim_interval' => 60000, // Interval by which pending/abandoned messages should be checked
3840
];
3941

4042
private $connection;
@@ -44,6 +46,9 @@ class Connection
4446
private $consumer;
4547
private $autoSetup;
4648
private $maxEntries;
49+
private $redeliverTimeout;
50+
private $nextClaim = 0;
51+
private $claimInterval;
4752
private $couldHavePendingMessages = true;
4853

4954
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
@@ -70,6 +75,8 @@ public function __construct(array $configuration, array $connectionCredentials =
7075
$this->queue = $this->stream.'__queue';
7176
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
7277
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
78+
$this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000;
79+
$this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
7380
}
7481

7582
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
@@ -113,13 +120,27 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
113120
unset($redisOptions['tls']);
114121
}
115122

123+
$redeliverTimeout = null;
124+
if (\array_key_exists('redeliver_timeout', $redisOptions)) {
125+
$redeliverTimeout = filter_var($redisOptions['redeliver_timeout'], FILTER_VALIDATE_INT);
126+
unset($redisOptions['redeliver_timeout']);
127+
}
128+
129+
$claimInterval = null;
130+
if (\array_key_exists('claim_interval', $redisOptions)) {
131+
$claimInterval = filter_var($redisOptions['claim_interval'], FILTER_VALIDATE_INT);
132+
unset($redisOptions['claim_interval']);
133+
}
134+
116135
$configuration = [
117136
'stream' => $redisOptions['stream'] ?? null,
118137
'group' => $redisOptions['group'] ?? null,
119138
'consumer' => $redisOptions['consumer'] ?? null,
120139
'auto_setup' => $autoSetup,
121140
'stream_max_entries' => $maxEntries,
122141
'dbindex' => $dbIndex,
142+
'redeliver_timeout' => $redeliverTimeout,
143+
'claim_interval' => $claimInterval,
123144
];
124145

125146
if (isset($parsedUrl['host'])) {
@@ -157,6 +178,49 @@ private static function validateOptions(array $options): void
157178
}
158179
}
159180

181+
private function claimOldPendingMessages()
182+
{
183+
try {
184+
// This could soon be optimized with https://github.com/antirez/redis/issues/5212 or
185+
// https://github.com/antirez/redis/issues/6256
186+
$pendingMessages = $this->connection->xpending($this->stream, $this->group, '-', '+', 1);
187+
} catch (\RedisException $e) {
188+
throw new TransportException($e->getMessage(), 0, $e);
189+
}
190+
191+
$claimableIds = [];
192+
foreach ($pendingMessages as $pendingMessage) {
193+
if ($pendingMessage[1] === $this->consumer) {
194+
$this->couldHavePendingMessages = true;
195+
196+
return;
197+
}
198+
199+
if ($pendingMessage[2] >= $this->redeliverTimeout) {
200+
$claimableIds[] = $pendingMessage[0];
201+
}
202+
}
203+
204+
if (\count($claimableIds) > 0) {
205+
try {
206+
$this->connection->xclaim(
207+
$this->stream,
208+
$this->group,
209+
$this->consumer,
210+
$this->redeliverTimeout,
211+
$claimableIds,
212+
['JUSTID']
213+
);
214+
215+
$this->couldHavePendingMessages = true;
216+
} catch (\RedisException $e) {
217+
throw new TransportException($e->getMessage(), 0, $e);
218+
}
219+
}
220+
221+
$this->nextClaim = $this->getCurrentTimeInMilliseconds() + $this->claimInterval;
222+
}
223+
160224
public function get(): ?array
161225
{
162226
if ($this->autoSetup) {
@@ -191,6 +255,10 @@ public function get(): ?array
191255
}
192256
}
193257

258+
if (!$this->couldHavePendingMessages && $this->nextClaim <= $this->getCurrentTimeInMilliseconds()) {
259+
$this->claimOldPendingMessages();
260+
}
261+
194262
$messageId = '>'; // will receive new messages
195263

196264
if ($this->couldHavePendingMessages) {

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.