Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit d211904

Browse filesBrowse files
committed
[Messenger] prevent infinite redelivery loops and blocked queues
by republishing the redelivered messages as retries with a retry limit and potential delay
1 parent b9f6944 commit d211904
Copy full SHA for d211904

File tree

Expand file treeCollapse file tree

8 files changed

+91
-7
lines changed
Filter options
Expand file treeCollapse file tree

8 files changed

+91
-7
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1649,6 +1649,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16491649
$defaultMiddleware = [
16501650
'before' => [
16511651
['id' => 'add_bus_name_stamp_middleware'],
1652+
['id' => 'reject_redelivered_message_middleware'],
16521653
['id' => 'dispatch_after_current_bus'],
16531654
['id' => 'failed_message_processing_middleware'],
16541655
],

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
<argument type="service" id="validator" />
4949
</service>
5050

51+
<service id="messenger.middleware.reject_redelivered_message_middleware" class="Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware" />
52+
5153
<service id="messenger.middleware.failed_message_processing_middleware" class="Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware" />
5254

5355
<service id="messenger.middleware.traceable" class="Symfony\Component\Messenger\Middleware\TraceableMiddleware" abstract="true">

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,7 @@ public function testMessengerWithMultipleBuses()
739739
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
740740
$this->assertEquals([
741741
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
742+
['id' => 'reject_redelivered_message_middleware'],
742743
['id' => 'dispatch_after_current_bus'],
743744
['id' => 'failed_message_processing_middleware'],
744745
['id' => 'send_message'],
@@ -748,6 +749,7 @@ public function testMessengerWithMultipleBuses()
748749
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
749750
$this->assertEquals([
750751
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
752+
['id' => 'reject_redelivered_message_middleware'],
751753
['id' => 'dispatch_after_current_bus'],
752754
['id' => 'failed_message_processing_middleware'],
753755
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],

‎src/Symfony/Bundle/FrameworkBundle/composer.json

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/composer.json
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
"symfony/expression-language": "~3.4|~4.0",
4343
"symfony/http-client": "^4.3",
4444
"symfony/mailer": "^4.3",
45-
"symfony/messenger": "^4.3",
45+
"symfony/messenger": "^4.3.6",
4646
"symfony/mime": "^4.3",
4747
"symfony/process": "~3.4|~4.0",
4848
"symfony/security-csrf": "~3.4|~4.0",
@@ -73,7 +73,7 @@
7373
"symfony/dotenv": "<4.2",
7474
"symfony/dom-crawler": "<4.3",
7575
"symfony/form": "<4.3",
76-
"symfony/messenger": "<4.3",
76+
"symfony/messenger": "<4.3.6",
7777
"symfony/property-info": "<3.4",
7878
"symfony/serializer": "<4.2",
7979
"symfony/stopwatch": "<3.4",
+21Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Exception;
13+
14+
/**
15+
* @author Tobias Schultze <http://tobion.de>
16+
*
17+
* @experimental in 4.3
18+
*/
19+
class RejectRedeliveredMessageException extends RuntimeException
20+
{
21+
}
+50Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
16+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
17+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
18+
19+
/**
20+
* Middleware that throws a RejectRedeliveredMessageException when a message is detected that has been redelivered by AMQP.
21+
*
22+
* The middleware runs before the HandleMessageMiddleware and prevents redelivered messages from being handled directly.
23+
* The thrown exception is caught by the worker and will trigger the retry logic according to the retry strategy.
24+
*
25+
* AMQP redelivers messages when they do not get acknowledged or rejected. This can happen when the connection times out
26+
* or an exception is thrown before acknowledging or rejecting. When such errors happen again while handling the
27+
* redelivered message, the message would get redelivered again and again. The purpose of this middleware is to prevent
28+
* infinite redelivery loops and to unblock the queue by republishing the redelivered messages as retries with a retry
29+
* limit and potential delay.
30+
*
31+
* @experimental in 4.3
32+
*
33+
* @author Tobias Schultze <http://tobion.de>
34+
*/
35+
class RejectRedeliveredMessageMiddleware implements MiddlewareInterface
36+
{
37+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
38+
{
39+
// ignore the dispatched messages for retry
40+
if (null !== $envelope->last(ReceivedStamp::class)) {
41+
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
42+
43+
if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
44+
throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
45+
}
46+
}
47+
48+
return $stack->next()->handle($envelope, $stack);
49+
}
50+
}

‎src/Symfony/Component/Messenger/Tests/WorkerTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/WorkerTest.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@ public function testDispatchCausesRetry()
118118
}
119119
});
120120

121-
// old message acknowledged
122-
$this->assertSame(1, $receiver->getAcknowledgeCount());
121+
// old message rejected
122+
$this->assertSame(1, $receiver->getRejectCount());
123123
}
124124

125125
public function testUnrecoverableMessageHandlingExceptionPreventsRetries()

‎src/Symfony/Component/Messenger/Worker.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Worker.php
+11-3Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
1919
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
2020
use Symfony\Component\Messenger\Exception\HandlerFailedException;
21+
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
2122
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
2223
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
2324
use Symfony\Component\Messenger\Stamp\DelayStamp;
@@ -135,6 +136,13 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
135136
try {
136137
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName)));
137138
} catch (\Throwable $throwable) {
139+
$rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
140+
if ($rejectFirst) {
141+
// redelivered messages are rejected first so that continuous failures in an event listener or while
142+
// publishing for retry does not cause infinite redelivery loops
143+
$receiver->reject($envelope);
144+
}
145+
138146
if ($throwable instanceof HandlerFailedException) {
139147
$envelope = $throwable->getEnvelope();
140148
}
@@ -156,15 +164,15 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
156164
->with(new RedeliveryStamp($retryCount, $transportName))
157165
->withoutAll(ReceivedStamp::class);
158166

159-
// re-send the message
167+
// re-send the message for retry
160168
$this->bus->dispatch($retryEnvelope);
161-
// acknowledge the previous message has received
162-
$receiver->ack($envelope);
163169
} else {
164170
if (null !== $this->logger) {
165171
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
166172
}
173+
}
167174

175+
if (!$rejectFirst) {
168176
$receiver->reject($envelope);
169177
}
170178

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.